import os

# Captured at import time, before the bzrlib imports below, and re-applied
# inside the tests right before p4_fast_export runs.
# NOTE(review): presumably the bzrlib test framework overrides
# BZR_PLUGIN_PATH while tests run -- confirm before moving this line.
_bzr_plugin_path = os.environ.get('BZR_PLUGIN_PATH')

from bzrlib.tests import TestCaseInTempDir
from bzrlib.plugins.bzrp4 import p4_fast_export
from bzrlib.plugins.bzrp4.tests.p4_for_test import P4ForTest
import os.path
import re
from cStringIO import StringIO
import sys


class TestP4FastExport(TestCaseInTempDir):
    """Tests for p4_fast_export.main() driven through a P4ForTest fixture.

    Each test submits changes through ``self.p4fortest``, runs
    ``p4_fast_export.main(['//depot@all'])`` with stdout captured, and checks
    that the captured fast-export stream mentions the expected marks,
    committer, comments, parents and file contents.
    """

    # TODO: Eliminate duplicated code that appears here, in test_git_p4 and
    # in test_dogfood.

    def __init__(self, method_name):
        """Forward ``method_name`` unchanged to TestCaseInTempDir."""
        TestCaseInTempDir.__init__(self, method_name)

    def setUp(self):
        """Create server/client work directories and a connected client.

        Two directories are created under the current working directory:
        ``p4d`` (passed to the server via ``-r``) and ``p4`` (the client
        work directory).  The P4 port is an ``rsh:`` string that names the
        ``p4d`` executable, so no externally running server is referenced.
        """
        TestCaseInTempDir.setUp(self)
        self.p4fortest = P4ForTest()

        # NOTE(review): assumes TestCaseInTempDir.setUp() has already moved
        # the process into the per-test directory -- confirm.
        self.p4_compatible_cwd = os.getcwd()

        self.p4d_workdir = os.path.join(self.p4_compatible_cwd, 'p4d')
        os.mkdir(self.p4d_workdir)
        self.p4_client_workdir = os.path.join(self.p4_compatible_cwd, 'p4')
        os.mkdir(self.p4_client_workdir)

        self.p4fortest.p4.port = 'rsh:%s -r %s -L log -vserver=3 -i' % (
            'p4d', self.p4d_workdir)

        # TODO: Remove parameters, and add them to initialization of
        # p4fortest?
        self.p4client_name = 'TestP4FastExport'
        self.p4fortest.create_client(
            self.p4client_name, self.p4_client_workdir)
        self.p4fortest.p4.connect()
        self.p4fortest.sync()

    def tearDown(self):
        """Disconnect from P4, destroy the client, then run base tearDown."""
        self.p4fortest.p4.disconnect()
        self.p4fortest.destroy_client()
        TestCaseInTempDir.tearDown(self)

    def _run_fast_export(self, depot_spec='//depot@all'):
        """Run ``p4_fast_export.main([depot_spec])`` and return its stdout.

        Before the call, ``P4PORT`` is exported from the fixture's port and
        ``BZR_PLUGIN_PATH`` is reset to the value seen at import time (when
        there was one).  ``sys.stdout`` is swapped for an in-memory buffer
        for the duration of the call and always restored afterwards.

        NOTE(review): the environment variables are intentionally left set
        afterwards, exactly as before this helper was extracted; tearDown
        may rely on P4PORT -- confirm before adding any restoration.
        """
        os.environ['P4PORT'] = self.p4fortest.p4.port
        if _bzr_plugin_path:
            os.environ['BZR_PLUGIN_PATH'] = _bzr_plugin_path

        captured = StringIO()
        orig_stdout = sys.stdout
        sys.stdout = captured
        try:
            p4_fast_export.main([depot_spec])
        finally:
            sys.stdout = orig_stdout
        return captured.getvalue()

    def test_p4_fast_export__export_add(self):
        """A single added file is exported as revision mark 1."""
        self.p4fortest.add_file('file_1', 'file_1 contents\n')
        self.p4fortest.submit('gitify_add change 1')

        actual_fast_export_data = self._run_fast_export()

        self.assertFastExportRevision(
            '1',
            self.p4fortest.get_user(),
            'gitify_add change 1',
            None,
            'file_1 contents',
            actual_fast_export_data)

    def test_export_two_revisions(self):
        """Two submitted changes are exported as marks 1 and 2, in order."""
        self.p4fortest.add_file('file_1', 'file_1 contents\n')
        self.p4fortest.submit('change 1 description')
        self.p4fortest.add_file('file_2', 'file_2 contents\n')
        self.p4fortest.submit('change 2 description')

        exported = self._run_fast_export()

        # Splitting on a captured group keeps the separators, giving
        # [preamble, 'commit ...', body_1, 'commit ...', body_2, ...];
        # the commit bodies are therefore at indexes 2 and 4.
        pattern = re.compile('(^commit .*\n)', re.MULTILINE)
        actual_revisions = pattern.split(exported)
        if len(actual_revisions) < 5:
            self.fail(
                'expected at least two commits in fast-export output, '
                'got:\n"""\n%s"""\n' % exported)

        self.assertFastExportRevision(
            '1',
            self.p4fortest.get_user(),
            'change 1 description',
            None,
            'file_1 contents',
            actual_revisions[2])
        self.assertFastExportRevision(
            '2',
            self.p4fortest.get_user(),
            'change 2 description',
            '1',
            'file_2 contents',
            actual_revisions[4])

    def assertFastExportRevision(
            self,
            mark,
            committer,
            #timestamp,
            comment,
            parent_mark,
            contents,
            actual_fast_export_data):
        """Check that a chunk of fast-export output describes one revision.

        :param mark: expected mark number; ``'mark :<mark>'`` must appear.
        :param committer: text that must appear in the data.
        :param comment: text that must appear in the data.
        :param parent_mark: when not None, ``'from :<parent_mark>'`` must
            appear; when None no parent check is made.
        :param contents: text that must appear in the data.
        :param actual_fast_export_data: the output text to inspect.

        Only substring presence is checked, plus Unix line endings on the
        first committer line.
        """
        self.assertUnixLine(self._committer_line(actual_fast_export_data))
        self.assertContains(actual_fast_export_data, 'mark :%s' % mark)
        # TODO: Verify the structure and order in addition to the content.
        self.assertContains(actual_fast_export_data, committer)
        # TODO: self.assertContains(actual_fast_export_data, timestamp)
        self.assertContains(actual_fast_export_data, comment)
        if parent_mark is not None:
            self.assertContains(
                actual_fast_export_data, 'from :%s' % parent_mark)
        self.assertContains(actual_fast_export_data, contents)

    def assertUnixLine(self, line):
        """Fail unless ``line`` ends with a bare '\\n' (not '\\r\\n').

        An empty ``line`` (which _committer_line returns when no committer
        line exists) is reported as an ordinary test failure.
        """
        if not line.endswith('\n'):
            self.fail('line does not end with "\\n": %r' % (line,))
        if line.endswith('\r\n'):
            self.fail('line ends with "\\r\\n": %r' % (line,))

    def _committer_line(self, fast_export_data):
        """Returns the first committer line in fast_export_data.

        The line terminator is kept; '' is returned when no line starts
        with 'committer '.
        """
        for line in fast_export_data.splitlines(True):
            if line.startswith('committer '):
                return line
        return ''

    # TODO: Contribute this back to Bazaar proper, or find and use its
    # equivalent from Bazaar. I only found assertContainsRe().
    def assertContains(self, haystack, needle):
        """Raise AssertionError unless ``needle`` is a substring of haystack.

        Multi-line or long (more than 60 characters) haystacks are quoted
        in a block form in the error message for readability.
        """
        if needle in haystack:
            return
        if '\n' in haystack or len(haystack) > 60:
            # a long string, format it in a more readable way
            raise AssertionError(
                'pattern "%s" not found in\n"""\n%s"""\n'
                % (needle, haystack))
        raise AssertionError('substring "%s" not found in "%s"'
                             % (needle, haystack))