/* * Copyright 1995, 1996 Perforce Software. All rights reserved. * * This file is part of Perforce - the FAST SCM System. */ /* * Rpc.cc - remote procedure service * * The general flow of the client/server protocol: * * The client sends a message containing the user's request to the * server and then dispatches, i.e. reads and executes requests from * the server until the server sends the "release" message. * * The server initially dispatches, i.e. reads and executes a * request from the user. The final act of the request should * either be to release the client (send the "release" message), * or send a message to the client instructing it to invoke * (eventually) another server request. In the end, the final * message sent to the client should be "release". */ # include <stdhdrs.h> # include <signaler.h> # include <debug.h> # include <tunable.h> # include <strbuf.h> # include <strdict.h> # include <strarray.h> # include <strtable.h> # include <error.h> # include <errorlog.h> # include <tracker.h> # include <timer.h> # include <md5.h> # include <ticket.h> # include <keepalive.h> # include <netportparser.h> # include <netconnect.h> # include <netbuffer.h> # include <rpcdebug.h> # include <msgrpc.h> # include <p4tags.h> # ifdef USE_SSL extern "C" { // OpenSSL # include "openssl/bio.h" # include "openssl/ssl.h" # include "openssl/err.h" } # endif //USE_SSL # include "enviro.h" # include "filesys.h" # include "pathsys.h" # include "rpc.h" # include "rpcbuffer.h" # include "rpctrans.h" # include "rpcdispatch.h" # include "rpcdebug.h" # include "rpcservice.h" # include "netsslcredentials.h" const char *RpcTypeNames[] = { "", // RPC_CLIENT = 0, "rmt: ", // RPC_REMOTE, "px: ", // RPC_PX, "", // RPC_RH, "bgd: ", // RPC_BACKGROUND, "pxc: ", // RPC_PXCLIENT, "rpl: ", // RPC_RPL, "bs: ", // RPC_BROKER, "bc: ", // RPC_BROKER_TO_SERVER, "rauth: ", // RPC_RMTAUTH, "sbx: ", // RPC_SANDBOX, "x3: ", // RPC_X3, "ukn: ", // RPC_UNKNOWN "dmrpc: ", // RPC_DMRPC 0 }; RpcService::RpcService() { // Initialize our members. dispatcher = new RpcDispatcher; protoSendBuffer = new RpcSendBuffer; endPoint = 0; openFlag = RPC_CONNECT; // Load up core dispatch routines Dispatcher( rpcServices ); } RpcService::~RpcService() { delete dispatcher; delete endPoint; delete protoSendBuffer; } /** * RpcService::SetEndpoint * * @brief Creates an communication's endpoint for either listening or connecting. * * @param address for socket * @param error structure pointer * @see NetEndPoint::Create */ void RpcService::SetEndpoint( const char *addr, Error *e ) { delete endPoint; endPoint = NetEndPoint::Create( addr, e ); } /* * RpcService::Dispatcher() - add another list of function to call on received RPC */ void RpcService::Dispatcher( const RpcDispatch *dispatch ) { dispatcher->Add( dispatch ); } void RpcService::Listen( Error *e ) { // initialize listen openFlag = RPC_LISTEN; endPoint->Listen( e ); if( e->Test() ) { // Failed open -- clear open flag so connect bails, too. e->Set( MsgRpc::Listen ) << endPoint->GetAddress(); openFlag = RPC_NOOPEN; return; } } void RpcService::GetHost( StrPtr *peerAddress, StrBuf &hostBuf, Error *e ) { hostBuf.Clear(); NetEndPoint *ep = NetEndPoint::Create( peerAddress->Text(), e ); if( !e->Test() ) hostBuf.Set( ep->GetPrintableHost() ); delete ep; return; } StrPtr * RpcService::GetListenAddress( int raf_flags ) { return endPoint ? endPoint->GetListenAddress( raf_flags ) : 0; } void RpcService::ListenCheck( Error *e ) { endPoint->ListenCheck( e ); } int RpcService::CheaterCheck( const char *port ) { return endPoint->CheaterCheck( port ); } void RpcService::Unlisten() { if( endPoint ) endPoint->Unlisten(); } void RpcService::GetExpiration( StrBuf &buf ) { if( endPoint ) endPoint->GetExpiration( buf ); } void RpcService::SetProtocol( const char *var, const StrRef &value ) { protoSendBuffer->SetVar( StrRef( (char*)var ), value ); } void RpcService::SetProtocolV( const char *arg ) { StrBuf s; const char *p; if( p = strchr( arg, '=' ) ) { s.Set( arg, p - arg ); protoSendBuffer->SetVar( s, StrRef( (char *)p + 1 ) ); } else { protoSendBuffer->SetVar( StrRef( (char *)arg ), StrRef::Null() ); } } const StrBuf RpcService::GetMyQualifiedP4Port( StrBuf &serverSpecAddr, Error &e ) const { StrBuf result; if( endPoint ) { result = endPoint->GetPortParser().GetQualifiedP4Port( serverSpecAddr, e); } else { e.Set( MsgRpc::BadP4Port ) << "no endpoint"; } return result; } void RpcService::GetMyFingerprint( StrBuf &value ) { endPoint->GetMyFingerprint( value ); } int RpcService::IsSingle() { return endPoint ? endPoint->IsSingle() : 0; } static void RpcCleanup( Rpc *r ) { r->FlushTransport(); } /* * Rpc */ Rpc::Rpc( RpcService *s ) { recvBuffering = 0; service = s; transport = 0; forward = 0; sendBuffer = new RpcSendBuffer; recvBuffer = new RpcRecvBuffer; protoDynamic = new StrBufDict; duplexFsend = 0; duplexFrecv = 0; duplexRsend = 0; duplexRrecv = 0; dispatchDepth = 0; endDispatch = 0; suspendDispatch = 0; priorityDispatch = 0; protocolSent = 0; protocolServer = 0; rpc_hi_mark_rev = rpc_hi_mark_fwd = p4tunable.Get( P4TUNE_RPC_HIMARK ); rpc_lo_mark = p4tunable.Get( P4TUNE_RPC_LOWMARK ); TrackStart(); timer = new Timer; keep = 0; } Rpc::~Rpc( void ) { signaler.DeleteOnIntr( this ); Disconnect(); delete sendBuffer; delete recvBuffer; delete protoDynamic; delete timer; } void Rpc::DoHandshake( Error *e ) { if( transport ) transport->DoHandshake(e); } void Rpc::ClientMismatch( Error *e ) { if( transport ) transport->ClientMismatch( e ); } void Rpc::SetProtocolDynamic( const char *var, const StrRef &value ) { protoDynamic->ReplaceVar( var, value.Text() ); } void Rpc::ClearProtocolDynamic( const char *var ) { protoDynamic->RemoveVar( var ); } void Rpc::Connect( Error *e ) { // Don't allow double connects. if( transport ) { e->Set( MsgRpc::Reconn ); return; } // If an operation fails, cruft can be left in the sendBuffer. // Since we may use this buffer over for the next connection, we // need to clear it now. sendBuffer->Clear(); duplexFsend = 0; duplexFrecv = 0; duplexRsend = 0; duplexRrecv = 0; dispatchDepth = 0; endDispatch = 0; protocolSent = 0; re.Clear(); se.Clear(); // Create the transport layer, // Cleanup code is in Disconnect(). NetTransport *t = 0; switch( service->openFlag ) { case RPC_LISTEN: t = service->endPoint->Accept( keep, e ); break; case RPC_CONNECT: t = service->endPoint->Connect( e ); break; default: e->Set( MsgRpc::Unconn ); } if( e->Test() ) { if( t ) delete t; // connect errors leave send & receive errors // so as to intercept Invoke() and Dispatch() re = *e; se = *e; return; } // Pass transport onto the buffering routines. transport = new RpcTransport( t ); if( keep ) transport->SetBreak( keep ); // If rpc.himark tuned beyond net.bufsize, must tell NetBuffer. transport->SetBufferSizes( rpc_hi_mark_fwd, rpc_hi_mark_rev ); if( service->openFlag == RPC_CONNECT ) signaler.OnIntr( (SignalFunc)RpcCleanup, this ); } void Rpc::GetEncryptionType( StrBuf &value ) { if( transport ) transport->GetEncryptionType( value ); } void Rpc::GetPeerFingerprint( StrBuf &value ) { if( transport ) transport->GetPeerFingerprint( value ); } void Rpc::GetExpiration(StrBuf &value) { if( service ) service->GetExpiration( value ); } /* * Rpc::SetHiMark() - move rpc_hi_mark beyond 2000 default * * If we get reliable send/receive TCP buffer sizes from the client * (they often differ from each other and from the server's), we * can feel confident about moving our himark off the default 2000. * * The thinking as of Feb 2009 is that one direction of a TCP * connection can have data outstanding that amounts to the receiver's * SO_RCVBUF. Many systems can take more -- we tried the sum of * the sender's SO_SNDBUF and the receiver's SO_RCVBUF -- but some * systems can't. Solaris, for example, starts to close its window * when it receives more than SO_RCVBUF, keeps accepting data until * the window is 0, and won't reopen it until the application reads * all outstanding data. Weird and unfortunate, because we generally * read in small chunks. * * Sometimes we're metering client->server data, because the * server->client side is clogged with file data (sync uses * InvokeDuplex()). Other times we're metering server->client data, * because the client->server side is clogged (resolve, submit use * InvokeDuplexRev()). Since the server and client may have different * SO_RCVBUF sizes, and we want to make the most of the connection, * we keep separate rpc_hi_mark_fwd and rpc_hi_mark_rev limits. We * miss out when we meter data but neither side is clogged (diff, * open, revert). Our high mark could be somewhere around the sum * of the SO_RCVBUFs for the client and server in this case, since * the metered data could wait in either buffer. But we're not * yet that sophisticated. * * Note that TCP limits its window size to the minimum of the * sender's SO_SNDBUF and receiver's SO_RCVBUF, and that limits the * amount of data travelling on the network in that direction. If * our himark were lower than the window size, we would artificially * limit that amount of data further. Thus we'd like the himark * high. Fortunately, we seem safe in making it SO_RCVBUF. * * A SO_SNDBUF larger than the SO_RCVBUF (and thus the TCP window) * should have no impact. A SO_RCVBUF larger than the SO_SNDBUF * (and thus a himark larger than the TCP window) is beneficial in * the case where we're metering data on one side but the other * side is not clogged. It allows more data to be in transit on * the other side of the connection. */ void Rpc::SetHiMark( int sndbuf, int rcvbuf ) { // For testing and emergencies setting rpc.himark overrides all. if( p4tunable.IsSet( P4TUNE_RPC_HIMARK ) ) return; const int minHiMark = p4tunable.Get( P4TUNE_RPC_HIMARK ); rpc_hi_mark_fwd = transport->GetRecvBuffering(); rpc_hi_mark_rev = rcvbuf; // Since Invoke() needs to Dispatch() _before_ we hit // the himark, we reduce the himark by lomark, and hope that // no single duplex message is larger than lomark. rpc_hi_mark_fwd -= rpc_lo_mark; rpc_hi_mark_rev -= rpc_lo_mark; if( rpc_hi_mark_fwd < minHiMark ) rpc_hi_mark_fwd = minHiMark; if( rpc_hi_mark_rev < minHiMark ) rpc_hi_mark_rev = minHiMark; // Put a stake through the heart of himark hanging: set our // NetBuffer buffer sizes to handling any outstanding data, // in case the lying cheating OS (linux, solaris) renegs // on its reported TCP send or receive space. transport->SetBufferSizes( rpc_hi_mark_fwd, rpc_hi_mark_rev ); RPC_DBG_PRINTF( DEBUG_CONNECT, "Rpc himark: snd+rcv server %d+%d client %d+%d = %d/%d", transport->GetSendBuffering(), transport->GetRecvBuffering(), sndbuf, rcvbuf, rpc_hi_mark_fwd, rpc_hi_mark_rev ); } void Rpc::Disconnect() { if( !transport ) return; transport->Flush( &se ); transport->Close(); delete transport; transport = 0; } StrPtr * Rpc::GetAddress( int raf_flags ) { return transport ? transport->GetAddress( raf_flags ) : 0; } bool Rpc::HasAddress() { return transport ? transport->HasAddress() : false; } StrPtr * Rpc::GetPeerAddress( int raf_flags ) { return transport ? transport->GetPeerAddress( raf_flags ) : 0; } int Rpc::GetPortNum() { return transport ? transport->GetPortNum() : -1; } bool Rpc::IsSockIPv6() { return transport ? transport->IsSockIPv6() : false; } StrBuf * Rpc::MakeVar( const char *var ) { return sendBuffer->MakeVar( StrRef( (char *)var ) ); } void Rpc::VSetVar( const StrPtr &var, const StrPtr &value ) { sendBuffer->SetVar( var, value ); } void Rpc::Release() { this->Invoke( P4Tag::p_release ); } void Rpc::ReleaseFinal() { this->Invoke( P4Tag::p_release2 ); } StrPtr * Rpc::VGetVar( const StrPtr &var ) { return recvBuffer->GetVar( var ); } int Rpc::VGetVarX( int x, StrRef &var, StrRef &val ) { return recvBuffer->GetVar( x, var, val ); } void Rpc::VRemoveVar( const StrPtr &var ) { recvBuffer->RemoveVar( var ); } void Rpc::VClear() { sendBuffer->Clear(); } StrPtr * Rpc::GetArgi( int i ) { return i < recvBuffer->GetArgc() ? &recvBuffer->GetArgv()[i] : 0; } StrPtr * Rpc::GetArgi( int i, Error *e ) { StrPtr *s = GetArgi( i ); if( !s ) e->Set( MsgRpc::NoPoss ); return s; } int Rpc::GetArgc() { return recvBuffer->GetArgc(); } StrPtr * Rpc::GetArgv() { return recvBuffer->GetArgv(); } void Rpc::CopyVars() { sendBuffer->CopyVars( recvBuffer ); } KeepAlive * Rpc::GetKeepAlive() { return transport; } void Rpc::SetBreak( KeepAlive *breakCallback ) { keep = breakCallback; if( transport ) transport->SetBreak( breakCallback ); } /* * Invoke() * InvokeDuplex() * InvokeDuplexRev() * FlushDuplex() * InvokeFlush() * InvokeOne() * GotFlushed() */ void Rpc::Invoke( const char *opName ) { // If there are any outstanding requests sent via InvokeDuplexRev, // we have to assume the return pipe is clogged, and thus meter all // data sent. if( duplexRrecv ) InvokeDuplex( opName ); else InvokeOne( opName ); } void Rpc::InvokeDuplex( const char *opName ) { // This data makes a loop: meter it, so we know to dispatch for it. int sz = InvokeOne( opName ); duplexFrecv += sz; duplexFsend += sz; Dispatch( DfDuplex, service->dispatcher ); } void Rpc::InvokeDuplexPlus( const char *opName, int extra ) { // This data makes a loop: meter it, so we know to dispatch for it. // We know the return will have some extra content, supply that // so we can add it in to better estimate the return int sz = InvokeOne( opName ) + extra; duplexFrecv += sz; duplexFsend += sz; Dispatch( DfDuplex, service->dispatcher ); } void Rpc::InvokeDuplexRev( const char *opName ) { // This data only goes forward, but causes the return pipe to fill. // Meter it, but also turn on duplexRrecv so that all Invokes are // metered until this message is flushed.. ++duplexRrecv; ++duplexRsend; InvokeDuplex( opName ); } void Rpc::FlushDuplex() { // Flush any outstanding duplex data in the pipe. // Do nothing if there is no duplex data expected back if( duplexFrecv > 0 ) { // Proxies and brokers expect a flush1 message // (with himark = 0) when flushing the channel, // so we have to tell Dispatch() to send one. // DfFlush moves the lomark to 0, and ensuring // duplexFsend > 0 forces a flush1 message. ++duplexFrecv; ++duplexFsend; Dispatch( DfFlush, service->dispatcher ); } } void Rpc::InvokeOver( const char *opName ) { // This data makes a loop, but we specifically don't flush yet: // we return so that the outer Dispatch() can take over. // Otherwise, we'd needlessly nest Dispatch() calls, and in // one case ('submit') we'd try to nest too deeply. int sz = InvokeOne( opName ); duplexFrecv += sz; duplexFsend += sz; Dispatch( DfOver, service->dispatcher ); } int Rpc::InvokeOne( const char *opName ) { // Don't pile errors if( se.Test() || re.Test() || !transport ) { sendBuffer->Clear(); return 0; } // Send off protocol if not yet sent. // If we're using RpcForward we want to allow it // to send its own protocol message. if( !protocolSent && strcmp( opName, P4Tag::p_protocol ) ) { RpcSendBuffer buf; int sz = transport->GetSendBuffering(); int rz = transport->GetRecvBuffering(); buf.CopyBuffer( service->protoSendBuffer ); int i = 0; StrRef var, val; while( protoDynamic->GetVar( i++, var, val ) ) buf.SetVar( var, val ); buf.SetVar( StrRef( P4Tag::v_sndbuf ), StrNum( sz ) ); buf.SetVar( StrRef( P4Tag::v_rcvbuf ), StrNum( rz ) ); if( p4tunable.Get( P4TUNE_NET_AUTOTUNE ) ) buf.SetVar( P4Tag::v_autoTune, StrRef( "1" ) ); buf.SetVar( StrRef( P4Tag::v_func ), StrRef( P4Tag::p_protocol ) ); RPC_DBG_PRINT( DEBUG_FUNCTION, "Rpc invoking protocol" ); timer->Start(); transport->Send( buf.GetBuffer(), &re, &se ); sendTime += timer->Time(); } protocolSent = 1; // Set func=opName variable. SetVar( P4Tag::v_func, opName ); // Tracking RPC_DBG_PRINTF( DEBUG_FUNCTION, "Rpc invoking %s", opName ); // Send the buffer to peer timer->Start(); transport->Send( sendBuffer->GetBuffer(), &re, &se ); if( se.Test() && se.CheckId( MsgRpc::TooBig ) ) { AssertLog.Report( &se ); sendBuffer->Clear(); StrBufDict errorDict; se.Marshall1( errorDict ); se.Clear(); int j = 0; StrRef var, val; while( errorDict.GetVar( j++, var, val ) ) sendBuffer->SetVar( var, val ); sendBuffer->SetVar( P4Tag::v_func, StrRef( "client-Message" ) ); transport->Send( sendBuffer->GetBuffer(), &re, &se ); } // time tracking sendTime += timer->Time(); if( se.Test() ) return 0; // Clear for next call. // Get size of buffer so we know how full the pipe is. // We must include RpcTransport's overhead. int sz = sendBuffer->GetBufferSize() + transport->SendOverhead(); sendBuffer->Clear(); // tracking sendCount++; sendBytes += sz; return sz; } void Rpc::GotFlushed() { StrPtr *fseq = GetVar( P4Tag::v_fseq ); StrPtr *rseq = GetVar( P4Tag::v_rseq ); if( fseq ) duplexFrecv -= fseq->Atoi(); if( rseq ) duplexRrecv -= rseq->Atoi(); } /* * Rpc::Dispatch() - dispatch incoming RPC's until 'release' received * Rpc::DispatchOne() - just dispatch from the current buffer */ void Rpc::Dispatch( DispatchFlag flag, RpcDispatcher *dispatcher ) { // Don't nest more than once: we only allow a // Dispatch()/InvokeDuplex()/FlushDuplex() combo. if( dispatchDepth > 1 ) return; // If we're containing this dispatch, don't increment the depth. // We only do this when we know that the original dispatcher is // effectivly paused, waiting for this to complete. if( flag != DfContain ) ++dispatchDepth; RPC_DBG_PRINTF( DEBUG_FLOW, ">>> Dispatch(%d%s) %d/%d %d/%d %d", dispatchDepth, flag == DfContain ? "+" : "", duplexFsend, duplexFrecv, duplexRsend, duplexRrecv, flag ); // Use server's recv buffer size as himark for InvokeDuplex() // Use client's recv buffer size as himark for InvokeDuplexRev() int loMark = rpc_lo_mark; int hiMark = duplexRrecv ? rpc_hi_mark_rev : rpc_hi_mark_fwd; // Flushing means eveything goes. // Complete: lo = N/A, hi = N/A // Duplex: lo = LO, hi = HI // Flush: lo = 0, hi = 0 // Over: lo = 0, hi = HI if( flag != DfDuplex ) loMark = 0; if( flag == DfFlush ) hiMark = 0; // Push the recvBuffer, in case we are nesting Dispatch(). RpcRecvBuffer *savRecvBuffer = recvBuffer; recvBuffer = 0; // Receive (dispatching) until told to stop. while( !endDispatch ) { if( re.Test() && ( !transport || !transport->RecvReady() ) ) break; // If more than loMark in pipe, send a marker. // We flush always if Dispatch() called from FlushDuplex() // or if Invoking from such a Dispatch(). if( duplexFsend > loMark && !se.Test() ) { // Send a flush1 through for metering flow control. // Note: hardcode the size of the flush1 message // As of 2008.1 its 46 bytes (round up to 60). const int flushMessageSize = 60; RPC_DBG_PRINTF( DEBUG_FLOW, "Rpc flush %d bytes", duplexFsend ); SetVar( P4Tag::v_himark, loMark ? hiMark : 0 ); duplexFrecv += flushMessageSize; duplexFsend += flushMessageSize; if( duplexFsend ) SetVar( P4Tag::v_fseq, duplexFsend ); if( duplexRsend ) SetVar( P4Tag::v_rseq, duplexRsend ); duplexFsend = 0; duplexRsend = 0; InvokeOne( P4Tag::p_flush1 ); } // If top level Dispatch(), go forever. // If 2nd level Dispatch(), get below hiMark. // If flushing, go until all flush2's received. // If error sending, go until receive error. else if( flag == DfComplete || flag == DfDuplex && DuplexDispatchReady( hiMark ) || flag == DfFlush && duplexFrecv || flag == DfContain && !le.Test() || se.Test() ) { if( !recvBuffer ) recvBuffer = new RpcRecvBuffer; DispatchOne( dispatcher, flag == DfContain ); } else break; } // Pop recvBuffer. delete recvBuffer; recvBuffer = savRecvBuffer; RPC_DBG_PRINTF( DEBUG_FLOW, "<<< Dispatch(%d%s) %d/%d %d/%d %d", dispatchDepth, flag == DfContain ? "+" : "", duplexFsend, duplexFrecv, duplexRsend, duplexRrecv, flag ); if( flag == DfContain || !--dispatchDepth ) endDispatch = 0; } void Rpc::DispatchOne( RpcDispatcher *dispatcher, bool passError ) { StrPtr *func; // Flush any data due the other end: we can't expect // anything if the other end is awaiting us! // Note that a broken send pipe doesn't stop dispatching: // we want what's in the receive pipe (they may be important // acks), so we read until the receive pipe is broken too. // Receive sender's buffer and then parse the variables out. timer->Start(); int sz = transport->Receive( recvBuffer->GetBuffer(), &re, &se ); // time tracking recvTime += timer->Time(); if( sz <= 0 ) { // EOF doesn't set re, so we will. if( !re.Test() ) re.Set( MsgRpc::Closed ); return; } // tracking recvCount++; recvBytes += recvBuffer->GetBufferSize(); Error e; recvBuffer->Parse( &e ); if( e.Test() ) { re = e; return; } // Find the function to dispatch. The protocol mandates that // this must be set: how else do we know what to do with the // buffer? func = GetVar( P4Tag::v_func, &e ); if( e.Test() ) { re = e; return; } RPC_DBG_PRINTF( DEBUG_FUNCTION, "Rpc dispatch %s", func->Text() ); // Find the registered function as given with 'func'. // If no such function is found, call 'funcHandler'. // If any error occurs, call 'errorHandler'. // If no 'errorHandler' just report the error and return. ue.Clear(); const RpcDispatch *disp = dispatcher->Find( func->Text() ); if( !disp && !( disp = dispatcher->Find( P4Tag::p_funcHandler ) ) ) { ue.Set( MsgRpc::UnReg ) << *func; goto error; } // Invoke requested function. (*disp->function)( this, &ue ); // Take a copy of the errors le = ue; // If an error occurred, we'll call the caller's errorHandle // function. If the error was fatal, we'll tack on some tracing // information. if( !ue.Test() ) return; if( ue.IsFatal() ) ue.Set( MsgRpc::Operat ) << disp->opName; error: // If a user error occurred, invoke errorHandler to deal with it. // In this case, the Error is acutally passed in to the dispatched // function, rather than being just an output parameter. // The exception is if we are running as a nested Dispatch that will // pass this error to its parent (then this will be done by the parent) if( passError ) return; if( disp = dispatcher->Find( P4Tag::p_errorHandler ) ) { (*disp->function)( this, &ue ); return; } AssertLog.Report( &ue ); } int Rpc::DuplexDispatchReady( int hiMark ) { return ( duplexFrecv > hiMark && ( priorityDispatch || transport->DuplexReady() ) ); } void Rpc::Loopback( Error *e ) { recvBuffer->CopyBuffer( sendBuffer->GetBuffer() ); recvBuffer->Parse( e ); sendBuffer->Clear(); } int Rpc::Active() { return !endDispatch && ( !ReadErrors() || ( transport && transport->RecvReady() ) ); } int Rpc::DispatchReady() { return !endDispatch && transport && transport->DuplexReady(); } int Rpc::SuspendDispatch( int v ) { int r = suspendDispatch; suspendDispatch = v; return r; } int Rpc::PriorityDispatch( int v ) { int r = priorityDispatch; priorityDispatch = v; return r; } /* * Rpc::StartCompression() -- initiate full link compression * Rpc::GotRecvCompress() -- turn on recv half compression * Rpc::GotSendCompress() -- turn on send half compression * * Notes on compression. * * Both ends must support the compression protocol. Use "client" >= 6 * or "server2" >= "6" ? protocol level to be sure. * * Compression can be turned on any time; in practice, it happens * after protocol exchange has happened (otherwise, how do you know?). * To orchestrate this, the follow sequence is obeyed: * * 1. StartCompression() send "compress1" to the other end, and then turns * on send compression. All data sent afterwards are compressed. * * 2. When the other end receives "compress1", it sends back "compress2" * and then turns on both send and receieve compression. * * 3. When this end receives "compress2", it turns on receive compression. */ void Rpc::StartCompression( Error *e ) { // send the "compress1" flag, then compress the send link. // When we get "compress2", we'll compress the recv link. Invoke( P4Tag::p_compress1 ); transport->SendCompression( e ); } void Rpc::GotSendCompressed( Error *e ) { transport->SendCompression( e ); } void Rpc::GotRecvCompressed( Error *e ) { transport->RecvCompression( e ); } void Rpc::FlushTransport() { if( transport ) transport->Flush( &se ); } int Rpc::GetRecvBuffering() { if( transport ) return transport->GetRecvBuffering(); return 0; } /* * Performance tracking * * Rpc::TrackStart() - reset tracking counters * Rpc::Trackable() - is any track data interesting? * Rpc::TrackReport() - report interesting track data */ void Rpc::TrackStart() { sendCount = 0; sendBytes = 0; recvCount = 0; recvBytes = 0; sendTime = 0; recvTime = 0; } int Rpc::Trackable( int level ) { Tracker t( level ); return t.Over( TT_RPC_ERRORS, se.Test() || re.Test() ) || t.Over( TT_RPC_MSGS, sendCount + recvCount ) || t.Over( TT_RPC_MBYTES, ( sendBytes + recvBytes ) / 1024 / 1024 ); } void Rpc::TrackReport( int level, StrBuf &out ) { if( !Trackable( level ) ) return; out << "--- rpc msgs/size in+out " << StrNum( recvCount ) << "+" << StrNum( sendCount ) << "/" << recvBytes / 1024 / 1024 << "mb+" << sendBytes / 1024 / 1024 << "mb " << "himarks " << rpc_hi_mark_fwd << "/" << rpc_hi_mark_rev << " snd/rcv " << StrMs( sendTime ) << "s/" << StrMs( recvTime ) << "s\n"; if( !se.Test() && !re.Test() ) return; out << "--- rpc "; if( se.Test() ) out << "send "; if( re.Test() ) out << "receive "; out << "errors, duplexing F/R " << duplexFrecv << "/" << duplexRrecv << "\n"; } void Rpc::GetTrack( int level, RpcTrack *track ) { track->trackable = Trackable( level ); if( !track->trackable ) return; ForceGetTrack( track ); } void Rpc::ForceGetTrack( RpcTrack *track ) { if( !track ) return; track->recvCount = recvCount; track->sendCount = sendCount; track->recvBytes = recvBytes; track->sendBytes = sendBytes; track->rpc_hi_mark_fwd = rpc_hi_mark_fwd; track->rpc_hi_mark_rev = rpc_hi_mark_rev; track->recvTime = recvTime; track->sendTime = sendTime; } void Rpc::CheckKnownHost( Error *e, const StrRef & trustfile ) { StrBuf pubkey; StrPtr *peer; GetPeerFingerprint( pubkey ); // if not ssl we are done if( !pubkey.Length() ) return; peer = GetPeerAddress( RAF_PORT ); RPC_DBG_PRINTF( DEBUG_CONNECT, "Checking host %s pubkey %s", peer->Text(), pubkey.Text() ); StrRef dummyuser( "**++**" ); StrRef altuser( "++++++" ); StrBuf trustkey; int doreplace = 0; char *keystr; { Ticket hostfile( &trustfile ); keystr = hostfile.GetTicket( *peer, dummyuser ); if( keystr ) { if( pubkey == keystr ) return; trustkey.Set( keystr ); } } { Ticket hostfile( &trustfile ); keystr = hostfile.GetTicket( *peer, altuser ); if( keystr && pubkey == keystr ) doreplace = 1; } if( doreplace ) { { Ticket hostfile( &trustfile ); hostfile.ReplaceTicket( *peer, dummyuser, pubkey, e ); } if( !e->Test() ) { Ticket hostfile( &trustfile ); hostfile.DeleteTicket( *peer, altuser, e ); } return; } e->Set( trustkey.Length() ? MsgRpc::HostKeyMismatch : MsgRpc::HostKeyUnknown ); *e << *peer; *e << pubkey; } int Rpc::GetInfo( StrBuf *b ) { return transport->GetInfo( b ); } void RpcUtility::Generate(RpcUtilityType type, Error *e ) { # ifdef USE_SSL NetSslCredentials credentials; switch( type ) { case Generate_Credentials: credentials.GenerateCredentials( e ); break; case Generate_Fingerprint: credentials.ReadCredentials( e ); if ( !e->Test() ) { const StrPtr *fingerprint = credentials.GetFingerprint(); if( fingerprint ) printf("Fingerprint: %s\n", fingerprint->Text()); } break; } return; # else e->Set( MsgRpc::SslNoSsl ); # endif //USE_SSL }