# VSStoP4 - Migrate Visual SourceSafe to Perforce
#
# TEST_vsstop4.PY -- TEST THE scripts
#
# Robert Cowham, Vaccaperna Systems Ltd
#
# 1. INTRODUCTION
#
# This test script tests the VSStoP4 conversion scripts.
# It works mainly by running the converter under different configurations
# and comparing the results with known good results.
import os
import sys
# Add the nearby "code" directory to the module loading path
# An explicit VSSTOP4_PATH wins; otherwise fall back to the parent of the
# current working directory.
if 'VSSTOP4_PATH' in os.environ:
    VSSTOP4_path = os.environ['VSSTOP4_PATH']
else:
    VSSTOP4_path = os.path.join(os.getcwd(), '..')
if VSSTOP4_path not in sys.path:
    sys.path.append(VSSTOP4_path)
# The current directory is always appended so that modules next to this
# script (such as the per-host config file) can be imported.
sys.path.append('.')
import copy
import imp
import p4
import popen2
import re
import socket
import string
import time
import types
import unittest
import shutil
import tempfile # http://www.python.org/doc/2.2p1/lib/module-tempfile.html
class TestCase(unittest.TestCase):
    """unittest.TestCase that keeps hold of the result object of the run.

    The stored result lets a test record a failure through addFailure()
    and then carry on, instead of stopping at the first failed check.
    """

    def __call__(self, result=None):
        """Run the test, remembering the TestResult in self.result first.

        When no result is supplied a default one is created, exactly as
        unittest itself would do.
        """
        if result is None:
            result = self.defaultTestResult()
        self.result = result
        unittest.TestCase.__call__(self, result)

    def addFailure(self, msg):
        """Record a failure with message msg on the current result.

        self.result must already be set (it is set by __call__).
        """
        # Raise and immediately catch the error so that there is a live
        # exception (with traceback) to hand to the result object.
        try:
            raise AssertionError(msg)
        except AssertionError:
            # sys.exc_info() is the public equivalent of unittest's private
            # TestCase.__exc_info helper, which newer unittest versions
            # no longer provide.
            self.result.addFailure(self, sys.exc_info())
# Every machine has its own configuration module, named after the short
# (unqualified) host name in lower case: config_<hostname>.py
hostname = socket.gethostname().split('.')[0].lower()
config_filename = 'config_' + hostname + '.py'
if not os.path.exists(config_filename):
    # Without a configuration nothing below can work, so stop here rather
    # than falling through to an unrelated IOError from open().
    sys.stdout.write("Could not find config file %s\n" % config_filename)
    sys.exit(1)
config_file = open(config_filename)
try:
    # Registers the file's contents as the module "config" in sys.modules.
    imp.load_source('config', config_filename, config_file)
finally:
    config_file.close()
# Shallow snapshot of the configuration module's namespace as loaded.
original_configuration = copy.copy(sys.modules['config'].__dict__)
import config
# Extend PATH with config.ss_path (presumably the SourceSafe tools
# directory -- confirm against the config file) and point SSDIR at the
# configured SourceSafe database.
os.environ['PATH'] = os.environ['PATH'] + ";" + config.ss_path
os.environ['SSDIR'] = config.SSDIR
# The default temporary file prefix starts with an '@'. But
# that would mean that temporary files will look like revision
# specifications to Perforce. So use a prefix that's acceptable
# to Perforce: the process id followed by a dot.
tempfile.gettempprefix = lambda: '%d.' % os.getpid()
# Remove any logfile.log already present in the parent directory
# (presumably left by an earlier run). A missing file is not an error,
# but only OS-level failures are ignored.
try:
    os.remove(r'..\logfile.log')
except OSError:
    pass
# Everything is logged to one time-stamped file (UTC) in the current
# directory: the VSSTOP4 log is redirected there, and so is the output of
# the commands the tests run. The absolute path is kept so the file can
# still be found after a change of directory.
log_filename = os.path.abspath(
    time.strftime('VSSTOP4.%Y%m%dT%H%M%S.log', time.gmtime()))
log_file = open(log_filename, "a")
def log_message(msg):
    """Append msg to the log file, prefixed with the current UTC time.

    The file is flushed after every message so the log stays current even
    if the test run is interrupted.
    """
    stamp = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())
    log_file.write("%s %s\n" % (stamp, msg))
    log_file.flush()
# Tell whoever is running the suite where the log is being written.
banner = "VSSTOP4 test suite, logging to %s\n" % log_filename
sys.stdout.write(banner)
sys.stdout.flush()
# Perforce
#
# This class supplies the restart_perforce method
# It also provides "system", which is like os.system,
# but captures output, checks for errors, and writes to the log.
class Perforce:
    """Control a scratch Perforce server and run external commands.

    Provides stop/start/restart of a Perforce server on config.p4_port,
    a logging replacement for os.system (system), and creation of the
    converter's parameter file (write_param_file).
    """

    # Temporary directory for Perforce server and associated files.
    p4dir = None
    p4server_version = "2006.1"     # Default value

    def stop_perforce(self):
        """Stop any Perforce server running on the configured port.

        Uses "p4 admin stop". Failure is ignored, so it is not an error
        if no server is running.
        """
        self.system('p4.exe -p "%s" -u "%s" -P "%s" admin stop' %
                    (config.p4_port,
                     config.p4_user,
                     config.p4_password),
                    ignore_failure=1)

    def start_perforce(self, server_ver=None):
        """Start a completely fresh Perforce server with a new repository.

        server_ver -- key into config.p4_server_executable selecting the
        server binary; when given it also becomes this object's current
        version. When omitted the current version is used.
        """
        if server_ver:
            self.p4server_version = server_ver
        # Make a new repository directory. mkdtemp creates it atomically,
        # avoiding the race between mktemp() and mkdir(). The prefix is
        # passed explicitly so that a replaced tempfile.gettempprefix
        # (one that avoids Perforce-special characters) is honoured.
        self.p4dir = tempfile.mkdtemp(prefix=tempfile.gettempprefix())
        log_message("Perforce repository directory %s." % self.p4dir)
        log_message("Perforce server version %s." % self.p4server_version)
        # Copy the license
        if config.p4_license_file:
            shutil.copyfile(config.p4_license_file,
                            os.path.join(self.p4dir, 'license'))
        # Work out Perforce's port number (the digits after the last
        # colon of config.p4_port, defaulting to 1999).
        match = re.match(r".*:([0-9]+)$", config.p4_port)
        if match:
            port = int(match.group(1))
        else:
            port = 1999
        # p4d on Windows doesn't detach, so we can't use "system"
        # to start it.
        import win32api
        win32api.WinExec("%s -p 0.0.0.0:%d -r %s -L p4d.log" %
                         (config.p4_server_executable[self.p4server_version],
                          port,
                          self.p4dir))
        # Presumably gives the server time to come up before it is used.
        time.sleep(2)

    def restart_perforce(self, server_ver=None):
        """Restart Perforce by stopping the old server and starting a new one."""
        self.stop_perforce()
        self.start_perforce(server_ver)

    def system(self, command, ignore_failure=0, input_lines=None):
        """Run command through the shell and return its output.

        Standard output and standard error are captured together, written
        to the log and returned as one string.

        command        -- the shell command line to execute.
        ignore_failure -- when true, a non-zero exit status is not an error.
        input_lines    -- optional sequence of strings fed to the command's
                          standard input.

        Raises RuntimeError when the command exits with a non-zero status
        and ignore_failure is false.
        """
        import subprocess
        log_file.write('Executing %s\n' % repr(command))
        child = subprocess.Popen(command,
                                 shell=True,
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                 universal_newlines=True)
        if input_lines:
            stdin_text = ''.join(input_lines)
        else:
            stdin_text = None
        # communicate() writes the input, closes stdin and collects the
        # output without risking a deadlock on full pipes.
        output = child.communicate(stdin_text)[0]
        result = child.returncode
        log_file.write(output)
        if result and not ignore_failure:
            message = ('Command "%s" failed with result code %d.' %
                       (command, result))
            log_file.write(message + '\n')
            # String exceptions are not supported by modern Python, so a
            # real exception object is raised.
            raise RuntimeError(message)
        return output

    def write_param_file(self):
        """Write a fresh, time-stamped parameter file for the converter.

        The file is created in the parent directory from
        config.convert_params, and its name is published in the
        VSSTOP4_CONFIG environment variable.
        """
        ini_filename = time.strftime('config.ini.%Y%m%dT%H%M%S',
                                     time.gmtime())
        os.environ['VSSTOP4_CONFIG'] = ini_filename
        param_file = open(os.path.join("..", ini_filename), "w")
        try:
            param_file.write("# This is a test version of config file\n# This is not the real config.ini!!\n\n")
            for key, value in config.convert_params.items():
                param_file.write("%s: %s\n" % (key, value))
        finally:
            param_file.close()
# VSSTOP4 TEST CASE BASE CLASS
#
# The VSSTOP4_base class is a generic VSSTOP4 test case. It defines methods
# for setting up the integration.
#
# Other VSSTOP4 test cases will inherit from VSSTOP4_base.
#
# When a class implements several test cases, the methods that implement
# test cases (in the PyUnit sense) should have names starting "test_".
# When a class implements a single test case, the method should be
# called "runTest".
class VSSTOP4_base(TestCase):
    """Generic VSSTOP4 test case: environment set-up plus result checks.

    Concrete test cases inherit from this class, call setup_everything()
    from setUp, then run the converter and compare what ends up in
    Perforce with known good result files.
    """

    # One Perforce helper, created at class definition time and shared by
    # all test cases.
    p4d = Perforce()

    def setup_everything(self, server_ver=None, config_changes=None):
        """Set up everything so that a test case can run.

        Applies config_changes to the config module, restarts Perforce
        with a new repository, writes a new converter parameter file and
        opens a Perforce connection as self.p4.

        server_ver     -- Perforce server version to start (None keeps
                          the helper's current version).
        config_changes -- optional mapping of config attribute names to
                          the values to set. The changes are made on the
                          config module itself and are not undone here.
        """
        for key, value in (config_changes or {}).items():
            setattr(config, key, value)
        self.p4d.restart_perforce(server_ver)
        self.p4d.write_param_file()
        self.config = config
        # Get a temporary Perforce interface suitable for setting the
        # Perforce password.
        # NOTE(review): the connection is opened but no password is set
        # through it in this code -- confirm whether a step is missing.
        if config.p4_password:
            p4i = p4.P4()
            p4i.port = config.p4_port
            p4i.user = config.p4_user
            p4i.connect()
        # Get a permanent Perforce interface using the password.
        self.p4 = p4.P4()
        self.p4.port = config.p4_port
        self.p4.user = config.p4_user
        self.p4.connect()

    def write_to_file(self, fname, results):
        """Write each item of results to fname, followed by a newline."""
        temp_file = open(fname, "w")
        try:
            for line in results:
                temp_file.write(line)
                temp_file.write("\n")
        finally:
            temp_file.close()

    def compare_results(self, good_file, results):
        """Fail the test unless results match the contents of good_file.

        results is written to "_" + good_file and the two files are
        compared with the Windows "fc" command.
        """
        temp_fname = "_" + good_file
        self.write_to_file(temp_fname, results)
        # fc signals a difference through a non-zero exit status. That
        # must not abort the command here, otherwise the explicit failure
        # below could never be reported.
        result = self.p4d.system("fc %s %s" % (good_file, temp_fname),
                                 ignore_failure=1)
        if not re.search("FC: no differences encountered", result):
            self.fail("Expected no differences comparing %s and %s." % (good_file, temp_fname))

    def check_result(self, file, command):
        """Run a Perforce command and compare its output with file + ".txt".

        command is split on single spaces into the argument list.
        """
        good_file = file + ".txt"
        results = self.p4.run(command.split(' '))
        self.compare_results(good_file, results)

    def check_labels(self):
        """Compare all labels, and the files in each, with labels.txt.

        Uses its own tagged-mode Perforce connection so that results come
        back as dictionaries.
        """
        p4i = p4.P4()
        p4i.port = self.config.p4_port
        p4i.user = self.config.p4_user
        p4i.tagged()
        p4i.connect()
        labels = p4i.run('labels')
        results = []
        for label in labels:
            results.append('Label: ' + label['label'] + '\n')
            results.append(label['Description'] + '\n')
            contents = p4i.run(('files //...@' + label['label']).split(' '))
            results.append('Files:\n')
            for entry in contents:
                results.append(entry['depotFile'] + '#' + entry['rev'])
        self.compare_results("labels.txt", results)

    def check_consistency(self):
        """Check changes, verify output and labels against the good files."""
        self.check_result('changes', 'changes -l')
        self.check_result('verify', 'verify //...')
        self.check_labels()

    def run_converter(self):
        """Run the Perl converter from the parent directory."""
        self.p4d.system("cd .. && perl convert.pl")
# NORMAL OPERATION
class normal(VSSTOP4_base):
    """Plain conversion with the configuration exactly as loaded."""

    def setUp(self):
        # No server version and no configuration changes are requested.
        self.setup_everything()

    def runTest(self):
        "Check output of p4 verify"
        # Convert first, then compare what landed in Perforce with the
        # known good results.
        self.run_converter()
        self.check_consistency()
# Server Compatibility - same as normal test just uses different server versions
class server_versions(VSSTOP4_base):
    """Repeat the normal conversion against several Perforce server versions."""

    def setUp(self):
        # Nothing to prepare here: runTest sets up a fresh environment
        # for every server version itself.
        pass

    def runTest(self):
        "Test all server version (except current one)"
        # Shorter list for a quick run: ("2005.2", "2005.1")
        older_releases = ("2005.2", "2005.1", "2004.2", "2003.2")
        for release in older_releases:
            self.setup_everything(release)
            self.run_converter()
            self.check_consistency()
# TEST CASE: Use Ole Automation
class ole_auto(VSSTOP4_base):
    """Run the conversion with the 'vss_use_ole' parameter set to 'yes'."""

    def setUp(self):
        # config.convert_params is shared by every test case, so keep a
        # copy to put back afterwards; otherwise the option would leak
        # into the test cases that run later.
        self._saved_convert_params = dict(config.convert_params)
        config.convert_params['vss_use_ole'] = 'yes'
        self.setup_everything()

    def tearDown(self):
        # Restore in place so that any other reference to the dictionary
        # sees the original parameters again.
        config.convert_params.clear()
        config.convert_params.update(self._saved_convert_params)

    def runTest(self):
        "Use OLE Automation"
        self.run_converter()
        self.check_consistency()
# TEST CASE: Use vsscorrupt
class vss_corrupt(VSSTOP4_base):
    """Run the conversion with the 'vsscorrupt' parameter set to 'yes'."""

    def setUp(self):
        # config.convert_params is shared by every test case, so keep a
        # copy to put back afterwards; otherwise the option would leak
        # into the test cases that run later.
        self._saved_convert_params = dict(config.convert_params)
        config.convert_params['vsscorrupt'] = 'yes'
        self.setup_everything()

    def tearDown(self):
        # Restore in place so that any other reference to the dictionary
        # sees the original parameters again.
        config.convert_params.clear()
        config.convert_params.update(self._saved_convert_params)

    def runTest(self):
        "Use VSS Corrupt"
        self.run_converter()
        self.check_consistency()
# RUNNING THE TESTS
def tests():
    """Return a TestSuite holding one instance of each selected test case."""
    # Decide which tests to run
    selected = [normal, ole_auto, vss_corrupt, server_versions]
    # selected = [normal]
    return unittest.TestSuite([case_class() for case_class in selected])
if __name__ == "__main__":
    # unittest looks up "tests" in this module and runs the suite it
    # returns when no test names are given on the command line.
    unittest.main(defaultTest="tests")