#include <stdio.h> #include <sys/stat.h> #include <time.h> #include <stdhdrs.h> #include <strbuf.h> #include <error.h> #include <errorlog.h> #include <options.h> #include "log.h" #include "usage.h" #include "fileio.h" #include "msgjrep.h" #include "position.h" #include "journal.h" #include "spawn.h" #include "pipe.h" #include "err.h" #define BUFSIZE 32768 bool ParseArgs( int *argc, char ***argv, Options *options, Error *e ) { (*argc)--; (*argv)++; options->Parse( *argc, *argv, "J:j:i:L:d:t:akh?V", OPT_ANY, MsgJrep::ShortUsage, e ); if( e->Test() ) return false; return true; } bool Init( int argc, char **argv, StrPtr *aval, StrPtr *Jval, StrPtr *ival, StrPtr *kval, StrPtr *dval, StrPtr *tval, Log *log, Journal *journal, Position *position, Pipe *pipe, Spawn *spawn, bool *keep, timespec *delay, timespec *atomicity, Error *e ) { int milliseconds; // atomicity delay is specified in milliseconds if( !journal->Init( Jval, e ) ) return false; if( !position->Init( journal->GetName(), ival, e ) ) return false; if( !pipe->Init( "p4jrep", e ) ) return false; if( !spawn->Init( argc, argv, !aval, log, e ) ) return false; if( !journal->Open( e ) ) return false; if( !position->Open( e ) ) return false; if( !journal->ReadSequence( e ) ) return false; if( !position->Read( e ) ) return false; if( *keep = kval ) { /* * Keep the pipe open. * * Open the pipe and spawn the command. */ if( !pipe->Open( e ) ) return false; if( !spawn->Exec( pipe->GetIfd(), e ) ) return false; } if( dval ) { /* * Delay between checks of the live journal specified. */ delay->tv_sec = dval->Atoi(); } else { /* * Default delay between checks of the live journal * is one second. */ delay->tv_sec = 1; } delay->tv_nsec = 0; // zero nanoseconds if( tval ) { /* * Delay for attempting to ensure atomicity specified. */ milliseconds = tval->Atoi(); } else { /* * Default delay for attempting to ensure atomicity * is ten milliseconds. 
*/ milliseconds = 10; } atomicity->tv_sec = milliseconds / 1000; atomicity->tv_nsec = milliseconds % 1000 * 1000000; return true; } bool RotateCheck( Journal *journal, Position *position, bool *isrotated, Error *e ) { /* * Check for a rotated journal by comparing the current size of the * journal with the position last read from the journal. If the * current size is less than the position last read, then the * journal has been rotated. * * This check should be sufficient for most production environments. * There are edge cases that this check will not catch. For example, * if there is nothing written to the journal until just prior to * rotation, the file size of the rotated journal will be the same * before and after the journal rotation. * * It is certainly possible to do a more elaborate check, but at * the expense of performance. A more elaborate check might include * a check of the sequence number at the head of the journal, but this * will incur an additional I/O. */ /* * A journal might be rotated by 2007.3 and later servers by renaming * it rather than copying and truncating it. Detect the rename by * comparing the device and inode returned from statting by the * journal's file name and statting by the journal's file descriptor. */ dev_t dev; // device returned from statting by file name ino_t ino; // inode returned from statting by file name if( !journal->StatWait( e ) ) // wait for the journal to be created return false; dev = journal->GetDev(); ino = journal->GetIno(); if( !journal->Stat( e ) ) return false; if( *isrotated = dev != journal->GetDev() || ino != journal->GetIno() ) { /* * The journal has been rotated by renaming it. * * Close the old journal and open the new journal. */ if( !journal->Close( e ) ) return false; if( !journal->Open( e ) ) return false; } else { /* * Determine if the journal was rotated by * copying and truncating it. 
*/ *isrotated = journal->GetSize() < position->GetOffset(); } return true; } bool StartReplicating( bool keep, Position *position, Pipe *pipe, Spawn *spawn, bool *replicating, char *buf, ssize_t n, Error *e ) { if( !*replicating ) { if( !keep ) { /* * Pipe is only open when journal entries are being replicated. * * Open the pipe and spawn the command. */ if( !pipe->Open( e ) ) return false; if( !spawn->Exec( pipe->GetIfd(), e ) ) return false; } *replicating = true; } if( !pipe->Write( buf, n, e ) ) return false; if( !position->Increment( n, e ) ) return false; return true; } bool FinishReplicating( bool keep, Position *position, Pipe *pipe, Spawn *spawn, bool *replicating, Error *e ) { if( !keep ) { /* * Pipe is only open when journal entries are being replicated. * * Close the pipe and wait for the spawned command to terminate. */ if( !pipe->Close( e ) ) return false; if( !spawn->Wait( e ) ) return false; } if( !position->Write( e ) ) return false; *replicating = false; return true; } bool RotateInit( StrPtr *jval, Journal *journal, Journal *rotated, Error *e ) { StrBuf name; if( jval ) { name.Set( jval ); name.Append( ".jnl" ); } else name.Set( journal->GetName() ); name << "." << journal->GetSequence(); if( !rotated->Init( &name, e ) ) return false; if( !rotated->Open( e ) ) return false; return true; } bool RotateRun( bool keep, Journal *rotated, Position *position, Pipe *pipe, Spawn *spawn, bool *replicating, Error *e ) { char buf[ BUFSIZE ]; ssize_t n; if( !rotated->Seek( position->GetOffset(), e ) ) return false; do { if( !rotated->Read( buf, BUFSIZE, &n, e ) ) return false; if( !n ) { /* * No more journal records in the rotated journal. */ if( *replicating ) { /* * Finish replicating the journal records that we have * already started replicating (which may have been * started in Run() prior to rotating the journal). 
*/ if( !FinishReplicating( keep, position, pipe, spawn, replicating, e ) ) return false; } /* * Finished replicating the journal records * in the rotated journal. */ break; } if( !StartReplicating( keep, position, pipe, spawn, replicating, buf, n, e ) ) return false; } while( 1 ); return true; } bool RotateFinal( Journal *journal, Journal *rotated, Position *position, Error *e ) { if( !journal->ReadSequence( e ) ) return false; if( !rotated->Close( e ) ) return false; /* * Truncate the position file since the offsets that are about to be * written will likely have a fewer number of digits than the * offsets that were written prior to journal rotation. */ if( !position->Truncate( e ) ) return false; position->SetOffset( 0 ); return true; } bool Run( StrPtr *jval, bool keep, timespec delay, timespec atomicity, Journal *journal, Position *position, Pipe *pipe, Spawn *spawn, Error *e ) { char buf[ BUFSIZE ]; ssize_t n; bool full = false; // true iff read of journal filled buffer bool isrotated; Journal rotated; bool replicating = false; do { if( !journal->Seek( position->GetOffset(), e ) ) return false; do { /* * Read the journal without locking it. This assumes that the * (cached) inode update reflects the end of journal records as * they are written by the server. By not locking the journal, * a concurrency problem is not introduced. */ if( !journal->Read( buf, BUFSIZE, &n, e ) ) return false; if( full && !n ) { /* * Previous read of the journal filled the buffer, so * we immediately looped to read more of the outstanding * journal records. But nothing was read in the current * read of the journal, so we need to delay in an attempt * to ensure atomicity and then attempt another read. */ if( nanosleep ( &atomicity, NULL ) == -1 ) return syserr( "nanosleep", "empty atomicity", e ); if( !journal->Read( buf, BUFSIZE, &n, e ) ) return false; } full = n == BUFSIZE; // current read filled buffer? if( !n ) { /* * Nothing read from the journal. 
This could happen if * nothing has been written to the journal, or the journal * was just rotated and the read was attempted beyond the * end of the truncated journal. */ if( !RotateCheck( journal, position, &isrotated, e ) ) return false; if( isrotated ) break; if( replicating ) { /* * Since there are no more outstanding journal records, * finish replicating the journal records that we have * already started replicating. */ if( !FinishReplicating( keep, position, pipe, spawn, &replicating, e ) ) return false; } /* * Allow the target to do something while we * let more records collect in the journal. */ if( nanosleep ( &delay, NULL ) == -1 ) return syserr( "nanosleep", "delay", e ); /* * Loop to read perhaps a new set of * outstanding journal records. */ continue; } if( !StartReplicating( keep, position, pipe, spawn, &replicating, buf, n, e ) ) return false; if( full ) { /* * Current read of the journal filled the buffer, * so there are likely more outstanding journal * records to be read. Loop to read more of * the outstanding journal records. */ continue; } /* * The replication of all outstanding journal records has * been started. Since there is no way for us to know when * a transaction has been completely written to the journal * (the server locks the journal once for each journal entry * written, not once per transaction (and we don't want to * lock the journal, anyway)), we'll briefly delay here * in an attempt to ensure atomicity. If after this delay * there are no additional outstanding journal records, * then we'll finish the replication of what should be * an atomic transaction. 
*/ if( nanosleep ( &atomicity, NULL ) == -1 ) return syserr( "nanosleep", "atomicity", e ); } while( 1 ); if( !RotateInit( jval, journal, &rotated, e ) ) return false; if( !RotateRun( keep, &rotated, position, pipe, spawn, &replicating, e ) ) return false; if( !RotateFinal( journal, &rotated, position, e ) ) return false; } while( 1 ); return true; } int Final( Journal *journal, Position *position, Error *e ) { if( !journal->Close( e ) ) return false; if( !position->Close( e ) ) return false; return true; } int main( int argc, char **argv ); int main( int argc, char **argv ) { Options options; StrPtr *Lval; // value of -L argument Log log; // log file into which messages are posted bool keep; // keep the pipe open timespec delay; // delay between checks of the live journal timespec atomicity; // delay for attempting to ensure atomicity Journal journal; Position position; Pipe pipe; Spawn spawn; Error e; if( !ParseArgs( &argc, &argv, &options, &e ) ) return printerr( &e ); if( options[ 'h' ] || options[ '?' ] ) { LongUsage(); return 0; } if( options[ 'V' ] ) { printf( "p4jrep Version 0.92 (beta)\n" ); return 0; } /* * Initialize log file separately so it can be used for * subsequent initializations. */ Lval = options[ 'L' ]; if( !log.Init( "p4jrep", Lval ? Lval->Text() : 0, &e ) ) return printerr( &e ); // report error and exit if( !Init( argc, argv, options[ 'a' ], options[ 'J' ], options[ 'i' ], options[ 'k' ], options[ 'd' ], options[ 't' ], &log, &journal, &position, &pipe, &spawn, &keep, &delay, &atomicity, &e ) ) log.Abort( &e ); // report errors and exit if( !Run( options[ 'j' ], keep, delay, atomicity, &journal, &position, &pipe, &spawn, &e ) ) log.Abort( &e ); // report errors and exit if( !Final( &journal, &position, &e ) ) log.Abort( &e ); // report errors and exit return 0; }
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#7 | 7886 | Michael Shields |
Correctly handle 2010.2 journals, which have an @nx@ record as the first record in the journal after the journal has been rotated. |
||
#6 | 6439 | Michael Shields |
Updating for the 2008.1 p4d. The format of the @vv@ record changed in the 2008.1 p4d. The @vv@ record is the one and only journal record the format of which is important to p4jrep. This p4jrep can also be used with prior releases of p4d. Version string: p4jrep Version 0.91 (beta) |
||
#5 | 6155 | Michael Shields |
Updated for the 2007.3 release while maintaining compatibility with prior releases. 2007.3 and later servers might rotate the journal by renaming it rather than copying and truncating it. A renamed journal is now detected by comparing the device and inode returned from statting by the journal's file name and statting by the journal's file descriptor. This algorithm (suggested by J.T. Goldstone; thanks J.T.!) is faster than reopening the journal and seeking if the journal was not rotated (~2.0 seconds vs. ~2.7 seconds for 1,000,000 iterations on my laptop). |
||
#4 | 5379 | Michael Shields |
Fix regression introduced in version 0.86. The regression was a side-effect of functionality added to keep the pipe open to the command executed by p4jrep. The regression caused the journal position to be erroneously updated when the forked process exited with a non-zero exit code. This could result in journal entries missed when restarting replication following a failure of the command executed by p4jrep. Credit (and my thanks) goes to Brian Moyers for finding and diagnosing the regression, and coding and testing the fix. |
||
#3 | 5119 | Michael Shields |
Added -t delay in an attempt to ensure transactional atomicity. If no additional journal entries are written during this delay, the transaction is assumed complete, which closes the pipe, which terminates the command (which, if it includes a p4d -jr, releases the locks in the target server, allowing commands access to the replicated atomic transaction). By default, this delay is ten milliseconds. Locking the journal does not ensure transactional atomicity since the server locks the journal once for each journal entry written, not once per transaction. And we would like to avoid locking the journal since that would introduce a potential concurrency problem. Not all operations in the server are atomic transactions and therefore cannot be replicated atomically. For example, updating a client's have list as files are being synced is not an atomic transaction. But committing a submit is an atomic transaction, and this change (with perhaps some site-specific tuning of the -t delay) attempts to replicate the commit atomically. |
||
#2 | 5012 | Michael Shields |
Added [-d <delay>] option to delay (in seconds) between checks for new entries in the live journal. By default, the delay between checks is one second. |
||
#1 | 4839 | Michael Shields | Pushing p4jrep source into the public depot. |