/* * Copyright 1995, 2009 Perforce Software. All rights reserved. * * This file is part of Perforce - the FAST SCM System. */ /* * This is a client program to tail a server journal using * the export command. Those journal records can go to a file * and to a child process or stdout. Position state is maintained * in a file. Some care is taken to identify when the journal * is complete in the sense of no outstanding transactions. */ # define NEED_SLEEP # define NEED_FILE # include <clientapi.h> # include <debug.h> # include <error.h> # include <errorlog.h> # include <options.h> # include <runcmd.h> # include <i18napi.h> # include <enviro.h> # include <signaler.h> static ErrorId intervalerror = { ErrorOf( 0, 0, E_FATAL, 0, 0 ), "interval must be zero or positive" }; static ErrorId intervalneeded = { ErrorOf( 0, 0, E_FATAL, 0, 0 ), "Restart on error requires a positive interval" }; class ClientUserJournal : public ClientUser { public: ClientUserJournal() : outfd(-1), journalno(0), sequence(0), rc(NULL), errorseen(0), dataseen(0), rotateseen(0), restartOnError(0), savedjournal(NULL) {} ~ClientUserJournal() { delete rc; delete savedjournal; } void OutputStat( StrDict * ); void OutputError( const char * ); void SetStateFile( const StrPtr *, Error * ); void UpdateStateFile( Error *, const StrPtr * = NULL ); void SetOutputCommand( int ac, char **av ); void Output( const StrPtr &, Error *e ); void EndCommand(); int outfd; int journalno; offL_t sequence; int errorseen; int dataseen; int rotateseen; int restartOnError; StrBuf statefile; RunArgs rargs; RunCommand *rc; FileSys *savedjournal; }; void ClientUserJournal::Output( const StrPtr &dat, Error *e ) { if( savedjournal ) savedjournal->Write( dat.Text(), dat.Length(), e ); if( rc ) { if( outfd == -1 ) { int fds[2]; rc->RunChild( rargs, RCO_AS_SHELL|RCO_USE_STDOUT, fds, e ); if( e->Test() ) { errorseen = 1; return; } outfd = fds[1]; } int w = write( outfd, dat.Text(), dat.Length() ); if( w < 0 ) e->Sys("write", "pipe"); } else printf( "%.*s", static_cast<int>(dat.Length()), dat.Text() ); dataseen = 1; } void ClientUserJournal::EndCommand() { if( outfd >= 0 ) { close( outfd ); outfd = -1; if( rc->WaitChild() ) errorseen = 4; } } void ClientUserJournal::OutputStat( StrDict *vars ) { Error e; StrPtr *dat; if( errorseen ) return; StrPtr *pos = vars->GetVar( "pos" ); if( pos ) { // journal no should not change, but we'll // take it anyways journalno = pos->Atoi(); sequence = 0; const char *cp = pos->Contains( StrRef( "/" ) ); if( cp ) sequence = StrPtr::Atoi64( ++cp ); } dat = vars->GetVar( P4Tag::v_data ); if( dat && dat->Length() > 0 ) { StrPtr *complete = vars->GetVar( "complete" ); if( complete ) { int s = complete->Atoi(); Output( StrRef( dat->Text(), s ), &e ); if( pos ) UpdateStateFile( &e, pos ); if( s < dat->Length() ) Output( StrRef( dat->Text() + s, dat->Length() - s ), &e ); } else Output( *dat, &e ); if( e.Test() ) { AssertLog.Report( &e ); errorseen = 2; return; } } dat = vars->GetVar( P4Tag::v_counter ); if( dat ) { // skip to next journal journalno = dat->Atoi(); sequence = 0; // I believe that we must be at a complete and consistent point // for a journal rotation to occur so this should be safe... UpdateStateFile( &e ); rotateseen = 1; } } void ClientUserJournal::OutputError( const char *msg ) { ClientUser::OutputError( msg ); errorseen = 1; } void ClientUserJournal::SetStateFile( const StrPtr *fname, Error *e ) { statefile.Set( fname ); FileSys *f = FileSys::Create( FST_TEXT ); f->Set( statefile ); f->Open( FOM_READ, e ); if( !e->Test() ) { StrBuf buf; f->ReadWhole( &buf, e ); f->Close( e ); journalno = buf.Atoi(); const char *cp; if( cp = buf.Contains( StrRef( "/" ) ) ) sequence = StrPtr::Atoi64( ++cp ); } e->Clear(); delete f; } void ClientUserJournal::UpdateStateFile( Error *e, const StrPtr *pos ) { if( errorseen || !dataseen || !statefile.Length() ) return; FileSys *f = FileSys::Create( FST_TEXT ); f->Set( statefile ); FileSys *t = FileSys::Create( FST_TEXT ); t->MakeLocalTemp( f->Name() ); t->SetDeleteOnClose(); t->Open( FOM_WRITE, e ); if( e->Test() ) errorseen = 3; else { StrBuf buf; if( !pos ) { buf << journalno << "/" << StrNum( sequence ) << "\n"; pos = &buf; } t->Write( pos->Text(), pos->Length(), e ); t->Perms( FPM_RW ); t->Close( e ); if( !e->Test() ) t->Rename( f, e ); } delete t; delete f; } void ClientUserJournal::SetOutputCommand( int ac, char **av ) { // setup command invocations rargs.SetArgs( ac, av ); rc = new RunCommand; } void cntlc( void *v ) { ClientUserJournal *ui = (ClientUserJournal *)v; ui->errorseen = 1; ui->restartOnError = 0; } int jtail( int ac, char **av, Options &preopts, Options &opts, const char *progname ) { Error e; StrPtr *s; int ret; int interval = 2; int keepopen = 0; int exitOnRotate = 0; // Debugging for( int i = 0; s = preopts.GetValue( 'v', i ); i++ ) p4debug.SetLevel( s->Text() ); ClientUserJournal ui; if( opts[ 'k' ] ) keepopen = 1; if( opts[ 'x' ] ) exitOnRotate = 1; if( opts[ 'R' ] ) ui.restartOnError = 1; if( opts[ 'o' ] ) { ui.savedjournal = FileSys::Create( FST_ATEXT ); ui.savedjournal->Set( *opts[ 'o' ] ); ui.savedjournal->Open( FOM_WRITE, &e ); if( e.Test() ) { delete ui.savedjournal; ui.savedjournal = NULL; AssertLog.Report( &e ); return 1; } ui.savedjournal->Perms( FPM_RW ); } const StrPtr *statefile = opts[ 's' ]; if( statefile ) ui.SetStateFile( statefile, &e ); const StrPtr *jop = opts[ 'j' ]; if( jop ) { ui.journalno = jop->Atoi(); const char *cp; if( cp = jop->Contains( StrRef( "/" ) ) ) ui.sequence = StrPtr::Atoi64( ++cp ); if( ui.journalno < 0 ) ui.journalno = 0; if( ui.sequence < 0 ) ui.sequence = 0; } if( opts[ 'i' ] ) interval = opts[ 'i' ]->Atoi(); if( interval < 0 ) { e.Set( intervalerror ); AssertLog.Report( &e ); return 1; } if( ui.restartOnError && interval <= 0 ) { e.Set( intervalneeded ); AssertLog.Report( &e ); return 1; } if( ac > 0 ) ui.SetOutputCommand( ac, av ); const StrPtr *tableBlock = opts[ 'T' ]; const StrPtr *prefixop = opts[ 'J' ]; Signaler sig; sig.OnIntr( cntlc, (void *)&ui ); do { ClientApi client; Enviro enviro; const char *lc; enviro.Config( client.GetCwd() ); if( s = preopts[ 'C' ] ) lc = s->Text(); else lc = enviro.Get( "P4CHARSET" ); CharSetApi::CharSet cs = CharSetApi::NOCONV; if( lc ) { cs = CharSetApi::Lookup( lc ); if( (int)cs == -1 ) { // bad charset specification printf( "Character set must be one of:\n" "none, auto, utf8, utf8-bom, iso8859-1, shiftjis, eucjp, iso8859-15,\n" "iso8859-5, macosroman, winansi, koi8-r, cp949, cp1251,\n" "utf16, utf16-nobom, utf16le, utf16le-bom, utf16be,\n" "utf16be-bom, utf32, utf32-nobom, utf32le, utf32le-bom, utf32be,\n" "utf32be-bom, utf8unchecked, utf8unchecked-bom,\n" "cp936, cp950, cp1253, or iso8859-7.\n" "Check P4CHARSET and your '-C' option.\n" ); return 1; } else { CharSetApi::CharSet ws = cs; if( ( s = preopts[ 'Q' ] ) || ( lc = enviro.Get( "P4COMMANDCHARSET" ) ) ) { if( s ) lc = s->Text(); cs = CharSetApi::Lookup( lc ); if( (int)cs == -1 ) { printf( "P4COMMANDCHARSET unknown\n" ); return 1; } } if( CharSetApi::Granularity( cs ) != 1 ) { printf( "p4 can not support a wide charset unless\n" "P4COMMANDCHARSET is set to another charset.\n" ); return 1; } client.SetTrans( cs, ws, cs, cs ); } } if( s = preopts[ 'u' ] ) client.SetUser( s ); if( s = preopts[ 'p' ] ) client.SetPort( s ); if( s = preopts[ 'P' ] ) client.SetPassword( s ); e.Clear(); ui.errorseen = 0; client.Init( &e ); if( e.Test() ) { AssertLog.Report( &e ); if( ui.restartOnError ) { e.Clear(); fflush( stdout ); sleep( interval ); delete ui.savedjournal; ui.savedjournal = NULL; continue; } return 1; } client.SetProg( progname ); // if in unicode mode, remove translations to get unchanged // journal records if( cs != CharSetApi::NOCONV ) client.SetTrans( CharSetApi::UTF_8_UNCHECKED ); for( ;; ) { StrBuf buf; buf << ui.journalno << "/" << StrNum( ui.sequence ); client.SetVar( StrRef::Null(), StrRef( "-r" ) ); client.SetVar( StrRef::Null(), StrRef( "-j" ) ); client.SetVar( StrRef::Null(), buf ); if( prefixop ) { client.SetVar( StrRef::Null(), StrRef( "-J" ) ); client.SetVar( StrRef::Null(), *prefixop ); } if( tableBlock ) { client.SetVar( StrRef::Null(), StrRef( "-T" ) ); client.SetVar( StrRef::Null(), *tableBlock ); } client.Run( "export", &ui ); if( ui.errorseen || !keepopen || interval == 0 || ( exitOnRotate && ui.rotateseen ) ) ui.EndCommand(); if( ui.errorseen || e.Test() || interval == 0 || ( exitOnRotate && ui.rotateseen ) ) { if( ui.restartOnError && interval > 0 ) sleep( interval ); break; } // We should only update the state file on complete transactions // This used to happen here, but now only happens in OutputStat fflush( stdout ); sleep( interval ); } client.Final( &e ); if( e.Test() ) { ui.EndCommand(); AssertLog.Report( &e ); } ret = ui.errorseen; } while( ui.restartOnError && ! ( exitOnRotate && ui.rotateseen ) ); delete ui.savedjournal; ui.savedjournal = 0; return ret; } #ifdef BUILD_JTAIL static ErrorId usage = { ErrorOf( 0, 0, 0, 0, 0), "p4jtail [ -p port ][ -u user ][ -P password ][ -C charset ]\n" " [ -j token ][ -s statefile ][ -i interval ][ -k -x -R ]\n" " [ -J prefix ][ -T tables excluded ][ -o output ][ command ]\n" }; int main( int ac, char **av ) { AssertLog.SetTag( "p4jtail" ); Error e; Options opts; opts.Parse( --ac, ++av, "xkj:i:J:s:u:p:P:v:o:C:Q:T:R", OPT_ANY, usage, &e ); if( e.Test() ) { AssertLog.Report( &e ); return 1; } return jtail( ac, av, opts, opts, "p4jtail" ); } #endif