#!/usr/local/bin/perl
#
# This script replicates a local (secondary) server from a source
# (like a primary server).
# It is used in the Perforce replicate command to replay the source journal
# on the local server.
# The Perforce replicate command should be spelled like this:
#   p4 -p MASTER:PORT replicate
#      -s STATE          # local file to track the most recent journal position
#      -J PREFIX                 # journal prefix used on the master
#      -i 0                      # optionally to disable polling
#      PATH/p4admin_replicate    # this script
#      -v                        # verbose mode
#      -port NNNN                # port of the local replica
#      -srchost MASTER           # master host name where ,v files live
#      -srctop DIR               # directory where ,v files live on the master
#      -log                      # log everything (that's the default anyway)
#
# Example:
#   p4 -p perforce-00-blr:1666 replicate
#      -s /export/journal/perforce/1666/replica.state
#      -J /export/journal/perforce/1666/journal
#      -i 0
#      /export/db/perforce/1666/admin.support/p4admin_replicate
#      -v
#      -port 1666
#      -srchost perforce-00-blr
#      -srctop /export/data/perforce/1666
#      -log
#
# Note: the current code expects the pipe from "replicate" to be closed
# eventually. Only then rsync will copy archive files.  This way, it is
# easier to synchronize (and possibly minimize) rsync activity.
# This means that the -k replicate option is not used.
#
# Michael Mirman
# MathWorks, Inc. 2010-2011

use strict;
use warnings;

use Cwd            qw(abs_path);
use File::Basename qw(basename dirname fileparse);
use Getopt::Long   qw(GetOptions);
use Parallel::ForkManager;
use Pod::Usage     qw(pod2usage);

# Work out where this script lives.  $Myname is the file name of the script
# and $Mydir the absolute path of its directory.  This has to happen at
# compile time so that the project modules loaded further down are found
# either next to the script or in the sibling "lib" directory.
our ($Mydir, $Myname);
BEGIN {
    my ($script_name, $script_dir) = fileparse($0);
    $Myname = $script_name;
    $Mydir  = abs_path($script_dir);

    my $sibling_lib = dirname($Mydir) . '/lib';
    unshift @INC, $Mydir, $sibling_lib;
}

use MW::Util::Mail qw(sendmsg);
use p4admin_backup; # settings, P4LOGDIR, msg

# Package globals that p4admin_backup.pm either shares with us or defines.
our $HOST = hostname;
our ($logfile, $preview_only);

# Default recipient of notification email, picked by the host name prefix.
# Hosts matching neither prefix get no default recipient.
our $notify = '';
if ( $HOST =~ /^ perforce /x ) {
    $notify = 'p4-help@mathworks.com';
}
elsif ( $HOST =~ /^ scmtest /x ) {
    $notify = 'perforce-admin@mathworks.com';
}

our $verbose = 0; # this is the same $verbose as in p4admin_backup.pm

my @orig_args = @ARGV; # the command line as given, before option parsing

my $port;            # host:port specifies the target Perforce server
my $repeat_interval; # seconds; set from the -repeat option (given in minutes)
my $srchost;         # host to mirror archive files from
my $srctop;          # directory on $srchost that is mirrored
my @target_dirs;     # srchost:srctop will be mirrored to these directories
my $nproc = 10;      # default max number of parallel rsync processes
$logfile = 1;        # default: create log file

#
# Parse arguments
#
GetOptions(
    'data2=s'   => \@target_dirs,
    'help'      => sub { pod2usage( -verbose => 2, -exit => 0 ) },
    'log!'      => \$logfile,        # log file to redirect the output
    'mail=s'    => \$notify,         # email result there
    'n'         => \$preview_only,
    'nproc=i'   => \$nproc,
    'port=s'    => \$port,           # port of the local replica
    'repeat=i'  => sub {
        # run continuously, repeating itself no more frequently than
        # this number of minutes (the interval is kept in seconds)
        my ($option, $minutes) = @_;
        $repeat_interval = $minutes * 60;
    },
    'srchost=s' => \$srchost,
    'srctop=s'  => \$srctop,
    'v+'        => \$verbose,
) or die "$Myname: Error parsing arguments\n";

# The local port and both parts of the rsync source are mandatory.
# Complain about the first missing one; pod2usage then prints the manual
# and exits with status 2.
my @mandatory = (
    [ $port,    "No local port specified.\n" ],
    [ $srchost, "No source host specified.\n" ],
    [ $srctop,  "No source top directory specified.\n" ],
);
for my $check ( @mandatory ) {
    my ($value, $complaint) = @{ $check };
    next if $value;
    print($complaint);
    pod2usage( -verbose => 2, -exitval => 2 );
}

# The log file is named after this script with the "p4admin_" part removed.
# p4admin_begin is not defined in this file (presumably it comes from
# p4admin_backup and sets up logging - confirm there).
my $prefix = $Myname;
$prefix =~ s/p4admin_//;
p4admin_begin($port, "$prefix.log");

# One handler serves INT, QUIT, HUP and TERM.
# Every signal is logged together with a stack trace.  Only INT ends the
# run (exit status 9, after notifying $notify if it is set); after any other
# signal the handler returns and the script carries on.
my $on_signal = sub {
    my ($sig) = @_;
    msg(0, "Signal $sig received.\n", Carp::longmess(), "\n");

    if ( $sig ne 'INT' ) {
        msg(0, "We need to finish this batch of changes\n");
        return;
    }

    if ( $notify ) {
        my $msg = "Replication was interrupted by a $sig signal.\n"
            . "You must verify the integrity of the archive\n"
            . "\n--$Myname on behalf of the Perforce Administrator\n";
        sendmsg({Subject => "Replication interrupted on $HOST",
                 Message => $msg,
                 To      => $notify,
                 verbose => $verbose,
                });
    }
    msg(0, "You must verify the integrity of the archive\n");
    exit 9;
};
$SIG{$_} = $on_signal for qw(INT QUIT HUP TERM);

#
# Replicate
# p4 replicate [ -j token ][ -s statefile ][ -i interval ][ -k -x ]
#    [ -J prefix ][ -o output ][ command ]
#
# -j token
#	Specify a journal number or position token of the form journalnum/byteoffset from which to start replicating metadata. If this flag is specified, it overrides any state file specification.
# -s statefile
#	Specify a state file which tracks the most recent journal position.
# -i interval
#	Specify a polling interval, in seconds. The default is two seconds. To disable polling (that is, to check once for updated journal entries and then exit), specify an interval of 0.
# -J prefix
#	Specifies a filename prefix for the journal, such as that used with p4d -jc prefix
# -k
#	Keep the pipe to the command subprocess open between polling intervals.
# -x
#	Exit the p4 replicate command when journal rotation is detected.
# -o savefile
#	Specify a file for output. If a command subprocess is specified, both the subprocess and the specified savefile are provided with the output.

# "command" in p4 replicate above is *this* script, so we are reading
# journal records from STDIN and should pass them to p4d -r ROOT -b 1 -jrc -

# Programs and directories of the local replica.  P4, P4D, P4LOGDIR, P4ROOT
# and P4DATA are not defined in this file (presumably they are provided by
# p4admin_backup).
my $p4        = P4($port);
my $p4d       = P4D($port);
my $logdir    = P4LOGDIR($port);
my $statefile = "$logdir/replica.state";
my $RSYNC     = '/usr/bin/rsync';

# From the p4d reference at
# http://www.perforce.com/perforce/doc.092/manuals/p4sag/aa_p4d.html#1043673:
#
# -b bunch -jr file
#   Read bunch lines of journal records, sorting and removing duplicates
#   before updating the database. The default is 5000, but can be set to 1
#   to force serial processing. This combination of flags is intended for
#   use by replica servers started with the p4 replicate command.
#
# -jrc file
#   Journal-restore with integrity-checking. Because this option locks the
#   database, this option is intended only for use by replica servers
#   started with the p4 replicate command.

#
# The command is explained in detail in
# http://www.perforce.com/perforce/doc.current/manuals/p4sag/10_replication.html
# All journal records are piped into it, after which p4d terminates.
my $p4root    = P4ROOT($port);
my $replicate = "$p4d -r $p4root -f -jrc -";

# The data directory of the local replica is always the first mirror target.
unshift @target_dirs, P4DATA($port);

# Every journal record read from STDIN is passed on to p4d through this pipe.
# The three-argument form keeps the open mode separate from the command text.
open my $OUT, '|-', $replicate
    or die "Cannot open pipe to $replicate: $!\n";

# Depot directories whose archive files have to be mirrored, mapped to the
# type of the first journal record that asked for them.
my %need2copy;

JOURNAL_RECORD:
while ( my $record = <STDIN> ) {
    msg(2, "READ: $record");

    # A record that cannot be handed to p4d must not go unnoticed:
    # the replica database would silently fall behind the archive.
    print {$OUT} $record
        or die "Cannot write to $replicate: $!\n";

#
# Ref: http://www.perforce.com/perforce/doc.current/schema/index.html
# We are interested in db.rev records because they determine
# what archive files we need to copy
#
# Example:
# @pv@ 8 @db.rev@ @//sandbox/mmirman/doc/new/foo.2@ 1 0 3 3 1277910491 1277910483 F78464B431A46634558D8B20219B729A 17 0 1 @//sandbox/mmirman/doc/foo@ @1.2@ 0

    # Only the fields used below are captured; the others are matched
    # just to find the right columns.
    my ($record_type, $depotFile, $digest, $size, $lbrIsLazy)
        = $record =~ m{ ^ \@.v\@ \s \d+ \s \@(db\.rev\w*)\@
             \s \@ //([^@]+) \@ # depotFile: the file name
             \s \S+             # depotRev: the revision number
             \s \S+             # type: the file type of the revision
             \s \S+             # action: the action that created the revision
             \s \S+             # change: the changelist that created
                                # the revision
             \s \S+             # date: the date/time that changelist
                                # was submitted
             \s \S+             # modTime: the timestamp on the file in the
                                # user's workspace at submit time
             \s (\S+)           # digest: the MD5 digest of the revision
             \s (\S+)           # size: the size of the file in bytes
             \s \S+             # traitLot: group of traits (attributes)
                                # associated with the revision
             \s (\S+)           # lbrIsLazy: whether or not the revision
                                # gets its content from another file (i.e.
                                # whether or not depotFile and lbrFile differ)
             \s
           }smx
        or next JOURNAL_RECORD;

    msg(1, "READ: $record")
        if $verbose < 2;  # dont repeat the same record twice

# From http://www.perforce.com/perforce/r10.1/schema:
# db.revcx Secondary index of db.rev
# db.revdx Revision records for revisions deleted at the head revision.
# db.revhx Revision records for revisions NOT deleted at the head revision
# db.revpx Pending revision records.

# exclude db.revcx, db.revdx, db.revhx, db.revpx records
    if ( $record_type =~ m{^db\.rev[cdhp]x$} ) { # ,v files didn't change
        msg(0, "Record $record_type can be skipped\n");
        next JOURNAL_RECORD;
    }

# exclude lazy copies in integrations (when lbrIsLazy is true in db.rev)
# exclude those that don't exist in the source -
# they were lazy copies and then got deleted
    if ( $lbrIsLazy || ( ($digest =~ /^0+$/) && ($size < 0) ) ) {
        msg(0, "Lazy copy does not require an rsync\n");
        next JOURNAL_RECORD;
    }

    # The captured file name has no leading "//", so this is a path
    # relative to the top of the archive.
    my $depot_dir = dirname($depotFile);
    $need2copy{$depot_dir} ||= $record_type;
} # read STDIN to the end

#
# Hypothetically speaking, we could figure out whether we deal with
# binary files, in which case we need to copy *,d subdirectories.
# It's too much trouble for the wrong decision here.
# So, we will copy *recursively* in every case.
# Therefore, we need to exclude those directories that are subdirectories
# of those we will copy anyway.
#

# Rounds of mirroring: the loop below runs while something is left to copy
# and the counter has not dropped below zero, i.e. at most $copy_limit + 1
# times.
my $copy_limit = 15; # max number of trying to mirror
my $JUST_A_FEW = 1;  # bat/branch/foo will be mirrored, but bat/branch won't
my @failed2copy;
while ( %need2copy && ( $copy_limit-- >= 0 ) ) {
    # copy_all does the actual copy and removes from %need2copy
    # all directories we successfully copied (or didn't have to copy)
    copy_all(\%need2copy);

    # If we fail to mirror something, try to mirror its parent
    for my $dir ( sort keys %need2copy ) {
        my $parent = dirname($dir);
        if ( ($parent =~ tr{/}{}) > $JUST_A_FEW ) {
            msg(0, "We will try to copy $parent",
                " since we could not copy one of its subdirectories\n");
            $need2copy{$parent} = 'try-again';
        }
        else {
            msg(0, "Parent directory of $dir",
                " is too short to try to mirror it\n");
            push @failed2copy, $dir;
            delete $need2copy{$dir};
        }
    }
}
@failed2copy = sort { $a cmp $b } (@failed2copy, keys %need2copy);

# The pipe to p4d is normally closed inside copy_all.  When no record
# required mirroring, copy_all never ran and the pipe is still open:
# close it here so that p4d sees the end of its input and a failure of the
# journal replay is logged before we report completion.
if ( defined fileno $OUT ) {
    close $OUT
        or msg(0, $! ? "Syserr closing pipe to $replicate:\n    $!\n"
                     : "ERROR running $replicate (exit code: "
                       . ( $? > 255 ? $?>>8 : $? ) . ")\n");
}

if ( @failed2copy ) {
    my $msg = join '', "Failed to copy the following directories:\n",
                   (map { "   $_\n" } @failed2copy),
         "This requires an investigation and fixing the data.\n",
         "If\n    p4 -p $HOST:$port verify\n",
         "starts failing, the data on this replica will become unreliable.\n",
         "\n--$Myname on behalf of the Perforce Administrator\n";

    if ( $notify ) {
        sendmsg({Subject => "Failure in replication on $HOST",
                 Message => $msg,
                 To      => $notify,
                 verbose => $verbose,
                });
    }
    else {
        print "NOT sending the following message to perforce-admin:\n", $msg;
    }
}

my $rc = @failed2copy;
msg(0, "Finished replaying journal for $port. Errors: $rc\n");

p4admin_end($repeat_interval, $port);

# The exit status is the number of failed directories.  Only the low eight
# bits of an exit status survive, so cap it: otherwise 256 failures would be
# reported to the caller as success.
exit( $rc > 255 ? 255 : $rc );

#
#  Mirror the directories that are the keys of the given hash to every
#  directory in @target_dirs, with at most $nproc copies running in parallel.
#
#  Directories nested inside another listed directory are dropped from the
#  hash first: the recursive copy of the ancestor covers them.
#  A directory is removed from the hash only after it was copied successfully
#  to ALL target directories; whatever remains in the hash still needs work.
#
#  The pipe to p4d ($OUT) is closed here once the copies have been started;
#  a failure of the journal replay is logged.  Later calls find the pipe
#  already closed and leave it alone.
#
#  Argument: reference to the hash of directories to copy (modified in place).
#  Returns the number of copy processes that failed.
#  If it returns 0, we are happy.
#
sub copy_all {
    my ($need2copy) = @_;

    # In sorted order a directory always precedes its subdirectories,
    # so only earlier entries can be ancestors of entry $i.
    my @need2copy = sort keys %{ $need2copy };
SKIP_SUBDIR:
    for ( my $i = $#need2copy; $i >= 1; $i-- ) {
        for ( my $j = 0; $j < $i; $j++ ) {
            my ($dir, $parent) = @need2copy[$i, $j];
            if ( $dir =~ m{ ^ \Q$parent\E /. }smx ) {
                msg(0, "$dir does not have to be copied separately:",
                    " it is a subdirectory of $parent\n");
                delete $need2copy->{$dir};
                next SKIP_SUBDIR;
            }
        }
    }

    my $rsync_err = 0;
    my %copied;  # directory => number of targets it was copied to successfully
    my $pm = Parallel::ForkManager->new($nproc);
    $pm->run_on_finish(sub {
        my ($pid, $code, $dir, $sig) = @_;
        msg(0, "Process $pid finished copying $dir with code $code",
            ( $sig ? " (killed by signal $sig)" : '' ), "\n");

        # A child killed by a signal reports exit code 0,
        # so the signal has to be checked as well.
        if ( $code || $sig ) {
            $rsync_err++;
        }
        else {
            $copied{$dir}++;
        }
    });

    my @scheduled = sort keys %{ $need2copy };
    for my $depot_dir ( @scheduled ) {
        my $msg_prefix = "$need2copy->{$depot_dir}: Mirroring $depot_dir";

        for my $target_dir ( @target_dirs ) {
            my $msg_target = (@target_dirs > 1 ? " to $target_dir" : '');
            if ( my $pid = $pm->start($depot_dir) ) { # this does the fork.
                msg(0, "$msg_prefix$msg_target in process $pid\n");
                next;
            }

            # This is a child process
            my $rc = copy_one($depot_dir, $target_dir);
            msg(0, $rc ? "All attempts to copy $depot_dir failed (code=$rc)\n"
                       : "Copying $depot_dir$msg_target succeeded in pid=$$\n");
            $pm->finish($rc); # Terminates the child process
        } # for each target directory
    } # for each directory we need to copy

    # Unclear how critical is to notify $notify about this kind of error.
    # We probably need to collect some statistics about any fall-out's from
    # these errors. - 8/3/2010, MM
    #
    # copy_all may be called several times (retries); the pipe can only be
    # closed once, so skip this when an earlier call already did it.
    if ( defined fileno $OUT ) {
        close $OUT
            or msg(0, $! ? "Syserr closing pipe to $replicate:\n    $!\n"
                         : "ERROR running $replicate (exit code: "
                           . ( $? > 255 ? $?>>8 : $? ) . ")\n");
    }

    msg(0, "Waiting for completion...\n");
    $pm->wait_all_children;

    # A directory is done only when every target received it.  With more
    # than one target, a single success must not hide a failure elsewhere.
    for my $depot_dir ( @scheduled ) {
        next if ( $copied{$depot_dir} || 0 ) < @target_dirs;
        next if !$copied{$depot_dir};
        msg(0, "(pid=$$)", " We won't try to copy $depot_dir any more\n");
        delete $need2copy->{$depot_dir};
    }

    msg(0, "Total number of errors from this copying: $rsync_err\n")
        if $rsync_err;
    return $rsync_err;
} # copy_all

# Copy one directory (this happens in a child process)
#
# Arguments:
#   $depot_dir  - directory to mirror; the same relative path is used below
#                 $srctop on $srchost and below $target_dir locally
#   $target_dir - local directory to change into before copying
#
# Returns 0 when rsync succeeded, otherwise the status of the last failed
# rsync attempt.  Dies (exit status 13 through the handler below) when the
# target directory cannot be entered or rsync cannot be started.
sub copy_one {
    my ($depot_dir, $target_dir) = @_;

    # Replace the SIG defined in p4admin_backup.pm, so we would not send
    # email from every thread.
    $SIG{__DIE__} = sub {
        msg(0, "$Myname: Child Process $$ was terminated:\n", @_);
        exit 13;
    };

    # In order to use relative paths, we need to be in the right
    # directories on both machines
    chdir $target_dir
        or die "Unexpected failure to chdir to $target_dir",
        " in the child process $$";
    -d $depot_dir
        or mkpath $depot_dir; # or croak

    # The directory name comes from the journal, i.e. from file names chosen
    # by users, and it is parsed by two shells:
    # - the remote shell that rsync hands the source path to; every
    #   character that is not plainly safe there gets a backslash (this is
    #   what used to be done for spaces only; bytes >= 0x80 are left alone
    #   so multi-byte characters stay intact);
    # - the local shell that runs the command; both path arguments are
    #   single-quoted for it.
    (my $remote_dir = $depot_dir) =~ s{([^\w\@%+=:,./\x80-\xff-])}{\\$1}g;

    # - copy files recursing the directory (dont try to be too
    #   smart to select specific files);
    # - preserve modification times;
    # - verbose mode;
    # - use --delete because subdirectories and files related
    #   to deleted shelved changes go away and we need to mirror
    #   that (g659635).
    my $cmd = join ' ', $RSYNC, '-av', '--delete',
        _shell_quote("$srchost:$srctop/$remote_dir/"),
        _shell_quote("$depot_dir/");

    # rsync sometimes fails.  After the first attempt allow up to $limit
    # more; failures we can identify as harmless raise $limit, so they do
    # not use up that allowance.
    my $limit = 3;

    msg(0, "Copying: $cmd\n");
    my $attempt = 0;
    my $rc;
    while ( $attempt <= $limit ) {

        # Sometimes, we mirror gecks very quickly (and frequently).
        # This may cause temp files to come and vanish while we are running
        # rsync.
        # If we see messages like
        # file has vanished: "/export/data/perforce/1666/meta/job/tmp.16119.207"
        # or
        # ssh_exchange_identification: Connection closed by remote host
        # we should re-try rsyncing.
        my $known_error = 0;

        open my $PIPE, '-|', "$cmd 2>&1"
            or die "Cannot start pipe to '$cmd': $!";
        while ( my $line = <$PIPE> ) {
            print $line;
            $known_error = 1
                if $line =~ m{^file has vanished: \S+/tmp[.\d]+.?$}
                || $line =~ m{ssh_exchange_identification: Connection closed};
        }
        close $PIPE;   # the result is taken from $? below
        $rc = $? > 255 ? $?>>8 : $?;
        last if $rc == 0;

        $attempt++;
        msg(0, "Attempt $attempt. Copying of $depot_dir failed in pid=",
            "$$ with code $rc\n");
        if ( $known_error # "known" errors don't $limit us, but we'll be
             && # reasonable and won't make more than 100 attempts anyway
             $limit < 100 ) {
            $limit++;
        }

        # a small delay is helpful if there is a temporary network problem;
        # it is pointless once no further attempt will be made
        sleep 1
            if $attempt <= $limit;
    }
    return $rc;
} # copy_one

# Quote a string as one word for /bin/sh: wrap it in single quotes and
# write every embedded single quote as '\''.
sub _shell_quote {
    my ($text) = @_;
    $text =~ s/'/'\\''/g;
    return "'$text'";
}

=head1 NAME

 p4admin_replicate

=head1 SYNOPSIS

 p4admin_replicate -help
 p4admin_replicate -port NNNN -srchost HOST -srctop DIR [options]

=head1 DESCRIPTION

 Replicate primary server on a local server.

Options:

   -data2 dir     additional target directory to mirror to
                  (the source is always srchost:srctop)
                  (one target directory is always the data directory of
                   the local replica determined by -port)
   -help          print this documentation and exit
   -log           redirect the output to automatically created log (default)
   -nolog         send all output to stdout
   -mail user     send email with the result to the specified user
   -n             preview: shows what would be done
   -nproc N       syncing of the data files can be in N parallel processes
                  (default: 10)
   -port NNNN     this local server is a replica of the source (required)
   -repeat N      run continuously, repeating itself no more frequently
                  than every N minutes
   -srchost host  mirror data from this host (required)
   -srctop dir    data on srchost to mirror are in this directory (required)
   -v             verbose mode
   -v -v          even more verbose

=head1 EXAMPLES

Continuously replicate primary server perforce:1666 on the local server
at port 1777, mirroring the archive files from perforce:DIR3:

  p4 -p perforce:1666 replicate -s DIR1/replicate.state DIR2/p4admin_replicate -v -port 1777 -srchost perforce -srctop DIR3

=cut