#!/bin/env python3 ''' Perforce Baseline and Branch Import Tool Test Suite Copyright (c) 2012 Perforce Software, Inc. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL PERFORCE SOFTWARE, INC. BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. THIS SOFTWARE IS NOT OFFICIALLY SUPPORTED. Please send any feedback to consulting@perforce.com. Test driver for BBI. Run with '-h' for arguments. - p4 and p4d must be available in path - Must be run from the BBI test directory ''' # imports import sys import os import re import platform from optparse import OptionParser from subprocess import * import shutil import stat import tarfile import tempfile import time # global vars P4USER="" P4PORT="" P4CLIENT="" verbosity="2" mode = "0" p4="" p4Raw="" p4cfg = "" bbicfg = "" bbipy = "" bbipy_validator = "" topdir = "" P4ROOT= "" WSDIR= "" p4dProc = None testresults = {} # parse options def parseoptions(): global mode global verbosity usage = "usage: %prog [options]" parser = OptionParser(usage) parser.add_option("-m", "--mode", dest="mode", help="Mode: 0 = no validation, 1 = check file metadata, 2 = check file contents.") parser.add_option("-v", "--verbosity", dest="level", help="Specify the verbosity level, from 1-3. 1 is quiet mode;" " only error and test output is displayed. 2 is normal verbosity; some messages" " displayed. 3 is debug-level verbosity. The default for this program is 3.") (options, args) = parser.parse_args() if options.mode: try: if -1 < int(options.mode) < 3: mode = options.mode print("Mode level set to: {0}.".format(options.mode)) else: sys.exit(1) except: log("ERROR", "Mode level must be an integer value of 0, 1 or 2.") if options.level: try: if 0 < int(options.level) < 4: verbosity = options.level print("Verbosity level set to: {0}.".format(options.level)) else: sys.exit(1) except: log("ERROR", "Verbosity level must be an integer value of 1, 2 or 3.") # log def log(msglevel="DEBUG", message=""): if msglevel == "TEST": print("Running in test mode. Command run would have been:\n", message) elif msglevel == "ERROR": print(message) sys.exit(1) elif (verbosity == "3"): print(message) elif (verbosity == "2" and msglevel == "INFO"): print(message) # run OS cmd def runcmd(cmd): try: log("DEBUG", cmd); pipe = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, universal_newlines=True) stdout, stderr = pipe.communicate() log("DEBUG", stdout) if pipe.returncode != 0: log("INFO", "{0} generated the following output: {1}".format(cmd, stdout)) log("ERROR", "{0} generated the following error: {1}".format(cmd, stderr)) else: return stdout except OSError as err: log("ERROR", "Execution failed: {0}".format(err)) # run P4 cmd in tagged mode (use -s) def runp4cmd(cmd, instr=None): try: log("DEBUG", cmd); pipe = Popen(p4 + cmd, shell=True, stdin=PIPE, stdout=PIPE, universal_newlines=True) stdout, stderr = pipe.communicate(instr) log("DEBUG", stdout) if re.search(r"^exit: 0", stdout, re.M): return stdout else: log("ERROR", "{0}{1} generated the following error: {2}".format(p4, cmd, stdout)) except OSError as err: log("ERROR", "Execution failed: {0}".format(err)) # run P4 cmd without tagged output def runp4cmdRaw(cmd, instr=None): try: log("DEBUG", cmd); pipe = Popen(p4Raw + cmd, shell=True, stdin=PIPE, stdout=PIPE, universal_newlines=True) stdout, stderr = pipe.communicate(instr) log("DEBUG", stdout) if pipe.returncode != 0: log("ERROR", "{0}{1} generated the following error: {2}".format(p4Raw, cmd, stdout)) else: return stdout except OSError as err: log("ERROR", "Execution failed: {0}".format(err)) # delete tree def rmtree(treedir): try: for root, dirs, names in os.walk(treedir): for name in names: os.chmod(os.path.join(root,name), stat.S_IWRITE) shutil.rmtree(treedir) except OSError as e: log("ERROR", "Could not remove {0}.\nThe error was:\n{1}".format(treedir, e)) # start P4D def setupServer(): global p4dProc os.mkdir(P4ROOT) runcmd("p4d -r {0} -jr {1}".format(P4ROOT, os.path.join(os.getcwd(), "checkpoint"))) if (not os.path.exists ("depot.tar") and os.path.exists("depot.tar.gz")): log("INFO", "Extracting depot.tar from depot.tar.gz") os.system("gunzip depot.tar.gz") tar = tarfile.open("depot.tar") tar.extractall(path=P4ROOT) tar.close() p4dProc = Popen(["p4d", "-d", "-r", P4ROOT, "-p", P4PORT, "-J", "off", "-L", "p4d.log", "-A", "audit.log"]) log("INFO", "p4d launched with pid {0}".format(p4dProc.pid)) # make workspace def setupWorkspace(): os.mkdir(WSDIR) cwd = os.getcwd() os.chdir(WSDIR) client = runp4cmdRaw("client -o") runp4cmd("client -i", client.encode("latin-1")) cfgfile = open("p4config.txt", "w") cfgfile.write("P4PORT={0}\n".format(P4PORT)) cfgfile.write("P4USER={0}\n".format(P4USER)) cfgfile.write("P4CLIENT={0}\n".format(P4CLIENT)) cfgfile.close() os.chdir(cwd) # delete server def cleanup(): runp4cmd("admin stop") p4dProc.wait() log("DEBUG", "Waiting for P4D to shutdown...") time.sleep(3) rmtree(P4ROOT) rmtree(WSDIR) rmtree(topdir) # validates against expected data and (optionally) runs the validator def validate(testname, rjd=False): diffs = runp4cmd("diff2 -q \"//depot/test/{0}/...\" \"//depot/expected/{1}/...\"".format(testname, testname)) tstresult = "" if re.search(r"^info:", diffs, re.M): log("INFO", "\tFAIL:") log("INFO", "\t\t" + diffs) tstresult = "FAIL" else: log("INFO", "\tSUCCESS") tstresult = "SUCCESS" if mode != "0": if rjd: validation = runcmd("python {0} -v {1} -r rjd -m {2} {3} {4}".format(bbipy_validator, verbosity, mode, p4cfg, bbicfg)) else: validation = runcmd("python {0} -v {1} -m {2} {3} {4}".format(bbipy_validator, verbosity, mode, p4cfg, bbicfg)) log("INFO", "\tValidator says: {0}".format(validation)) if re.search(r"^\s*DIFF", validation, re.M): tstresult = "FAIL" return tstresult def runtest(testname, bbicfglines, rjd=False): global testresults log("INFO", "---------------------------------------------------") log("INFO", "TEST: " + testname) log("INFO", "\n") bbicfg_f = open(bbicfg, "w") for cfgline in bbicfglines: bbicfg_f.write(cfgline) bbicfg_f.close() bbiout = '' if rjd: bbiout = runcmd("python {0} -v {1} -r rjd {2} {3}".format(bbipy, verbosity, p4cfg, bbicfg)) else: bbiout = runcmd("python {0} -v {1} {2} {3}".format(bbipy, verbosity, p4cfg, bbicfg)) log("INFO", "\tRan BBI") tstresult = validate(testname, rjd) os.unlink(bbicfg) testresults[testname] = tstresult return bbiout # test basic snapshot import def testUpdate(): testname = "update" cfglines = ["BASELINE|FGS-1.0|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.0")), "UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}\n".format(testname,testname)] runtest(testname, cfglines) # test renaming def testRename(): testname = "rename" cfglines = ["BASELINE|FGS-1.0|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.0")), "UPDATE|FGS-1.0|//depot/test/{0}_orig/...|Import FGS-1.0 for {1}\n".format(testname,testname), "RENAME|//depot/test/{0}_orig/...|//depot/test/{0}/...|Rename FGS-1.0 for {1}\n".format(testname,testname)] runtest(testname, cfglines) # test snapshot import with empty directory (do not populate) def testUpdateEmptyNoPopulate(): testname = "empty_no_populate" cfglines = ["BASELINE|FGS-1.0|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.0-DEV.tar.gz")), "UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}\n".format(testname,testname)] runtest(testname, cfglines) # test snapshot import with empty directory (populate) def testUpdateEmptyPopulate(): testname = "empty_populate" cfglines = ["BASELINE|FGS-1.0|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.0-DEV.tar.gz")), "UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}|EMPTYDIRS\n".format(testname,testname)] runtest(testname, cfglines) # test snapshot import with symlinks (preserve) def testUpdateLinksPreserve(): testname = "links_preserve" cfglines = ["BASELINE|FGS-1.0|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-0.1-DEV")), "UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}|SYMLINKS\n".format(testname,testname)] runtest(testname, cfglines) # test snapshot import with symlinks (do not preserve) def testUpdateLinksNoPreserve(): testname = "links_no_preserve" cfglines = ["BASELINE|FGS-1.0|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-0.1-DEV")), "UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}\n".format(testname,testname)] runtest(testname, cfglines) # test branch def testBranch(): testname = "branch" cfglines = [] cfglines.append("BASELINE|FGS-1.0|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.0"))) cfglines.append("UPDATE|FGS-1.0|//depot/test/{0}_orig/...|Import FGS-1.0 for {1}\n".format(testname,testname)) cfglines.append("BRANCH|//depot/test/{0}_orig/...|//depot/test/{0}/...|Branch FGS-1.0 for {1}\n".format(testname,testname)) runtest(testname, cfglines) # test branch def testBranchWithSpec(): global testresults testname = "branch_with_spec" cfglines = [] cfglines.append("BASELINE|FGS-1.0|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.0"))) cfglines.append("UPDATE|FGS-1.0|//depot/test/{0}_orig/...|Import FGS-1.0 for {1}\n".format(testname,testname)) cfglines.append("BRANCH|//depot/test/{0}_orig/...|//depot/test/{0}/...|Branch FGS-1.0 for {1}|BRANCHSPEC={2}\n".format(testname,testname,testname)) runtest(testname, cfglines) branches = runp4cmdRaw("branches") pattern = r"""^Branch %s\s+""" regexp = re.compile(pattern % testname, re.M) if re.search(regexp, branches) == None: log("INFO", "\tNo branch spec named {0}".format(testname)) testresults[testname] = "FAIL" # test basic snapshot import with NODELETE option def testUpdateNoDelete(): testname = "update_no_delete" cfglines = [] cfglines.append("BASELINE|FGS-1.0|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.0"))) cfglines.append("BASELINE|FGS-1.1|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.1"))) cfglines.append("UPDATE|FGS-1.1|//depot/test/{0}/...|Import FGS-1.1 for {1}\n".format(testname,testname)) cfglines.append("UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}|NODELETE\n".format(testname,testname)) runtest(testname, cfglines) # test wildcards (allow) def testWildcardsAllow(): testname = "wildcards_allow" cfglines = [] cfglines.append("BASELINE|FGS-0.2-DEV|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-0.2-DEV"))) cfglines.append("UPDATE|FGS-0.2-DEV|//depot/test/{0}/...|Import FGS-0.2 for {1}|ALLOWWILDCARDS\n".format(testname,testname)) runtest(testname, cfglines) # test basic snapshot import using a command def testUpdateCmd(): testname = "update_cmd" cfglines = [] if platform.system() == "Windows": cfglines.append("BASELINE|FGS-1.0|CMD:copy {0} .\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.0", "src", "makefile"))) else: cfglines.append("BASELINE|FGS-1.0|CMD:cp {0} .\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.0", "src", "makefile"))) cfglines.append("UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}\n".format(testname,testname)) runtest(testname, cfglines) # test basic snapshot import with 'detect case' flag def testCaseDetect(): global testresults testname = "case_detect" cfglines = [] cfglines.append("BASELINE|FGS-0.3-DEV|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-0.3-DEV.tar"))) cfglines.append("UPDATE|FGS-0.3-DEV|//depot/test/{0}/...|Import FGS-0.3 for {1}\n".format(testname,testname)) log("INFO", "---------------------------------------------------") log("INFO", "TEST: " + testname) log("INFO", "\n") bbicfg_f = open(bbicfg, "w") for cfgline in cfglines: bbicfg_f.write(cfgline) bbicfg_f.close() cmdout = runcmd("python {0} -v {1} -c 1 {2} {3}".format(bbipy, verbosity, p4cfg, bbicfg)) log("INFO", "\tRan BBI") os.unlink(bbicfg) if re.search(r"^Potential case sensitivity conflict", cmdout, re.M): log("INFO", "\tSUCCESS") testresults[testname] = "SUCCESS" else: log("INFO", "\tFAIL:") log("INFO", "\t\t" + cmdout) testresults[testname] = "FAIL" # test rmtop def testRmtop(): testname = "rmtop" cfglines = [] cfglines.append("BASELINE|FGS-1.0|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.0.tar.gz"))) cfglines.append("UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}|RMTOP\n".format(testname,testname)) runtest(testname, cfglines) # test junk (no remove) def testJunkNoRemove(): testname = "junk_no_remove" cfglines = [] cfglines.append("BASELINE|FGS-0.4-DEV|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-0.4-DEV"))) cfglines.append("UPDATE|FGS-0.4-DEV|//depot/test/{0}/...|Import FGS-0.4-DEV for {1}\n".format(testname,testname)) runtest(testname, cfglines, rjd=False) # test junk (remove) def testJunkRemove(): testname = "junk_remove" cfglines = [] cfglines.append("BASELINE|FGS-0.4-DEV|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-0.4-DEV"))) cfglines.append("UPDATE|FGS-0.4-DEV|//depot/test/{0}/...|Import FGS-0.4-DEV for {1}\n".format(testname,testname)) runtest(testname, cfglines, rjd=True) # test 'record as merged' def testRecordMerge(): testname = "record_merge" cfglines = [] cfglines.append("BASELINE|FGS-1.0|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.0"))) cfglines.append("BASELINE|FGS-1.1|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.1"))) cfglines.append("UPDATE|FGS-1.0|//depot/test/{0}_orig/...|Import FGS-1.0 for {1}\n".format(testname,testname)) cfglines.append("BRANCH|//depot/test/{0}_orig/...|//depot/test/{0}/...|Branch FGS-1.0 for {1}\n".format(testname,testname)) cfglines.append("UPDATE|FGS-1.1|//depot/test/{0}_orig/...|Update FGS-1.1 for {1}\n".format(testname,testname)) cfglines.append("RECORD_AS_MERGED|//depot/test/{0}_orig/...|//depot/test/{0}/...|Record merge for {1}\n".format(testname,testname)) runtest(testname, cfglines) # test import zip file def testUpdateZip(): testname = "update_zip" cfglines = [] cfglines.append("BASELINE|FGS-1.1|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.1.zip"))) cfglines.append("UPDATE|FGS-1.1|//depot/test/{0}/...|Import FGS-1.1 for {1}|RMTOP\n".format(testname,testname)) runtest(testname, cfglines) # test 'copy merge' def testCopyMerge(): testname = "copy_merge" cfglines = [] cfglines.append("BASELINE|FGS-1.0|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.0"))) cfglines.append("BASELINE|FGS-1.1|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.1"))) cfglines.append("UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}\n".format(testname,testname)) cfglines.append("BRANCH|//depot/test/{0}/...|//depot/test/{0}_dev/...|Branch FGS-1.0 for {1}\n".format(testname,testname)) cfglines.append("UPDATE|FGS-1.1|//depot/test/{0}_dev/...|Update FGS-1.1 for {1}\n".format(testname,testname)) cfglines.append("COPY_MERGE|//depot/test/{0}_dev/...|//depot/test/{0}/...|Record merge for {1}\n".format(testname,testname)) runtest(testname, cfglines) # test 'copy merge' with full copy def testCopyMergeFull(): testname = "copy_merge_full" cfglines = [] cfglines.append("BASELINE|FGS-1.0|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.0"))) cfglines.append("BASELINE|FGS-1.1|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.1"))) cfglines.append("BASELINE|FGS-1.1.p1|{0}\n".format(os.path.join(os.getcwd(), "baselines", "FGS-1.1.p1"))) cfglines.append("UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}\n".format(testname,testname)) cfglines.append("BRANCH|//depot/test/{0}/...|//depot/test/{0}_dev/...|Branch FGS-1.0 for {1}\n".format(testname,testname)) cfglines.append("UPDATE|FGS-1.1|//depot/test/{0}_dev/...|Update FGS-1.1 for {1}\n".format(testname,testname)) cfglines.append("RECORD_AS_MERGED|//depot/test/{0}_dev/...|//depot/test/{0}/...|Record merge for {1}\n".format(testname,testname)) cfglines.append("UPDATE|FGS-1.1.p1|//depot/test/{0}/...|Update FGS-1.1.p1 for {1}\n".format(testname,testname)) cfglines.append("COPY_MERGE|//depot/test/{0}_dev/...|//depot/test/{0}/...|Copy merge for {1}|FULLCOPY\n".format(testname,testname)) runtest(testname, cfglines) # set global variables def initVars(): global P4USER global P4PORT global P4CLIENT global verbosity global mode global p4 global p4Raw global p4cfg global bbicfg global bbipy global bbipy_validator global topdir global P4ROOT global WSDIR P4USER="import_user" P4PORT="127.0.0.1:9000" P4CLIENT="import_ws" verbosity="2" mode = "0" # make working directories topdir = tempfile.mkdtemp(dir=os.getcwd()) P4ROOT=os.path.join(topdir, "p4root") WSDIR=os.path.join(topdir, "ws") # set P4 and config if platform.system() == "Windows": p4="p4.exe" else: p4="p4" p4Raw = "{0} -p {1} -c {2} -u {3} ".format(p4, P4PORT, P4CLIENT, P4USER) p4 = "{0} -p {1} -c {2} -u {3} -s ".format(p4, P4PORT, P4CLIENT, P4USER) p4cfg = os.path.join(topdir, "p4cfg.txt") p4cfg_f = open(p4cfg, "w") p4cfg_f.write("P4PORT=" + P4PORT + "\n") p4cfg_f.write("P4CLIENT=" + P4CLIENT + "\n") p4cfg_f.write("P4USER=" + P4USER + "\n") p4cfg_f.close() bbicfg = os.path.join(topdir, "bbi.cfg") bbipy = os.path.join(os.getcwd(), "..", "p4bbi.py") bbipy_validator = os.path.join(os.getcwd(), "..", "p4bbi_validate.py") # main routine if __name__ == "__main__": initVars() parseoptions() setupServer() setupWorkspace() testUpdate() testUpdateEmptyPopulate() testUpdateEmptyNoPopulate() testUpdateLinksPreserve() testUpdateLinksNoPreserve() testRename() testBranch() testBranchWithSpec() testUpdateNoDelete() testWildcardsAllow() testUpdateCmd() testCaseDetect() testRmtop() testJunkNoRemove() testJunkRemove() testRecordMerge() testUpdateZip() testCopyMerge() testCopyMergeFull() cleanup() log("INFO", "---------------------------------------------------") log("INFO", "Summary of results:") goodResultCnt = 0 badResultCnt = 0 for key in sorted(iter(testresults)): log("INFO", "\t{0:20}:\t{1:20}".format(key, testresults[key])) if testresults[key] == 'SUCCESS': goodResultCnt = goodResultCnt + 1 else: badResultCnt = badResultCnt + 1 totalResultCnt = goodResultCnt + badResultCnt log("INFO", "{0} of {1} test passed: {2}%".format(goodResultCnt, totalResultCnt, 100 * goodResultCnt / totalResultCnt))