# -*- encoding: UTF8 -*-
#
# Integration tests for PerforceTransfer.py.
#
# Each test creates two throw-away Perforce servers (a "source" and a
# "target") below ./transfer, submits changes to the source, runs
# PerforceTransfer.py against a generated config file and then inspects the
# target server and the counters written back to that config file.
#
# Requirements visible from the code: the P4Python module (``P4``), a ``p4d``
# executable reachable via PATH (used through an ``rsh:`` port) and
# ``PerforceTransfer.py`` in the current working directory.

from __future__ import print_function

import sys
import time
import P4
import unittest
import os
import types
import shutil
import stat
from subprocess import Popen, PIPE

if sys.version_info[0] >= 3:
    from configparser import ConfigParser
else:
    from ConfigParser import ConfigParser

P4D = "p4d"
P4USER = "testuser"
P4CLIENT = "test_ws"
TRANSFER_CLIENT = "transfer"
TRANSFER_CONFIG = "transfer.cfg"

PYTHON = "python"
PERFORCE_TRANSFER = "PerforceTransfer.py"


def onRmTreeError(function, path, exc_info):
    """Error callback for ``shutil.rmtree``: make *path* writable and remove it.

    ``function`` and ``exc_info`` are supplied by ``shutil.rmtree`` and are
    not used here.
    """
    os.chmod(path, stat.S_IWRITE)
    os.remove(path)


def ensureDirectory(directory):
    """Create *directory* if it does not exist yet.

    Only the last path component is created (``os.mkdir``), so the parent
    directory must already exist.
    """
    if not os.path.isdir(directory):
        os.mkdir(directory)


class P4Server:
    """A private Perforce server plus a connected client workspace.

    The server is addressed through an ``rsh:`` port that launches ``p4d``
    with ``<root>/server`` as its server root; the workspace ``P4CLIENT`` is
    rooted at ``<root>/client``.
    """

    def __init__(self, root):
        """Create the directory layout below *root*, connect and save the client."""
        self.root = root
        self.server_root = os.path.join(root, "server")
        self.client_root = os.path.join(root, "client")

        # ensureDirectory() creates a single level only, so the parent has
        # to be created before its children.
        ensureDirectory(self.root)
        ensureDirectory(self.server_root)
        ensureDirectory(self.client_root)

        self.p4d = P4D
        self.port = "rsh:%s -r \"%s\" -L log -vserver=3 -i" % (
            self.p4d, self.server_root)

        self.p4 = P4.P4()
        self.p4.port = self.port
        self.p4.user = P4USER
        self.p4.client = P4CLIENT
        self.p4.connect()

        self.p4.run_depots()  # triggers creation of the user

        self.client_name = P4CLIENT
        client = self.p4.fetch_client(self.client_name)
        client._root = self.client_root
        self.p4.save_client(client)

    def shutDown(self):
        """Disconnect from the server if the connection is still open."""
        if self.p4.connected():
            self.p4.disconnect()

    def createTransferClient(self, name, root):
        """Placeholder; currently does nothing."""
        pass

    def enableUnicode(self):
        """Run ``p4d -xi`` on the server root, discarding its output."""
        cmd = [self.p4d, "-r", self.server_root, "-L", "log", "-vserver=3",
               "-xi"]
        process = Popen(cmd, stdout=PIPE)
        # communicate() drains stdout, closes the pipe and waits for the
        # process, so no child is left unreaped.
        process.communicate()


class TestPerforceTransfer(unittest.TestCase):
    """End-to-end tests that run PerforceTransfer.py between two servers."""

    def setUp(self):
        self.setDirectories()

    def tearDown(self):
        self.source.shutDown()
        self.target.shutDown()
        # NOTE(review): presumably gives the rsh-launched p4d processes time
        # to exit before their files are deleted - confirm.
        time.sleep(1)
        self.cleanupTestTree()

    def setDirectories(self):
        """Create a fresh ./transfer tree holding both servers and the
        transfer workspace root."""
        self.startdir = os.getcwd()
        self.transfer_root = os.path.join(self.startdir, 'transfer')
        self.cleanupTestTree()

        ensureDirectory(self.transfer_root)

        self.source = P4Server(os.path.join(self.transfer_root, 'source'))
        self.target = P4Server(os.path.join(self.transfer_root, 'target'))

        self.transfer_client_root = os.path.join(self.transfer_root,
                                                 'transfer_client')
        ensureDirectory(self.transfer_client_root)

    def cleanupTestTree(self):
        """Return to the start directory and delete the ./transfer tree."""
        os.chdir(self.startdir)
        if os.path.isdir(self.transfer_root):
            shutil.rmtree(self.transfer_root, False, onRmTreeError)

    def setupTransfer(self):
        """Creates client workspaces on source and target and a config file"""
        source_client = self.source.p4.fetch_client(TRANSFER_CLIENT)
        source_client._root = self.transfer_client_root
        source_client._view = ['//depot/inside/... //%s/...' % TRANSFER_CLIENT]
        self.source.p4.save_client(source_client)

        target_client = self.target.p4.fetch_client(TRANSFER_CLIENT)
        target_client._root = self.transfer_client_root
        target_client._view = ['//depot/import/... //%s/...' % TRANSFER_CLIENT]
        self.target.p4.save_client(target_client)

        # create the config file
        self.parser = ConfigParser()
        self.parser.add_section('server1')
        self.parser.set('server1', 'p4port', self.source.port)
        self.parser.set('server1', 'p4user', P4USER)
        self.parser.set('server1', 'p4client', TRANSFER_CLIENT)
        self.parser.set('server1', 'counter', '0')

        self.parser.add_section('server2')
        self.parser.set('server2', 'p4port', self.target.port)
        self.parser.set('server2', 'p4user', P4USER)
        self.parser.set('server2', 'p4client', TRANSFER_CLIENT)
        self.parser.set('server2', 'counter', '0')

        # write the config file
        self.transfer_cfg = os.path.join(self.transfer_root, TRANSFER_CONFIG)
        with open(self.transfer_cfg, 'w') as f:
            self.parser.write(f)

    def reloadConfig(self):
        """Re-read the config file so the counters written by the transfer
        script become visible through ``self.parser``.

        ``ConfigParser.read`` is used because it exists on both Python 2 and
        Python 3 (``readfp`` was removed in Python 3.12). ``read`` skips
        files it cannot open, so an unreadable config is reported as a test
        failure instead of being ignored.
        """
        files_read = self.parser.read(self.transfer_cfg)
        self.assertTrue(files_read,
                        "Could not read config file %s" % self.transfer_cfg)

    def run_PerforceTransfer(self):
        """Run PerforceTransfer.py with the generated config file.

        Returns the lines the script wrote to stdout, as produced by
        ``readlines()`` on the pipe.
        """
        process = Popen([PYTHON, PERFORCE_TRANSFER, '-c', self.transfer_cfg],
                        stdout=PIPE)
        try:
            result = process.stdout.readlines()
        finally:
            process.stdout.close()
            # Reap the child so it does not linger after the test.
            process.wait()
        return result

    def testAdd(self):
        """A single added file is transferred as change 1."""
        self.setupTransfer()

        inside = os.path.join(self.source.client_root, "inside")
        ensureDirectory(inside)
        file1 = os.path.join(inside, "file1")
        with open(file1, 'w') as f:
            f.write('Test content')

        self.source.p4.run_add(file1)
        self.source.p4.run_submit('-d', 'File1 added')

        self.run_PerforceTransfer()

        changes = self.target.p4.run_changes()
        self.assertEqual(len(changes), 1,
                         "Target does not have exactly one change")
        self.assertEqual(changes[0]['change'], "1", "Target change is not 1")

        files = self.target.p4.run_files('//depot/...')
        self.assertEqual(len(files), 1,
                         "Target does not have exactly one file")
        self.assertEqual(files[0]['depotFile'], '//depot/import/file1',
                         "File not transferred to correct place")

        self.reloadConfig()
        self.assertEqual(self.parser.getint("server1", 'counter'), 1,
                         "Source counter is not 1")
        self.assertEqual(self.parser.getint("server2", 'counter'), 1,
                         "Target counter is not 1")

    def testEditAndDelete(self):
        """An add plus an edit are transferred, then a delete is transferred."""
        self.setupTransfer()

        inside = os.path.join(self.source.client_root, "inside")
        ensureDirectory(inside)
        file1 = os.path.join(inside, "file1")
        with open(file1, 'w') as f:
            f.write('Test content')

        self.source.p4.run_add(file1)
        self.source.p4.run_submit('-d', 'File1 added')

        self.source.p4.run_edit(file1)
        with open(file1, 'a+') as f:
            f.write('More content')
        self.source.p4.run_submit('-d', "File1 edited")

        self.run_PerforceTransfer()

        changes = self.target.p4.run_changes()
        self.assertEqual(len(changes), 2,
                         "Target does not have exactly two changes")
        self.assertEqual(changes[0]['change'], "2",
                         "Highest target change is not 2")

        self.reloadConfig()
        self.assertEqual(self.parser.getint("server1", 'counter'), 2,
                         "Source counter is not 2")
        self.assertEqual(self.parser.getint("server2", 'counter'), 2,
                         "Target counter is not 2")

        self.source.p4.run_delete(file1)
        self.source.p4.run_submit('-d', 'File1 deleted')

        self.run_PerforceTransfer()

        changes = self.target.p4.run_changes()
        self.assertEqual(len(changes), 3,
                         "Target does not have exactly three changes")
        filelog = self.target.p4.run_filelog('//depot/import/file1')
        self.assertEqual(filelog[0].revisions[0].action, 'delete',
                         "Target has not been deleted")

    def testSimpleIntegrate(self):
        """An add followed by an integrate produces two changes on the target."""
        self.setupTransfer()

        # seed the integration
        inside = os.path.join(self.source.client_root, "inside")
        ensureDirectory(inside)
        file1 = os.path.join(inside, "file1")
        with open(file1, 'w') as f:
            f.write('Test content')

        self.source.p4.run_add(file1)
        self.source.p4.run_submit('-d', 'File1 added')

        self.source.p4.run_integrate(file1, os.path.join(inside, "file2"))
        self.source.p4.run_submit('-d', 'File1 -> File2')

        self.run_PerforceTransfer()

        # test integration
        self.reloadConfig()
        source_counter = self.parser.getint("server1", 'counter')
        target_counter = self.parser.getint("server2", 'counter')
        self.assertEqual(source_counter, 2,
                         "Source counter is not 2, but %d" % source_counter)
        self.assertEqual(target_counter, 2,
                         "Target counter is not 2, but %d" % target_counter)

        changes = self.target.p4.run_changes()
        self.assertEqual(len(changes), 2,
                         "Target does not have exactly two changes")

        # The edit below is submitted on the source only; nothing visible
        # here transfers or verifies it yet.
        self.source.p4.run_edit(file1)
        with open(file1, 'a+') as f:
            f.write("More content")
        self.source.p4.run_submit('-d', 'File1 edited')


if __name__ == '__main__':
    unittest.main()