# Perforce Defect Tracking Integration Project
# <http://www.ravenbrook.com/project/p4dti/>
#
# TEST_P4DTI.PY -- TEST THE P4DTI
#
# Gareth Rees, Ravenbrook Limited, 2001-03-14
#
#
# 1. INTRODUCTION
#
# This test script tests the P4DTI. The initial plan is described in
# [GDR 2000-12-31] and some of the design is given in [GDR 2001-03-14].
#
# This script contains a lot of tricky code (for example, using the
# hostname or operating system name to pick a module to import or a
# class to use; or overriding a method in another module in order to
# snoop on its behaviour). Please take care when editing it.
#
# It uses the PyUnit unit test framework [PyUnit].
#
# The intended readership is project developers.
#
# This document is not confidential.
#
# The test cases depend on features of the test databases. It is
# important that the cases be kept in sync with the test database and
# with the design of those databases [GDR 2001-03-14]. When you add a
# test, be sure to change the design and refer back to the test.
#
#
# 1.1. Useful info
#
# The script creates a series of test Perforce servers (one for each
# test case). When a test case starts, it kills all the running
# Perforce servers on the required port. Each of these has its own
# directory, in the appropriate place for temporary files for your
# system (typically /tmp on Unix, C:\Temp on Windows). The P4DTI log is
# diverted from its real place and sent to the file p4dti.log.
#
#
# 1.2. Regression tests in this script
#
# The section * means that the defect is tested throughout as a simple
# consequence of running the script; there is no particular test for it.
#
# Job Section Title
# ----------------------------------------------------------------------
import os
import sys
# Add the nearby "code" directory to the module loading path
# to get the P4DTI modules. If P4DTI_PATH is set then add
# that instead, so that other versions of P4DTI can be tested.
if os.environ.has_key('P4DTI_PATH'):
p4dti_path = os.environ['P4DTI_PATH']
else:
p4dti_path = os.path.join(os.getcwd(), os.pardir, 'code',
'replicator')
if p4dti_path not in sys.path:
sys.path.insert(0, p4dti_path)
import copy
import sgmllib
import imp
import logger
import message
import p4i
import p4dti_unittest
import popen2
import re
import socket
import string
import time
import types
import unittest
import urllib
import random
import shutil
import tempfile # http://www.python.org/doc/2.2p1/lib/module-tempfile.html
import inspect
# The default temporary file prefix starts with an '@'. But
# that would mean that temporary files will look like revision
# specifications to Perforce. So use a prefix that's acceptable
# to Perforce.
# [Why does that matter? RB 2002-10-29]
tempfile.gettempprefix = lambda: '%d.' % os.getpid()
# 2. CONFIGURATION
#
#
# 2.1. Limitations
#
# This script can only support one basic configuration on each host (see
# section 2.2). Tests of other configurations have to be made by
# changing configuration parameters in the script.
# section 2.2). Tests of other configurations have to be made by
# changing configuration parameters in the script.
#
# This script can only test one defect tracker on each host (you have
# to change dt_name in config_HOSTNAME.py to test another defect
# tracker).
#
# This script can only have one current invocation on each host.
#
# Could it use an environment variable to work out which configuration
# to use?
#
#
# 2.2. Using a test configuration
#
# I want to be able to test different configurations, changes to
# configurations, and to check whether erroneous configurations are
# spotted correctly.
#
# I expect to find a working configuration either (1) in the file
# specified by the environment variable P4DTI_CONFIG, if set, or (2) in
# config_HOSTNAME.py, where HOSTNAME is the first component of the FQDN,
# converted to lower case (e.g. 'swan' for 'swan.ravenbrook.com').
#
# This configuration file is an ordinary P4DTI configuration file,
# except that it must include two additional configuration parameters:
# p4_license_file specifies the location of a Perforce license file
# suitable for use on the machine running the tests;
# p4_server_executable specifies the location of a suitable Perforce
# server executable.
#
# The loaded configuration module is copied when a test is started and
# the copy is installed in sys.modules['config']. This has two effects:
#
# 1. The P4DTI itself won't load the config module; see the
# initialization code [RB 2000-12-08].
#
# 2. The unit tests can make changes to the copy of the configuration
# (e.g., to specify incorrect values, as in section 6) without
# affecting the original, which can be restored for the next test.
if os.environ.has_key('P4DTI_CONFIG'):
config_filename = os.environ['P4DTI_CONFIG']
else:
hostname = string.lower(string.split(socket.gethostname(), '.')[0])
config_filename = 'config_' + hostname + '.py'
if not os.path.exists(config_filename):
print "Could not find config file", config_filename
print "Either create one, or set P4DTI_CONFIG to the name of one."
config_file = open(config_filename)
try:
imp.load_source('config', config_filename, config_file)
finally:
config_file.close()
original_configuration = copy.copy(sys.modules['config'].__dict__)
import config
# 2.3. Logging
#
# We need to log to a log file. The P4DTI log will get redirected to
# this file, as will the output of various commands.
log_filename = (config.log_file
or time.strftime('p4dti.%Y%m%dT%H%M%S.log',
time.gmtime(time.time())))
log_filename = os.path.abspath(log_filename)
log_file = open(log_filename, "a")
def log_message(msg):
date = time.strftime('%Y-%m-%d %H:%M:%S UTC',
time.gmtime(time.time()))
log_file.write("%s %s\n" % (date, msg))
log_file.flush()
def log_fn():
# Logs the immediately calling function from the stack
log_message("In function: %s" % inspect.stack()[1][3])
sys.stdout.write("P4DTI test suite, logging to %s\n" % log_filename)
sys.stdout.flush()
if config.dt_name == 'Tracker':
email_suffix = "@vaccaperna.co.uk"
else:
email_suffix = "@ravenbrook.com"
def reset_configuration():
# Delete current config, just in case we added something.
for k in sys.modules['config'].__dict__.keys():
if not re.match('^__.*__$', k):
del sys.modules['config'].__dict__[k]
# Restore old config.
for k, v in original_configuration.items():
sys.modules['config'].__dict__[k] = v
class p4dti_html_parser(sgmllib.SGMLParser):
def attrs(self, attrs_list):
attrs = { }
for a, v in attrs_list:
attrs[a] = v
return attrs
# 3. DEFECT TRACKER AND OPERATING SYSTEM INTERFACES
#
# Many of the test cases are generic: they don't depend on a particular
# defect tracker or operating system. But they need an interface to the
# defect tracker in order to restart it, and to the operating system in
# order to start a new Perforce server.
#
# Each interface class should be called DT_OS_interface (where DT is
# config.dt_name and OS is os.name) is and must define these methods:
#
# restart_defect_tracker()
# restart_perforce()
#
# Note that there are no corresponding "stop" methods. The tests leave
# Perforce and the defect tracker running so that failures can be
# investigated without all the evidence having disappeared.
# 3.1. Perforce mixin
#
# This class supplies the restart_perforce method for use by
# defect tracker interfaces. It is suitable for use on both Posix
# and Windows. It also provides "system", which is like os.system,
# but captures output, checks for errors, and writes to the log.
class Perforce_mixin:
# Temporary directory for Perforce server and associated files.
p4dir = None
# 3.1.1. Stop Perforce server
#
# If there are any Perforce servers running on the magic port,
# use p4 admin to stop them.
def stop_perforce(self):
self.system('p4.exe -p "%s" -u "%s" -P "%s" admin stop' %
(config.p4_port,
config.p4_user,
config.p4_password),
ignore_failure = 1)
# 3.1.2. Start a new Perforce server
#
# Make a completely fresh Perforce server, with a new repository.
def start_perforce(self):
# Make a new repository directory.
self.p4dir = tempfile.mktemp()
os.mkdir(self.p4dir)
log_message("Perforce repository directory %s." % self.p4dir)
# Copy the license
if config.p4_license_file:
shutil.copyfile(config.p4_license_file,
os.path.join(self.p4dir, 'license'))
# Work out Perforce's port number and start a Perforce server.
match = re.match(".*:([0-9]+)$", config.p4_port)
if match:
port = int(match.group(1))
else:
port = 1666
if os.name == 'nt':
# p4d on Windows doesn't detach, to we can't use "system"
# to start it. RB 2002-10-28
import win32api
win32api.WinExec("%s -p 0.0.0.0:%d -r %s" %
(config.p4_server_executable, port,
self.p4dir))
time.sleep(2)
else:
# For some reason, p4d doesn't detach properly when called
# by os.popen4, so we have to use os.system here instead
# of self.system. RB 2002-10-29
os.system('%s -d -p 0.0.0.0:%d -r "%s" >> %s 2>&1' %
(config.p4_server_executable,
port,
self.p4dir,
log_filename))
# 3.1.3. Restart Perforce
#
# By killing the old server and starting a new one.
def restart_perforce(self):
self.stop_perforce()
self.start_perforce()
# 3.1.4. Run command and check results
#
# Calls an external program, raising an exception upon any error
# and returning standard output and standard error from the program,
# as well as writing them to the log.
def system(self, command, ignore_failure = 0, input_lines = None):
log_file.write('Executing %s\n' % repr(command))
(child_stdin, child_stdout) = os.popen4(command)
if input_lines:
child_stdin.writelines(input_lines)
child_stdin.close()
output = child_stdout.read()
result = child_stdout.close()
log_file.write(output)
if not ignore_failure and result:
message = ('Command "%s" failed with result code %d.' %
(command, result))
log_file.write(message + '\n')
raise message
return output
# 3.2. Interface to TeamTrack on Windows NT: removed 2003-05-23.
# 3.3. Bugzilla interface mixin
#
# This is the interface to Bugzilla, suitable for use on both
# Posix and Windows.
#
# This class installs and initializes a complete new Bugzilla
# every time.
#
# The command lines used are carefully
# designed to work with both /bin/sh and CMD.EXE, so be careful
# when you edit it. As far as possible, Python's shutil tools
# are used for cross-platform work.
class Bugzilla_mixin:
# The Bugzilla server.
server = None
# 3.3.1. Install and configure Bugzilla
#
# install_bugzilla() creates a new Bugzilla installation by copying
# the Bugzilla source code from its location relative to the test
# suite into a temporary directory and then carrying out the initial
# configuration steps that the Bugzilla administrator would carry
# out; see [Barnson 2001-08-29, 3.2.14-15].
def install_bugzilla(self):
# Make a temporary directory.
config.bugzilla_directory = tempfile.mktemp()
log_message("Bugzilla directory %s." % config.bugzilla_directory)
# Create Bugzilla database.
self.empty_bugzilla_database()
# Copy Bugzilla sources.
bz_path = os.path.abspath(os.path.join(
os.getcwd(), os.pardir, 'code',
'bugzilla-%s' % config.bugzilla_version))
shutil.copytree(bz_path, config.bugzilla_directory)
cwd = os.getcwd()
try:
os.chdir(config.bugzilla_directory)
if os.name == 'nt':
# Patch Bugzilla to make it suitable for running on
# windows, using a pre-cooked patch derived from a
# working Bugzilla on Windows installation.
patch_path = os.path.abspath(os.path.join(cwd,
'bugzilla-%s-win32-patch' % config.bugzilla_version))
self.system("patch < %s" % patch_path)
# Copy processmail to processmail.pl, as this is expected
# by the patched Bugzilla.
os.rename("processmail", "processmail.pl")
# Run checksetup.pl for the first time (no input required).
self.system('perl checksetup.pl')
# Edit the newly-created localconfig so that db_name is
# correct. Also, on Windows, edit the webservergroup
# for some mysterious reason not explained in the Bugzilla
# documentation.
f = open('localconfig', 'r')
localconfig = f.read()
f.close()
localconfig = re.sub('\\$db_name = "bugs"',
'$db_name = "%s"' % config.dbms_database,
localconfig)
if os.name == 'nt':
localconfig = (localconfig + "\n" +
'$webservergroup = "8";\n')
f = open('localconfig', 'w')
f.write(localconfig)
f.close()
# Run checksetup.pl for the second time. This time Bugzilla
# prompts us to enter some configuration parameters. We
# supply values from the test configuration, as follows.
password = config.bugzilla_admin_password + '\n'
replies = [
# Enter the e-mail address of the administrator
config.bugzilla_admin_user + '\n',
# You entered ... Is this correct? [Y/n]
'\n',
# Enter the real name of the administrator
'Bugzilla administrator\n',
# Enter a password for the administrator account
password,
# Please retype the password to verify
password,
]
# The checksetup.pl script attempts to disable and enable
# input echoing (when receiving a password) by calling stty.
# These stty calls fail harmlessly here because
# checksetup.pl's input is not a terminal.
self.system('perl checksetup.pl', input_lines = replies)
finally:
os.chdir(cwd)
# 3.3.2. Empty the Bugzilla database
#
# Drops (deletes) and recreates the Bugzilla test database on
# the MySQL server.
def empty_bugzilla_database(self):
db = config.dbms_database
user = config.dbms_user
self.system('mysqladmin -u "%s" --force drop "%s"' % (user, db),
ignore_failure = 1)
self.system('mysqladmin -u "%s" create "%s"' % (user, db))
# 3.3.3. Restart Bugzilla
#
# Install Bugzilla. If the configuration parameter
# bugzilla_mysqldump is not None, then drop the existing Bugzilla
# database and replace it with one from the MySQL dump file.
def restart_defect_tracker(self):
self.install_bugzilla()
if config.bugzilla_mysqldump == None:
return
self.empty_bugzilla_database()
self.system('mysql -u "%s" "%s" < "%s"' %
(config.dbms_user,
config.dbms_database,
config.bugzilla_mysqldump))
# 3.3.4. Run a Bugzilla CGI script
#
# run_script(script, params) runs a Bugzilla CGI script as if
# invoked by a web server as a result of an HTTP request. It
# returns a string containing the output of the script. The script
# argument is the name of the script (relative to
# config.bugzilla_directory) and the params argument is a dictionary
# mapping form parameter name to value.
def run_script(self, script, params):
data = urllib.urlencode(params)
env_additions = {
'REQUEST_METHOD': 'POST',
'REMOTE_ADDR': '127.0.0.1',
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'CONTENT_LENGTH': str(len(data)),
}
cwd = os.getcwd()
try:
for k, v in env_additions.items():
os.environ[k] = v
os.chdir(config.bugzilla_directory)
command = 'perl "%s"' % script
log_file.write('Executing Bugzilla script %s\n' % repr(command))
child_out, child_in = popen2.popen2(command)
child_in.write(data)
log_file.write(' ... with input %s ...' % data)
child_in.close()
result = child_out.read()
log_file.write(' ... and output %s\n' % result)
return result
finally:
for e in env_additions.keys():
del os.environ[e]
os.chdir(cwd)
# 3.3.5. Get Bugzilla server parameters
#
# Returns a dictionary mapping Bugzilla parameter name to value.
#
# It works by running the "editparams.cgi" CGI script and parsing
# the output.
class editparams_parser(p4dti_html_parser):
def __init__(self):
self.params = { }
self.collecting_params = 0
self.textarea_name = None
self.textarea_contents = None
sgmllib.SGMLParser.__init__(self)
def attrs(self, attrs_list):
attrs = { }
for a, v in attrs_list:
attrs[a] = v
return attrs
def start_form(self, attrs_list):
attrs = self.attrs(attrs_list)
if attrs.get('action') == 'doeditparams.cgi':
self.collecting_params = 1
def end_form(self):
self.collecting_params = 0
def start_input(self, attrs_list):
if not self.collecting_params:
return
attrs = self.attrs(attrs_list)
if attrs.get('type') == 'radio' and attrs.get('checked'):
self.params[attrs['name']] = attrs['value']
elif attrs.get('type', 'text') == 'text':
value = re.sub('&', '&', attrs['value'])
self.params[attrs['name']] = value
def start_textarea(self, attrs_list):
if not self.collecting_params:
return
attrs = self.attrs(attrs_list)
self.textarea_name = attrs['name']
self.textarea_contents = []
def end_textarea(self):
if not self.collecting_params:
return
value = string.join(self.textarea_contents, '')
self.params[self.textarea_name] = value
self.textarea_name = None
self.textarea_contents = None
def handle_data(self, data):
if self.collecting_params and self.textarea_name:
self.textarea_contents.append(data)
def server_parameters(self):
params = {
'Bugzilla_login': config.bugzilla_admin_user,
'Bugzilla_password': config.bugzilla_admin_password,
}
result = self.run_script('editparams.cgi', params)
parser = self.editparams_parser()
parser.feed(result)
parser.close()
return parser.params
def edit_parameters(self, new_params):
params = self.server_parameters()
params.update({
'Bugzilla_login': config.bugzilla_admin_user,
'Bugzilla_password': config.bugzilla_admin_password,
})
params.update(new_params)
self.run_script('doeditparams.cgi', params)
# It takes a moment for the new table to become available to
# other MySQL connections, so wait a bit.
time.sleep(4)
# 3.2.8. Run command and check results
#
# Calls an external program, raising an exception upon any error
# and returning standard output and standard error from the program,
# as well as writing them to the log.
def system(self, command, ignore_failure = 0, input_lines = None):
log_file.write('Executing %s\n' % repr(command))
(child_stdin, child_stdout) = os.popen4(command)
if input_lines:
child_stdin.writelines(input_lines)
child_stdin.close()
output = child_stdout.read()
result = child_stdout.close()
log_file.write(output)
if not ignore_failure and result:
message = ('Command "%s" failed with result code %d.' %
(command, result))
log_file.write(message + '\n')
raise message
return output
class Bugzilla_nt_interface(Bugzilla_mixin,
Perforce_mixin):
pass
class Bugzilla_posix_interface(Bugzilla_mixin,
Perforce_mixin):
pass
# 3.3.A Tracker interface mixin
#
# This is the interface to Tracker, suitable for use on
# Windows only.
#
# This class resets a Tracker database
#
class Tracker_mixin:
# The Tracker server.
trk = None
log_messages = []
def log(self, msg):
log_message(msg)
self.log_messages.append(msg)
# 3.3.1. Install and configure Tracker
#
def install_tracker(self):
import tracker
config.logger = self
trk = tracker.tracker(config,
config.tracker_user,
config.tracker_password,
config.tracker_project,
config.tracker_server)
config.trk = trk
import win32api
win32api.WriteProfileVal('Tracker', 'Marker', str(0), trk.ini_file())
def empty_tracker_database(self):
# Need to setup this as it is used in the call below
config.tr_fields = {
'Id': ( 'int' ),
}
config.trk.delete_issues_before(99999999)
# Restart Tracker
def restart_defect_tracker(self):
self.install_tracker()
self.empty_tracker_database()
class Tracker_nt_interface(Tracker_mixin,
Perforce_mixin):
pass
# 4. P4DTI TEST CASE BASE CLASS
#
# The p4dti_base class is a generic P4DTI test case. It defines methods
# for setting up the integration and some utilities for recording and
# interrogating the output of the replicator.
#
# Other P4DTI test cases will inherit from p4dti_base.
#
# When a class implements several test cases, the methods that implement
# test cases (in the PyUnit sense) should have names starting "test_".
# When a class implements a single test case, the method should be
# called "runTest".
class p4dti_base(p4dti_unittest.TestCase):
# Defect tracker interface (an instance of one of the classes in
# section 3 above).
dti = eval(config.dt_name + '_' + os.name + '_interface')()
# Messages written to the replicator's log.
log_messages = []
# Mail messages sent by the replicator. Each message is a triple
# (recipients, subject, body); see the definition of mail() method
# in the replicator class.
mail_messages = []
# The replicator.
r = None
# 4.1. Snoop on the logger
#
# I want to be able to test that the correct messages are appearing
# in the log, so I override the log method in the
# logger.multi_logger class so that it records the messages as well
# as printing them to the log file. Printing to a log file is
# necessary so that the test engineer can analyze what happened when
# a test case fails.
#
# A disadvantage of this implementation is that the logging code is
# not tested. Therefore there needs to be a separate set of logging
# tests.
def snoop_logger(self):
logger.multi_logger.log = self.log
logger.file_logger.log = self.log
logger.sys_logger.log = self.log
def log(self, msg):
log_message(msg)
self.log_messages.append(msg)
# The clear_log method can be used to clear this record of the log
# before carrying out a test.
def clear_log(self):
self.log_messages = []
self.mail_messages = []
# 4.2. Snoop on e-mail
def snoop_mail(self):
sys.modules['smtplib'] = self
def log_separator(self):
log_file.write("-" * 72)
log_file.write("\n")
def SMTP(self, server):
self.log_separator()
return self
def quit(self):
self.log_separator()
def sendmail(self, from_address, to, text):
self.mail_messages.append((to, text))
log_file.write(text)
# 4.3. Check that the log is as expected
#
# These methods make various checks on the messages recorded in the
# log.
def log_message_ids(self):
return map(lambda m: m.id, self.log_messages)
def expected_only(self, expected):
for m in self.log_messages:
assert m.id in expected, \
("Unexpected message %d (%s) found in log %s"
% (m.id, str(m), self.log_message_ids()))
def expected_only_in_range(self, min, max, expected):
for m in self.log_messages:
if m.id >= min and m.id <= max:
assert m.id in expected, \
("Unexpected message %d (%s) found in log %s"
% (m.id, str(m), self.log_message_ids()))
def expected_not(self, unexpected):
for m in self.log_messages:
assert m.id not in unexpected, \
("Unexpected message %d (%s) found in log %s"
% (m.id, str(m), self.log_message_ids()))
def expected(self, expected):
found = {}
for m in self.log_messages:
if found.has_key(m.id):
found[m.id] = found[m.id] + 1
else:
found[m.id] = 1
for id in expected:
assert found.has_key(id) and found[id] > 0, \
("Expected message %d not found in log %s"
% (id, self.log_message_ids()))
found[id] = found[id] - 1
def expectation(self, expected, maybe=[]):
self.expected(expected)
self.expected_only_in_range(800, 1000, expected + maybe)
# 4.4. Set up everything so that test cases can run
#
# Get a fresh configuration, a new defect tracker and a new Perforce
# server.
def setup_everything(self, config_changes = {}):
reset_configuration()
for key, value in config_changes.items():
setattr(config, key, value)
self.dti.restart_defect_tracker()
self.dti.restart_perforce()
self.snoop_logger()
self.snoop_mail()
# Get a temporary Perforce interface suitable for setting the
# Perforce password.
if config.p4_password:
p4p = p4i.p4(port = config.p4_port,
user = config.p4_user,
client_executable = config.p4_client_executable,
logger = self )
p4p.run('user -o')
# Problem with quoting strings in arguments to p4 on
# Windows using os.popen (as p4i.run does). NB 2002-10-28.
if os.name == 'nt':
if ' ' in config.p4_password:
raise "space in p4_password on Windows"
p4p.run('passwd -P %s' % config.p4_password)
else:
p4p.run('passwd -O "" -P "%s"' % config.p4_password)
# Get a permanent Perforce interface using the password.
self.p4 = p4i.p4(port = config.p4_port,
user = config.p4_user,
password = config.p4_password,
client_executable = config.p4_client_executable,
logger = self )
# Set the user's email address to the replicator_address
user = self.p4.run('user -o')[0]
user['Email'] = config.replicator_address
self.p4.run('user -i', user)
# Create a spec depot
depot = self.p4.run('depot -o spec')[0]
depot['Type'] = 'spec'
self.p4.run('depot -i', depot)
# 4.5. Initialize the replicator
#
# Load the init module. If it's already loaded, reload it: we must
# reload it because we want the init module to run again and to pick
# up on the new configuration. (Some other modules have the same
# properties.)
def initialize_replicator(self):
log_fn()
self.mail_messages = []
for m in ['init']:
if sys.modules.has_key(m):
del sys.modules[m]
import init
self.r = init.r
# Regression test for job000199.
assert self.mail_messages == []
# 4.6. Normal replicator startup
#
# This is a pseudo test case. It checks that the replicator can
# start up normally. Other tests may depend on this having been
# run; for example, the check_nothing test (5.1). This test must be
# run before any other test cases, and therefore can't be part of a
# test suite (the tests of which may be run singly or in any order).
# It should therefore be run from the setUp() method of a subclass
# of this class (unless of course that subclass has a different idea
# of what should happen at startup).
def check_startup(self):
log_fn()
self.initialize_replicator()
# Expect to get a startup e-mail
self.clear_log()
self.r.prepare_to_run()
self.expectation([800, 866], [867, 870, 910])
# Expect to set up issues and replicate them, but no jobs or
# conflicts.
self.poll()
if config.dt_name == 'Tracker':
# We have cleared DB so don't expect any issues
self.expected_only_in_range(800, 1000, [911, 912])
else:
self.expectation([803, 804, 812, 911, 912])
# There should be no further activity if we replicate again.
self.poll()
self.expected_only_in_range(800, 1000, [911, 912])
# 4.7. Exceptional replicator behaviour
#
# This method calls the given function but expects to get the
# exception given by 'error', with message id 'msgid'.
def check_exception(self, function, error, msgid):
try:
function()
except:
err, msg = sys.exc_info()[0:2]
assert err == error, \
("Expected error %s but got %s." % (error, err))
assert msg.id == msgid, \
("Expected message %d but got %d (%s)"
% (msgid, msg.id, str(msg)))
else:
self.fail("Expected error %s but didn't get it." % error)
# check_startup_error() is a special case of check_exception for the
# initialize_replicator method.
def check_startup_error(self, error, msgid):
self.check_exception(self.initialize_replicator, error, msgid)
# 4.8. Check consistency
#
# This method checks that the databases are consistent.
def check_consistency(self):
log_fn()
self.clear_log()
self.r.check_consistency()
self.expectation([871, 885], [883, 884, 890])
# 4.9. Check replication of a single issue
def check_replication_dt_to_p4(self, first_time = 0):
log_fn()
self.poll()
if first_time:
self.expected([803]) # set up for replication
self.expectation([804, 812, 911, 912], [803, 813])
# Nothing should come back from Perforce.
self.poll()
if config.dt_name == 'Tracker':
# We expect a dummy replicate with no fields done
self.expected_only_in_range(800, 1000, [804, 813, 911, 912])
else:
self.expected_only_in_range(800, 1000, [911, 912])
# 4.10. Initialize Perforce repository
#
# This method sets up Perforce clients and workspaces for a set of
# users, and adds a file to the repository. It sets up the
# following members:
#
# p4i: Map from user to a Perforce interface for that user.
# workspace: Map from user to the client workspace for that user.
#
# This is only needed for tests involving Perforce fixes: that's why
# it's not called in setup_everything().
def setup_perforce(self, users):
log_fn()
# Create Perforce interfaces and workspaces for the dummy users.
self.p4p = {}
self.workspace = {}
for user in users:
self.workspace[user] = os.path.join(self.dti.p4dir, user)
os.mkdir(self.workspace[user])
os.mkdir(os.path.join(self.workspace[user], 'depot'))
self.p4p[user] = p4i.p4(
port = config.p4_port,
user = user,
client = user + '_' + socket.gethostname(),
client_executable = config.p4_client_executable,
logger = self, )
# Make the Perforce user record.
p4_user = self.p4p[user].run('user -o')[0]
p4_user['Email'] = ('%s%s' % (user,email_suffix))
self.p4p[user].run('user -i', p4_user)
# Make a Perforce client.
client = self.p4p[user].run('client -o')[0]
client['Root'] = self.workspace[user]
self.p4p[user].run('client -i', client)
# Add a file to the repository so that we have something with
# which to make changes and fixes.
user = users[0]
filename = os.path.join(self.workspace[user], 'depot', 'test')
open(filename, 'w')
self.p4p[user].run('add -t ktext %s' % filename)
change = self.p4p[user].run('change -o')[0]
change['Description'] = 'Added test file'
self.p4p[user].run('submit -i', change)
# 4.11. Make changelist in Perforce
#
# This method makes a changelist in Perforce and edits a file in
# that changelist. The changelist is returned without being
# submitted.
def edit_file(self, user):
log_fn()
change = self.p4p[user].run('change -o')[0]
change['Description'] = 'Edited test file'
result = self.p4p[user].run('change -i', change)[0]
changelist = string.split(result)[1]
filename = os.path.join(self.workspace[user], 'depot', 'test')
# Perforce reports "files up to date" as an error, so ignore it.
try:
self.p4p[user].run('sync %s' % filename)
except p4.error:
pass
self.p4p[user].run('edit -c %s %s'
% (changelist, filename))
f = open(filename, 'a')
f.write("Foo\n")
f.close()
return self.p4p[user].run('change -o %s' % changelist)[0]
# 4.12 Variant tests
#
# This class has variant methods for each defect tracker (e.g.,
# Bugzilla) and calls the appropriate one.
def run_variant(self):
try:
test = getattr(self, config.dt_name)
except AttributeError:
assert 0, "No test variant for " + config.dt_name + "."
test()
# 4.13. Poll the databases, expecting no errors
expected_notices = [607]
def poll(self):
log_fn()
self.clear_log()
self.r.carefully_poll_databases()
for m in self.log_messages:
if (isinstance(m, message.message)
and m.id not in self.expected_notices):
assert m.priority >= message.INFO, \
("Expected no errors, but message '%s' has "
"priority %d." % (str(m), m.priority))
# 4.14. Create a job
#
# Fill in fields. Return the name of the job.
def create_job(self, p4p, job):
for k, v in job.items():
if string.find(v, '<enter description here>') == 0:
job[k] = 'foo'
result = p4p.run('job -i', job)
return string.split(result[0])[1]
# 5. TEST CASES: NORMAL OPERATION
#
# If nothing has changed, then nothing happens when the replicator
# polls. The databases are consistent.
class normal(p4dti_base):
def setUp(self):
self.setup_everything()
self.check_startup()
def runTest(self):
"Startup, replication to Perforce (test_p4dti.normal)"
self.check_consistency()
# 6. TEST CASES: INCORRECT CONFIGURATIONS
#
# This is a regression test of job000037, job000075 and job000116.
class bogus(p4dti_base):
def setUp(self):
self.setup_everything()
# 6.1. An incorrect parameter generates an error
#
# This is a utility function for carrying out a range of tests. It
# resets the configuration, sets the parameter named by 'param' to
# value, then tries to start the replicator. It expects to get an
# exception, whose message should have the message id 'msgid'.
def check_param(self, param, value, msgid):
reset_configuration()
setattr(config, param, value)
try:
self.initialize_replicator()
except:
err, msg = sys.exc_info()[0:2]
if isinstance(msg, message.message):
if msg.id != msgid:
self.addFailure("Set parameter %s to '%s': "
"expected message %d but got %d "
"(%s)" % (param, value, msgid,
msg.id, str(msg)))
else:
self.addFailure("Set parameter %s to '%s': expected "
"message %d but got '%s: %s' instead."
% (param, value, msgid, err, msg))
else:
self.addFailure("Set parameter %s to '%s': expected "
"message %d but didn't get it."
% (param, value, msgid))
# 6.2. Basic errors in parameters are caught quickly
#
# Basic errors in parameters (wrong type, wrong format) should be
# caught quickly.
#
# This is a table of (parameter name, bogus value, message id of
# expected error).
bogus_basic_parameters = [
# Regression for job000170:
('administrator_address', 'invalid e-mail address', 202),
('changelist_url', -1, 207),
('changelist_url', "http://invalid/%d/%s", 210),
('changelist_url', "http://invalid/no/format/specifier", 210),
('changelist_url', "http://invalid/%d/%%/%%%", 210),
('closed_state', -1, 208),
('configure_name', -1, 207),
('job_url', 42, 207),
('job_url', "http://invalid/%d/%s", 211),
('job_url', "http://invalid/no/format/specifier", 211),
('job_url', "http://invalid/trailing/percent/%d/%%/%%%", 211),
('log_file', -1, 208),
('log_level', 'not an int', 204),
('migrate_p', 'not a function', 203),
('p4_client_executable', -1, 207),
# Regression test for job000158:
('p4_client_executable', 'no such file', 705),
('p4_user', None, 207),
('p4_password', -1, 207),
# ('p4_password', 'incorrectpassword', 706),
('p4_port', None, 207),
# Regression test for job000158, job000202:
('p4_port', '127.0.0.1:9999', 707),
('p4_server_description', -1, 207),
('poll_period', 'not an int', 204),
('prepare_issue', 'not a function', 203),
('replicator_address', 'invalid@e-mail@address', 202),
('replicate_p', 'not a function', 203),
('replicate_job_p', 'not a function', 203),
('replicated_fields', 'not a list', 205),
('replicated_fields', ['not', 'list', 'of', 'strings', 0], 206),
('replicated_fields', ['not', ('list', 'of'), 'strings', 0], 206),
('rid', -1, 207),
('rid', '0abc', 209),
('rid', 'ab-c', 209),
('sid', -1, 207),
('sid', 'abcdefg+z', 209),
('smtp_server', -1, 207),
('start_date', '2001-02-03 24-00-00', 201),
('use_deleted_selections', 'neither 0 nor 1', 200),
('use_deleted_selections', -1, 200),
('use_deleted_selections', 2, 200),
('use_perforce_jobnames', 'neither 0 nor 1', 200),
('use_perforce_jobnames', -1, 200),
('use_perforce_jobnames', 2, 200),
('use_stdout_log', 'neither 0 nor 1', 200),
('use_stdout_log', -1, 200),
('use_stdout_log', 2, 200),
]
def test_basic_parameters(self):
for param, value, msgid in self.bogus_basic_parameters:
self.check_param(param, value, msgid)
# 6.3. Basic errors in DT parameters are caught quickly
#
# As 6.2, but picks a set of tests based on config.dt_name.
bogus_Bugzilla_parameters = [
('bugzilla_directory', 'not a directory', 303),
('bugzilla_directory', '/', 304),
('closed_state', 'not a Bugzilla state', 301),
('dbms_database', -1, 207),
('dbms_host', -1, 207),
# Regression for job000168:
('dbms_port', '1234', 204),
('dbms_user', -1, 207),
('dbms_password', -1, 207),
('migrated_user_password', -1, 207),
('replicated_fields', 'not a list', 205),
('replicated_fields', ['not', 'list', 'of', 'strings', 0], 206),
('replicated_fields', ['bug_status'], 311),
('replicated_fields', ['assigned_to'], 311),
('replicated_fields', ['short_desc'], 311),
('replicated_fields', ['resolution'], 311),
('replicated_fields', ['longdesc', 'longdesc'], 312),
('replicated_fields', ['not a Bugzilla field'], 307),
]
bogus_Tracker_parameters = [
('tracker_user', -1, 207),
('tracker_password', -1, 207),
('tracker_project', -1, 207),
('tracker_server', -1, 207),
]
def test_dt_parameters(self):
dt_params = getattr(self, 'bogus_%s_parameters' % config.dt_name)
for param, value, msgid in dt_params:
self.check_param(param, value, msgid)
# 6.4. Parameter errors are caught by the defect tracker
#
# Like 6.2 and 6.3 this test sets a parameter to an incorrect value.
# In this case the error is caught by the defect tracker, so a
# message object isn't returned, but rather a string, which we must
# test directly rather than by message id.
mysql_errors = [
'_mysql.OperationalError',
'_mysql_exceptions.OperationalError',
]
erroneous_Bugzilla_parameters = [
('dt_name',
'not a defect tracker',
'exceptions.ImportError',
'No module named configure_not a defect tracker'),
('dbms_host',
'host.invalid',
mysql_errors,
'(2005, "Unknown MySQL Server Host'),
('dbms_database',
'invalid',
mysql_errors,
['(1049, "Unknown database \'invalid\'")',
'(1044, "Access denied for user:' ]),
('dbms_password',
'not the Bugzilla password',
mysql_errors,
['(1045, "Access denied for user:',
'(1044, "Access denied for user:']),
('dbms_user',
'not the Bugzilla user',
mysql_errors,
['(1045, "Access denied for user:',
'(1044, "Access denied for user:']),
]
erroneous_Tracker_parameters = [
]
def test_dt_errors(self):
params = getattr(self, 'erroneous_%s_parameters'
% config.dt_name)
for param, value, errors, message_texts in params:
reset_configuration()
setattr(config, param, value)
try:
self.initialize_replicator()
except:
(err, msg, _) = sys.exc_info()
if not isinstance(errors, types.ListType):
errors = [errors]
if str(err) not in errors:
self.addFailure("Set parameter %s to '%s': "
"expected error in %s but got "
"error '%s'."
% (param, value, errors, str(err)))
if isinstance(message_texts, types.ListType):
texts = message_texts
else:
texts = [message_texts]
found = 0
for text in texts:
if str(msg)[0:len(text)] == text:
found = 1
break
if not found:
self.addFailure("Set parameter %s to '%s': "
"expected error message in %s but "
"got '%s'."
% (param, value, texts, msg))
else:
self.addFailure("Set parameter %s to %s: expected "
"error in %s but there was no error."
% (param, value, errors))
# 6.5. OS-specific parameter tests
#
# As 6.3, but picks a set of tests based on os.name.
bogus_nt_parameters = [
('use_windows_event_log', -1, 200),
('use_windows_event_log', 2, 200),
('use_windows_event_log', 'neither 0 nor 1', 200),
]
bogus_posix_parameters = [
# By using fake Perforce client executables we can check that
# unsupported client and server versions are detected.
# Regression test for job000173.
('p4_client_executable', './fake_p4.py', 704),
('p4_client_executable', './fake_p4d_old_changelevel.py', 834),
('p4_client_executable', './fake_p4d_no_changelevel.py', 835),
]
def test_os_parameters(self):
os_params = getattr(self, 'bogus_%s_parameters' % os.name)
for param, value, msgid in os_params:
self.check_param(param, value, msgid)
def runTest(self):
"Illegal configuration parameters (test_p4dti.bogus)"
self.test_basic_parameters()
self.test_dt_parameters()
self.test_dt_errors()
self.test_os_parameters()
# 7. TEST CASE: EXISTING JOB IN PERFORCE
#
# The replicator should refuse to start if there's a job in Perforce.
#
# This is a regression test for job000219 and job000240.
class existing(p4dti_base):
def setUp(self):
self.setup_everything()
self.initialize_replicator()
def runTest(self):
"Startup with an existing job (test_p4dti.existing)"
j = self.p4.run('job -o')[0]
j['Description'] = 'Test job'
self.p4.run('job -i', j)
self.check_exception(self.r.poll, self.r.error, 914)
# 8. TEST CASE: MOVING THE START DATE
#
#
# 8.1. Moving the start date backwards in time
#
# When start_date is set to the current time, no issues should be
# replicated when the replicator starts. Similarly, refreshing Perforce
# has no effect. But you can set start_date back in time and refresh
# Perforce, this time with effect.
#
# This is a regression test for job000047, job000050, job000221.
class start_1(p4dti_base):
def setUp(self):
self.setup_everything()
def runTest(self):
"Moving the start_date backwards in time (test_p4dti.start_1)"
reset_configuration()
config.start_date = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(time.time()))
self.initialize_replicator()
# When we poll, nothing should happen.
self.clear_log()
self.r.poll()
self.expected_only_in_range(800, 1000, [911, 912])
# Nor when we refresh.
self.clear_log()
self.r.refresh_perforce_jobs()
self.expected_only_in_range(800, 1000, [911, 912])
# The databases should report consistent.
self.check_consistency()
# Now set start date back in time and try refreshing again.
reset_configuration()
config.start_date = "1971-01-01 00:00:00"
self.initialize_replicator()
self.clear_log()
self.r.refresh_perforce_jobs()
self.expectation([803, 804, 812])
# The databases should still report consistent.
self.check_consistency()
# 8.2. Moving the start date forwards in time
#
# Start up with an old start date as normal, then move the start date
# forwards in time. The databases should still report consistent,
# because issues that were recorded as being replicated in the first
# poll should still be recorded as replicated, even though they haven't
# changed since the start date.
#
# This is a regression test for job000340.
class start_2(p4dti_base):
def setUp(self):
self.setup_everything()
self.check_startup()
def runTest(self):
"Moving the start_date forwards in time (test_p4dti.start_2)"
# Set start date forward in time and check the consistency.
reset_configuration()
config.start_date = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(time.time()))
self.initialize_replicator()
# When we poll, nothing should happen.
self.poll()
self.expected_only_in_range(800, 1000, [911, 912])
# The databases should report consistent.
self.check_consistency()
# 9. TEST CASE: REPLICATING BY PROJECT
#
# This is a regression test for job000107, job000112, job000311.
class project(p4dti_base):
def setUp(self):
self.setup_everything()
def runTest(self):
"Replicate by project (test_p4dti.project)"
self.run_variant()
def Bugzilla(self):
config.replicated_fields = ['product']
config.replicate_p = lambda self: self['product'] == 'product 1'
self.check_startup()
jobs = self.p4.run('jobs')
for j in jobs:
assert j['Product'] == 'product 1\n', \
("Job %s has product %s; shouldn't be replicated."
% (j['Job'], j['Product']))
self.check_consistency()
# 10. ISSUE LIFE CYCLE TEST CASES
#
# This test creates issues and takes them through various kinds of
# lifecycle, checking that they are replicated correctly at each step.
class lifecycle(p4dti_base):
user = 'rb'
user1 = 'gdr'
if config.dt_name == 'Tracker':
user = 'fred'
user1 = 'Admin'
def setUp(self):
self.setup_everything()
self.setup_perforce([self.user, self.user1])
self.check_startup()
# Submit a new issue to the defect tracker. Run a replication
# cycle; check that the issue gets replicated to perforce. Return a
# pair of the defect tracker issue and the Perforce job.
def submit(self, user):
id = getattr(self, config.dt_name + '_submit')(user)
# It gets replicated to Perforce. This is a regression test for
# job000233.
self.check_replication_dt_to_p4(first_time = 1)
# Check that the job has been created in Perforce.
issue = self.r.dt.issue(id)
assert issue
jobname = issue.corresponding_id()
job = self.p4.run('job -o %s' % jobname)[0]
# Defect-tracker specific checks.
getattr(self, config.dt_name + '_submitted')(issue, job)
return issue, job
# Assign the issue to the user in the defect tracker. Run a
# replication cycle; check that the assignment gets replicated.
# Return the updated defect trakcer issue and Perforce job.
def assign(self, issue, job, user):
getattr(self, config.dt_name + '_assign')(issue, job, user)
# It gets replicated to Perforce.
self.check_replication_dt_to_p4()
# Defect-tracker specific checks.
issue = self.r.dt.issue(issue.id())
job = self.p4.run('job -o %s' % job['Job'])[0]
getattr(self, config.dt_name + '_assigned')(issue, job)
return issue, job
# The job has been closed in Perforce. Check that the closure is
# replicated to the defect tracker and that the expected messages
# appear.
def close(self, issue, job, expected):
self.poll()
self.expectation(expected + [911, 912])
issue1 = self.r.dt.issue(issue.id())
job1 = self.p4.run('job -o %s' % job['Job'])[0]
getattr(self, config.dt_name + '_closed')(issue, job,
issue1, job1)
def Bugzilla_submit(self, user):
fields = {
'reporter': 'nb@ravenbrook.com',
'product': 'product 1',
'version': 'unspecified',
'component': 'component 1.1',
'rep_platform': 'All',
'op_sys': 'All',
'priority': 'P1',
'bug_severity': 'critical',
'assigned_to': '',
'cc': '',
'bug_file_loc': '',
'short_desc': 'Life cycle # test bug',
'comment': 'Life cycle # test long description.',
'submit': ' Commit ',
'form_name': 'enter_bug',
'Bugzilla_login': config.bugzilla_admin_user,
'Bugzilla_password': config.bugzilla_admin_password,
}
result = self.dti.run_script('post_bug.cgi', fields)
match = re.search('<H2>Bug ([0-9]+) posted</H2>', result)
if match:
bugid = match.group(1)
else:
self.fail("Tried to submit a bug to Bugzilla, but got the "
"following in reply: %s." % result)
# Sleep for a second so that we can be sure to pick this
# up on the next poll.
time.sleep(1)
return bugid
def Bugzilla_submitted(self, bug, job):
for field, expected in [
('Status', 'bugzilla_new'),
('Summary', 'Life cycle # test bug\n'),
('Priority', 'P1'),
('Severity', 'critical'),
]:
assert job[field] == expected, \
("Expected new job %s to have %s '%s', but found "
"'%s'." % (job['Job'], field, expected, job[field]))
def Bugzilla_assign(self, bug, job, user):
# Assign the issue in Bugzilla. Ideally this should go through
# the Bugzilla web interface, but that's horrible. So cheat for
# now by going through the very low-level Bugzilla interface.
# (This is horrible too!)
bz_user = self.r.config.user_translator.translate_p4_to_dt(
user, self.r.dt, self.r.dt_p4)
changes = {'bug_status': 'ASSIGNED',
'assigned_to': bz_user}
bug_id = bug['bug_id']
self.r.dt.bugzilla.update_row('bugs', changes,
'bug_id = %d' % bug_id)
for k, v in changes.items():
activity = {
'bug_id': bug_id,
'who': bz_user,
'bug_when': self.r.dt.bugzilla.now(),
'fieldid': self.r.dt.bugzilla.fieldid(k),
'removed': str(bug[k]),
'added': str(v),
}
self.r.dt.bugzilla.insert_row('bugs_activity', activity)
# Wait two seconds so that the bug will be picked up in the next
# poll, not the one after.
time.sleep(2)
def Bugzilla_assigned(self, bug, job):
# The status should now be assigned.
job = self.p4.run('job -o %s' % job['Job'])[0]
assert job[self.r.config.job_status_field] == 'assigned', \
("Expected assigned job %(Job)s to have status "
"'assigned' but it has state %(Status)s." % job)
def Bugzilla_closed(self, bug, job, bug1, job1):
pass
def Tracker_submit(self, user):
fields = {
'Title': 'Auto-created bug',
'Description': 'Auto-created description',
'Severity': '3-Medium',
'Owner': 'Admin',
}
bugid = self.r.dt.tracker.create_bug(fields, user)
# Sleep for a second so that we can be sure to pick this
# up on the next poll.
time.sleep(1)
return bugid
def Tracker_submitted(self, bug, job):
for field, expected in [
(self.r.config.job_status_field, 'Open'),
('Severity', 'Medium'),
]:
assert job[field] == expected, \
("Expected new job %s to have %s '%s', but found "
"'%s'." % (job['Job'], field, expected, job[field]))
def Tracker_assign(self, bug, job, user):
# Assign the issue in Tracker.
tr_user = self.r.config.user_translator.translate_p4_to_dt(
user, self.r.dt, self.r.dt_p4)
changes = {'Owner': tr_user}
self.r.dt.tracker.update_bug(changes, bug, user)
# Wait two seconds so that the bug will be picked up in the next
# poll, not the one after.
time.sleep(2)
def runTest(self):
"Issue life cycle (test_p4dti.lifecycle)"
# 10.1. Simple issue lifecycle
#
# This is a simple issue cycle:
#
# 1. An issue is submitted to the defect tracker.
# 2. It is replicated to Perforce.
# 3. It gets assigned to a developer.
# 4. It is closed in Perforce (by editing the job).
# 5. The closure gets replicated back to the defect tracker.
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
# Close the job in Perforce. This is a regression test for
# job000118.
job[self.r.config.job_status_field] = 'closed'
self.p4.run('job -i', job)
self.close(issue, job, [805, 824, 826])
# 10.2. Issue lifecycle (fix in Perforce)
#
# This is an issue cycle in which the issue is associated with a
# changelist in Perforce:
#
# 1. An issue is submitted to the defect tracker.
# 2. It is replicated to Perforce.
# 3. It gets assigned to a developer.
# 4. It is closed in Perforce (by making a fix).
# 5. Closure and fix get replicated back to the defect
# tracker.
#
# This is a regression test for job000133.
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
self.p4.run('fix -c 1 %s' % job['Job'])
self.close(issue, job, [802, 805, 819, 820, 824, 826])
# 10.3. Issue lifecycle (fix on submission in Perforce)
#
# This tests an issue lifecycle in which the issue is associated
# with a pending changelist and closed on submission.
#
# 1. An issue is submitted to the defect tracker.
# 2. It is replicated to Perforce.
# 3. It gets assigned to a developer.
# 4. The job description is edited in Perforce (this is a
# regression test for job000362).
# 5. A fix is made with a pending changelist.
# 6. The change is submitted.
# 7. Job, fix get replicated back to the defect tracker.
#
# This is a regression test for job000225.
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
job['Description'] = job['Description'] + '\nEdited.'
self.p4p[self.user].run('job -i', job)
self.poll()
self.expectation([805, 824, 911, 912], [826])
change = self.edit_file(self.user)
self.p4p[self.user].run('fix -c %s %s'
% (change['Change'], job['Job']))
self.p4p[self.user].run('submit -c %s' % change['Change'])
self.close(issue, job, [802, 805, 819, 820, 824, 826])
# Delete fix in Perforce and check that the deletion is
# replicated (regression test for job000013 and job000222).
self.p4p[self.user].run('fix -d -c %s %s'
% (change['Change'], job['Job']))
self.poll()
self.expected([818])
fixes = self.p4.run('fixes -j %s' % job['Job'])
assert len(fixes) == 0, ("Expected no fixes for %s, but found "
"%s." % (job['Job'], fixes))
# 10.4. Issue lifecycle (fix to assigned on submission in
# Perforce)
#
# As section 10.3, but the fix is to the job's current state
# rather than "closed", so the job state doesn't change.
# Regression test for job000007.
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
change = self.edit_file(self.user)
status = job[self.r.config.job_status_field]
self.p4p[self.user].run('fix -s %s -c %s %s'
% (status, change['Change'],
job['Job']))
self.p4p[self.user].run('submit -c %s' % change['Change'])
self.poll()
self.expectation([802, 805, 819, 820, 825, 911, 912])
# Change fix status (only -- note that we set the job status
# back); check that it's replicated.
self.p4p[self.user].run('fix -s closed -c %s %s'
% (change['Change'], job['Job']))
self.p4p[self.user].run('job -i', job)
self.poll()
self.expectation([805, 819, 821, 825, 911, 912])
# 10.5. Simultaneous edit
#
# Provoke a conflict by changing an issue simultaneously in both
# systems. Then try again, but with a user other than the job
# owner. (Check that mail goes to both.)
#
# Make, update and delete a fix in Perforce simultaneously with
# the change. Make sure the fixes are respectively deleted,
# updated and restored. Ditto for filespecs.
change1 = self.edit_file(self.user) # make
change2 = self.edit_file(self.user) # update
change3 = self.edit_file(self.user) # delete
self.poll()
for u in [self.user, self.user1]:
issue, job = self.submit(self.user)
status = job[self.r.config.job_status_field]
# Make the fixes that we are going to update and delete.
self.p4p[self.user].run('fix -s %s -c %s %s'
% (status, change2['Change'],
job['Job']))
self.p4p[self.user].run('fix -s %s -c %s %s'
% (status, change3['Change'],
job['Job']))
job['P4DTI-filespecs'] = 'filespec_1\n'
self.p4p[self.user].run('job -i', job)
self.poll()
# Now edit the job simultaneously in DT and Perforce.
getattr(self, config.dt_name + '_assign')(issue, job,
self.user)
job[self.r.config.job_status_field] = 'closed'
job['P4DTI-filespecs'] = 'filespec_2\n'
self.p4p[u].run('job -i', job)
# Make, update and delete those fixes.
self.p4p[u].run('fix -c %s %s'
% (change1['Change'], job['Job']))
self.p4p[u].run('fix -c %s %s'
% (change2['Change'], job['Job']))
self.p4p[u].run('fix -d -c %s %s'
% (change3['Change'], job['Job']))
self.clear_log()
self.r.carefully_poll_databases()
self.expectation([800, 806, 811, 814, 815, 816, 817, 841,
860, 861, 853, 862, 812, 910, 911, 912])
issue = self.r.dt.issue(issue.id())
job = self.p4.run('job -o %s' % job['Job'])[0]
getattr(self, config.dt_name + '_assigned')(issue, job)
self.poll()
fixes = self.p4.run('fixes -j %s' % job['Job'])
assert job['P4DTI-filespecs'] == 'filespec_1\n'
assert len(fixes) == 2
assert fixes[0]['Change'] == change3['Change']
assert fixes[0]['Status'] == status
assert fixes[1]['Change'] == change2['Change']
assert fixes[1]['Status'] == status
for m in self.log_messages:
if m.id == 800:
assert string.find(m.text,
config.administrator_address)
assert string.find(m.text, self.user) != -1
assert string.find(m.text, u) != -1
# 10.6. Illegal changes
#
# Regression test for job000429.
tests = getattr(self, config.dt_name + '_illegal_changes')
for field, value, expected in tests:
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
job[field] = value
self.p4p[self.user].run('job -i', job)
self.clear_log()
self.r.carefully_poll_databases()
self.expectation(expected + [800, 805, 811, 851, 860, 861,
862, 812, 852, 853, 910, 911,
912],
[824, 923])
self.poll()
# We're done; one last check for luck.
self.check_consistency()
Bugzilla_illegal_changes = [
('Product', 'no_such_product', [504]),
('Description', 'foo', [505]),
('Status', 'verified', [503]),
]
# 10a. ISSUE LIFE CYCLE TEST CASES
#
# This test creates issues and takes them through various kinds of
# lifecycle, checking that they are replicated correctly at each step.
# Specific to Tracker
class tracker_lifecycle(p4dti_base):
user = 'fred'
user1 = 'Admin'
user2 = 'jim'
def setUp(self):
self.setup_everything()
self.setup_perforce([self.user, self.user1, self.user2])
self.check_startup()
# Submit a new issue to the defect tracker. Run a replication
# cycle; check that the issue gets replicated to perforce. Return a
# pair of the defect tracker issue and the Perforce job.
def submit(self, user):
log_fn()
id = getattr(self, config.dt_name + '_submit')(user)
# It gets replicated to Perforce. This is a regression test for
# job000233.
self.check_replication_dt_to_p4(first_time = 1)
# Check that the job has been created in Perforce.
config.trk.login(config.tracker_user, config.tracker_password)
issue = self.r.dt.issue(id)
config.trk.logout()
assert issue
jobname = issue.corresponding_id()
job = self.p4.run('job -o %s' % jobname)[0]
# Defect-tracker specific checks.
getattr(self, config.dt_name + '_submitted')(issue, job)
return issue, job
# Assign the issue to the user in the defect tracker. Run a
# replication cycle; check that the assignment gets replicated.
# Return the updated defect trakcer issue and Perforce job.
def assign(self, issue, job, user):
log_fn()
getattr(self, config.dt_name + '_assign')(issue, job, user)
# It gets replicated to Perforce.
self.check_replication_dt_to_p4()
# Defect-tracker specific checks.
config.trk.login(config.tracker_user, config.tracker_password)
issue = self.r.dt.issue(issue.id())
job = self.p4.run('job -o %s' % job['Job'])[0]
getattr(self, config.dt_name + '_assigned')(issue, job)
return issue, job
# The job has been closed in Perforce. Check that the closure is
# replicated to the defect tracker and that the expected messages
# appear.
def close(self, issue, job, expected):
log_fn()
self.poll()
self.expectation(expected + [911, 912])
config.trk.login(config.tracker_user, config.tracker_password)
issue1 = self.r.dt.issue(issue.id())
job1 = self.p4.run('job -o %s' % job['Job'])[0]
getattr(self, config.dt_name + '_closed')(issue, job,
issue1, job1)
def Tracker_submit(self, user):
log_fn()
fields = {
'Title': 'Auto-created bug',
'Description': 'Auto-created description',
'Severity': '3-Medium',
'Owner': 'Admin',
}
config.trk.login(config.tracker_user, config.tracker_password)
bugid = self.r.dt.tracker.create_bug(fields, user)
config.trk.logout()
# Sleep for a second so that we can be sure to pick this
# up on the next poll.
time.sleep(1)
print "Bug:", bugid
return bugid
def Tracker_submitted(self, bug, job):
log_fn()
for field, expected in [
(self.r.config.job_status_field, 'open'),
('Severity', '3-medium'),
]:
assert job[field] == expected, \
("Expected new job %s to have %s '%s', but found "
"'%s'." % (job['Job'], field, expected, job[field]))
def Tracker_assign(self, bug, job, user):
log_fn()
# Assign the issue in Tracker.
tr_user = self.r.config.user_translator.translate_p4_to_dt(
self.user2, self.r.dt, self.r.dt_p4)
changes = {'Assigned To': tr_user}
self.update_bug(changes, bug, user)
# Wait two seconds so that the bug will be picked up in the next
# poll, not the one after.
time.sleep(2)
def Tracker_assigned(self, bug, job):
# The status should now be assigned.
job = self.p4.run('job -o %s' % job['Job'])[0]
assert job['Assigned_to'] == self.user2, \
("Expected assigned job %s to be assigned to ' "
"'assigned' but it has state %(Status)s." %
(job['Job'], self.user2, job['Assigned_to']))
def Tracker_closed(self, bug, job, bug1, job1):
pass
def Tracker_check_p4_to_dt(self, issue, job, value_list):
config.trk.login(config.tracker_user, config.tracker_password)
issue = self.r.dt.issue(issue.id())
config.trk.logout()
for field, expected in value_list:
if isinstance(expected, types.StringType):
assert issue[field] == expected, \
("Expected replicated issue %s to have %s '%s', but found "
"'%s'." % (issue['Id'], field, expected, issue[field]))
else: # assume function
assert expected(issue[field]), \
("Expected replicated issue %s to have valid field %s." %
(issue['Id'], field))
def check_job_fields(self, job, value_list):
j = self.p4.run('job -o %s' % job['Job'])[0]
for field, expected in value_list:
assert j[field] == expected, \
("Expected replicated job %s to have %s '%s', but found "
"'%s'." % (j['Job'], field, expected, j[field]))
def update_bug(self, changes, issue, user):
config.trk.login(config.tracker_user, config.tracker_password)
self.r.dt.tracker.update_bug(changes, issue, user)
config.trk.logout()
def runTest(self):
"Issue life cycle (test_p4dti.tracker_lifecycle)"
# 10.1. Simple issue lifecycle
#
# This is a simple issue cycle:
#
# 1. An issue is submitted to the defect tracker.
# 2. It is replicated to Perforce.
# 3. It gets assigned to a developer.
# 4. It is closed in Perforce (by editing the job).
# 5. The closure gets replicated back to the defect tracker.
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
# Change the status and check for replication back
log_message("----Test for status change being replicated back")
job[self.r.config.job_status_field] = 'evaluated'
temp = self.p4p[self.user].run('job -o')
self.p4p[self.user].run('job -i', job)
## self.close(issue, job, [805, 824, 826])
self.poll()
self.expectation([805, 824, 911, 912], [826])
value_list = [('Status', 'Evaluated')]
self.Tracker_check_p4_to_dt(issue, job, value_list)
# Now check for Date Fixed field being set
log_message("----Test for Date Fixed field being set")
job[self.r.config.job_status_field] = 'closed'
self.p4p[self.user].run('job -i', job)
self.poll()
self.expectation([805, 824, 911, 912], [826])
value_list = [('Status', 'Fixed'),
('Date Fixed', lambda x: x <> '')]
self.Tracker_check_p4_to_dt(issue, job, value_list)
# Test updates of individual fields
# with auto-login
self.p4.run('logout')
changes = {'Status': 'Resubmitted'}
self.update_bug(changes, issue, self.user)
self.poll()
self.expectation([804, 812, 911, 912]) # Changed fields
value_list = [(self.r.config.job_status_field, 'open')]
self.check_job_fields(job, value_list)
# Test closing issue in Tracker - should be closed in p4
changes = {'State': 'Closed',
'Title': 'I am a title with " a single quote'}
self.update_bug(changes, issue, self.user)
self.poll()
self.expectation([804, 812, 911, 912]) # Changed fields
value_list = [(self.r.config.job_status_field, 'closed')]
self.check_job_fields(job, value_list)
# Update a field and replicate back.
job = self.p4p[self.user].run('job -o %s' % job['Job'])[0]
job["FIXED_IN_REL.VER.BLD[PATCH]"] = "Rel 1.2"
self.p4p[self.user].run('job -i', job)
self.poll()
self.expectation([805, 824, 911, 912])
# Test Perforce Note field
job = self.p4p[self.user].run('job -o %s' % job['Job'])[0]
job["PERFORCE_NOTE"] = "Some test notes\nwith some more stuff"
self.p4p[self.user].run('job -i', job)
self.poll()
self.expectation([805, 824, 911, 912])
# Update an existing note
job = self.p4p[self.user].run('job -o %s' % job['Job'])[0]
job["PERFORCE_NOTE"] += "Some changed stuff in this note"
self.p4p[self.user].run('job -i', job)
self.poll()
self.expectation([805, 824, 911, 912])
# Test updates of individual fields
changes = {'Perforce Note': 'This is a new note\r\nwith some trailing blank lines\r\n\r\n\r\n'}
self.update_bug(changes, issue, self.user)
self.poll()
self.expectation([804, 812, 911, 912]) # Changed fields
# 10.2. Issue lifecycle (fix in Perforce)
#
# This is an issue cycle in which the issue is associated with a
# changelist in Perforce:
#
# 1. An issue is submitted to the defect tracker.
# 2. It is replicated to Perforce.
# 3. It gets assigned to a developer.
# 4. It is closed in Perforce (by making a fix).
# 5. Closure and fix get replicated back to the defect
# tracker.
log_message("Test for lifecyle - fix in Perforce")
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
self.p4p[self.user].run('fix -c 1 %s' % job['Job'])
self.close(issue, job, [805, 819, 820, 824])
# 10.3. Issue lifecycle (fix on submission in Perforce)
#
# This tests an issue lifecycle in which the issue is associated
# with a pending changelist and closed on submission.
#
# 1. An issue is submitted to the defect tracker.
# 2. It is replicated to Perforce.
# 3. It gets assigned to a developer.
# 4. The job description is edited in Perforce (this is a
# regression test for job000362).
# 5. A fix is made with a pending changelist.
# 6. The change is submitted.
# 7. Job, fix get replicated back to the defect tracker.
log_message("----test for fix on submission")
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
job['Description'] = job['Description'] + '\nEdited.'
self.p4p[self.user].run('job -i', job)
self.poll()
self.expectation([805, 824, 911, 912], [826])
change = self.edit_file(self.user)
self.p4p[self.user].run('fix -c %s %s'
% (change['Change'], job['Job']))
self.p4p[self.user].run('submit -c %s' % change['Change'])
self.close(issue, job, [805, 819, 820, 824])
log_message("----test for fix replicated before submission")
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
# job['Description'] = job['Description'] + '\nEdited.'
# self.p4p[self.user].run('job -i', job)
self.poll()
self.expectation([911, 912], [826])
change = self.edit_file(self.user)
self.p4p[self.user].run('fix -c %s %s'
% (change['Change'], job['Job']))
# Poll and check that things are replicated OK.
self.poll()
self.expectation([805, 819, 820, 824, 911, 912])
# Now submit and poll again
self.p4p[self.user].run('submit -c %s' % change['Change'])
self.close(issue, job, [805, 824])
tr_user = self.r.config.user_translator.translate_p4_to_dt(
self.user, self.r.dt, self.r.dt_p4)
value_list = [('Fixed By', tr_user)]
self.Tracker_check_p4_to_dt(issue, job, value_list)
log_message("----test for deleted fix")
# Delete fix in Perforce and check that the deletion is
# replicated.
self.p4p[self.user].run('fix -d -c %s %s'
% (change['Change'], job['Job']))
self.poll()
self.expected([818])
fixes = self.p4.run('fixes -j %s' % job['Job'])
assert len(fixes) == 0, ("Expected no fixes for %s, but found "
"%s." % (job['Job'], fixes))
# 10.4. Issue lifecycle (fix to assigned on submission in
# Perforce)
#
# As section 10.3, but the fix is to the job's current state
# rather than "closed", so the job state doesn't change.
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
change = self.edit_file(self.user)
status = job[self.r.config.job_status_field]
self.p4p[self.user].run('fix -s %s -c %s %s'
% (status, change['Change'],
job['Job']))
self.p4p[self.user].run('submit -c %s' % change['Change'])
self.poll()
self.expectation([805, 819, 820, 824, 911, 912])
# Change fix status (only -- note that we set the job status
# back); check that it's replicated.
job = self.p4p[self.user].run('job -o %s' % job['Job'])[0]
self.p4p[self.user].run('fix -s closed -c %s %s'
% (change['Change'], job['Job']))
self.p4p[self.user].run('job -i', job)
self.poll()
self.expectation([805, 819, 821, 825, 911, 912])
## # 10.5. Simultaneous edit
## #
## # Provoke a conflict by changing an issue simultaneously in both
## # systems. Then try again, but with a user other than the job
## # owner. (Check that mail goes to both.)
## #
## # Make, update and delete a fix in Perforce simultaneously with
## # the change. Make sure the fixes are respectively deleted,
## # updated and restored. Ditto for filespecs.
##
## change1 = self.edit_file(self.user) # make
## change2 = self.edit_file(self.user) # update
## change3 = self.edit_file(self.user) # delete
## self.poll()
## for u in [self.user, self.user1]:
## issue, job = self.submit(self.user)
## status = job[self.r.config.job_status_field]
## # Make the fixes that we are going to update and delete.
##
## self.p4p[self.user].run('fix -s %s -c %s %s'
## % (status, change2['Change'],
## job['Job']))
## self.p4p[self.user].run('fix -s %s -c %s %s'
## % (status, change3['Change'],
## job['Job']))
## job['P4DTI-filespecs'] = 'filespec_1\n'
## self.p4p[self.user].run('job -i', job)
## self.poll()
## # Now edit the job simultaneously in DT and Perforce.
## getattr(self, config.dt_name + '_assign')(issue, job,
## self.user)
## job[self.r.config.job_status_field] = 'closed'
## job['P4DTI-filespecs'] = 'filespec_2\n'
## self.p4p[u].run('job -i', job)
## # Make, update and delete those fixes.
## self.p4p[u].run('fix -c %s %s'
## % (change1['Change'], job['Job']))
## self.p4p[u].run('fix -c %s %s'
## % (change2['Change'], job['Job']))
## self.p4p[u].run('fix -d -c %s %s'
## % (change3['Change'], job['Job']))
## self.clear_log()
## self.r.carefully_poll_databases()
## self.expectation([800, 806, 811, 814, 815, 816, 817, 841,
## 860, 861, 853, 862, 812, 910, 911, 912])
## issue = self.r.dt.issue(issue.id())
## job = self.p4.run('job -o %s' % job['Job'])[0]
## getattr(self, config.dt_name + '_assigned')(issue, job)
## self.poll()
## fixes = self.p4.run('fixes -j %s' % job['Job'])
## assert job['P4DTI-filespecs'] == 'filespec_1\n'
##
## assert len(fixes) == 2
## assert fixes[0]['Change'] == change3['Change']
## assert fixes[0]['Status'] == status
## assert fixes[1]['Change'] == change2['Change']
## assert fixes[1]['Status'] == status
## for m in self.log_messages:
## if m.id == 800:
## assert string.find(m.text,
## config.administrator_address)
## assert string.find(m.text, self.user) != -1
## assert string.find(m.text, u) != -1
# We're done; one last check for luck.
config.trk.login(config.tracker_user, config.tracker_password)
self.check_consistency()
config.trk.logout()
# 11. P4DTI CONFIGURATION DATABASE
#
# This checks that parameters get added and removed from the
# configuration database. This is a regression test for job000169 and
# job000351.
class configdb(p4dti_base):
def setUp(self):
self.setup_everything()
def Bugzilla_config(self):
return self.r.dt.bugzilla.get_config()
def check(self, cf1):
reset_configuration()
for k, v in cf1.items():
setattr(config, k, v)
self.initialize_replicator()
cf2 = getattr(self, config.dt_name + '_config')()
for k, v in cf1.items():
if v != cf2.get(k):
self.addFailure("Set parameter %s to '%s', but found "
"'%s' in the config database."
% (k, v, cf2.get(k)))
def runTest(self):
"Replicator configuration database (test_p4dti.configdb)"
cf1 = { 'p4_server_description': 'spong',
'changelist_url': 'http://spong/changelist?%d',
'job_url': 'http://spong/job?%s' }
cf2 = { 'p4_server_description': 'spong',
'changelist_url': None,
'job_url': None }
self.check(cf1)
self.check(cf2)
self.check(cf1)
# 12. INCONSISTENCIES
#
# This test case checks that each kind of inconsistency that can be
# reported by the consistency checking script is detected and reported.
class inconsistencies(p4dti_base):
user = 'rb'
def setUp(self):
self.setup_everything()
self.setup_perforce([self.user])
self.check_startup()
def runTest(self):
"Consistency check failures (test_p4dti.inconsistencies)"
# 12.1. Unreplicated fix in Perforce
change = self.edit_file(self.user)
job1 = self.p4.run('jobs')[0]
self.p4p[self.user].run('fix -s %s -c %s %s'
% (job1[config.job_status_field],
change['Change'], job1['Job']))
self.clear_log()
self.r.check_consistency()
if not self.r.dt.supports('fixes'):
self.expectation([871, 890], [883, 884, 885])
else:
self.expectation([871, 878, 886, 890], [883, 884])
self.poll()
# 12.2. Fix with wrong status in Perforce
job1status = job1[config.job_status_field]
assert job1status != 'assigned'
self.p4p[self.user].run('fix -s assigned -c %s %s'
% (change['Change'], job1['Job']))
self.clear_log()
self.r.check_consistency()
self.expectation([871, 880, 886, 890], [883, 884])
self.p4p[self.user].run('fix -s %s -c %s %s'
% (job1status, change['Change'],
job1['Job']))
# 12.3. Changed job in Perforce - not relevant to Tracker
if config.dt_name <> 'Tracker':
job1['Description'] = job1['Description'] + '...\n'
self.p4p[self.user].run('job -i', job1)
self.clear_log()
self.r.check_consistency()
self.expectation([871, 875, 886, 890], [883, 884])
self.poll()
# 12.4. Unreplicated job in Perforce
#
# This is a regression test for job000372.
job4 = self.p4p[self.user].run('job -o')[0]
job4['P4DTI-rid'] = config.rid
job4['P4DTI-issue-id'] = "999999"
job4name = self.create_job(self.p4p[self.user], job4)
self.clear_log()
self.r.check_consistency()
self.expectation([871, 882, 886, 890], [883, 884])
# 12.5. Unreplicated job pointing to replicated issue
issueid = self.p4.run('jobs')[0]['P4DTI-issue-id']
job5 = self.p4p[self.user].run('job -o %s' % job4name)[0]
job5['P4DTI-issue-id'] = issueid
self.p4p[self.user].run('job -i', job5)
self.clear_log()
self.r.check_consistency()
self.expectation([871, 881, 886, 890], [883, 884])
self.p4p[self.user].run('job -d %s' % job5['Job'])
# 12.6. Missing job in Perforce
job6 = self.p4.run('jobs')[0]
self.p4p[self.user].run('job -d %s' % job6['Job'])
self.clear_log()
self.r.check_consistency()
self.expectation([871, 873, 886, 890], [883, 884])
# 12.7. Job pointing to wrong issue
#
# We also get "(P4DTI-875X) Job '%s' would need the following
# set of changes ..." because P4DTI-issue-id is wrong, and
# "(P4DTI-8793) Change %d fixes issue '%s' but there is no
# corresponding fix for job '%s'." because we added a fix and
# replicated it in section 12.1 above but now it's missing in
# Perforce.
id = job6['P4DTI-issue-id']
job6['P4DTI-issue-id'] = '999999'
self.p4p[self.user].run('job -i', job6)
self.clear_log()
self.r.check_consistency()
self.expectation([871, 874, 875, 879, 887, 890], [883, 884])
job6['P4DTI-issue-id'] = id
# Put things back to rights, and add a filespec in preparation
# for section 12.8 below.
job6['P4DTI-filespecs'] = 'filespec_1\n'
self.p4p[self.user].run('job -i', job6)
self.poll()
self.check_consistency()
# 12.8. Incorrect filespecs
#
# By changing the filespec we can provoke messages about
# filespecs being missing in both sides.
job6['P4DTI-filespecs'] = 'filespec_2\n'
self.p4p[self.user].run('job -i', job6)
self.clear_log()
self.r.check_consistency()
self.expectation([871, 876, 877, 887, 890], [883, 884])
# Polling should sort everything out.
self.poll()
self.check_consistency()
# 13. RACE DURING REPLICATION OF FIXES
#
# This is a regression test for job000385.
class race_385(p4dti_base):
user = 'rb'
change = None
original_fixes_differences = None
race_flag = None
def setUp(self):
self.setup_everything()
self.setup_perforce([self.user])
self.check_startup()
def race(self, dt_fixes, p4_fixes):
fix_diffs = self.original_fixes_differences(dt_fixes, p4_fixes)
# Submit the change (this is the race). But only once.
if self.race_flag == 0:
self.p4p[self.user].run('submit -c %s'
% self.change['Change'])
self.race_flag = 1
return fix_diffs
def runTest(self):
"Race during replication of fixes (test_p4dti.race_385)"
# Create a pending change in Perforce and make a fix to that
# change.
self.change = self.edit_file(self.user)
job = self.p4.run('jobs')[0]
self.p4p[self.user].run('fix -s %s -c %s %s'
% (job[config.job_status_field],
self.change['Change'], job['Job']))
# Create a second pending change in Perforce (so that the first
# change will get renumbered when submitted).
self.edit_file(self.user)
# We'll make sure that the change gets submitted after
# fixes_differences gets called -- this is our last opportunity
# to run the race before the possibly illegal call to p4 change
# -o.
self.original_fixes_differences = self.r.fixes_differences
self.r.fixes_differences = self.race
self.race_flag = 0
# Replicate.
self.poll()
# Restore the original fixes_differences.
self.r.fixes_differences = self.original_fixes_differences
# Check that the replication succeeded.
self.expectation([802, 805, 819, 820], [825, 911, 912])
self.check_consistency()
# 14. CAN CONFIRM AN UNCONFIRMED BUG
#
# This is a regression test for job000262 and job000410. Bugzilla only.
class unconfirmed(p4dti_base):
bug_id = 27 # this bug is UNCONFIRMED in our test database
user = 'rb'
def setUp(self):
self.setup_everything()
self.setup_perforce([self.user])
self.check_startup()
def runTest(self):
"Confirm an UNCONFIRMED bug (test_p4dti.unconfirmed)"
job = self.p4.run('job -o bug%d' % self.bug_id)[0]
assert job[self.r.config.job_status_field] == 'unconfirmed'
job[self.r.config.job_status_field] = 'bugzilla_new'
self.p4p[self.user].run('job -i', job)
# Replicate.
self.poll()
# Check that the replication succeeded.
self.expectation([805, 824], [911, 912])
self.check_consistency()
# 15. REPLICATE NEW JOBS FROM PERFORCE
#
# This checks that a new job in Perforce can be submitted to the defect
# tracker and successfully replicated thereafter. This is a regression
# test for job000036.
class new_p4_job(p4dti_base):
user = 'rb'
Bugzilla_job = {
'Summary': 'test summary',
'User': user,
'Description': 'test description',
'Priority': 'P3',
'Severity': 'normal',
'Product': 'product 1',
}
def Bugzilla_prepare_issue(self, issue, job):
default_component = {
'product 1': 'component 1.1',
'product 2': 'component 2.1',
'unconfirmed': 'unconf1',
'group': 'group1',
}
if issue['product'] == '':
issue['product'] = 'product 1'
if issue['component'] == '':
issue['component'] = default_component[issue['product']]
if issue['version'] == '':
issue['version'] = 'unspecified'
def setUp(self):
self.setup_everything()
config.prepare_issue = getattr(self, config.dt_name
+ '_prepare_issue')
config.replicate_job_p = lambda(job): 1
self.setup_perforce([self.user])
self.check_startup()
def runTest(self):
"Replicate a new job from Perforce (test_p4dti.new_p4_job)"
# Add a job to Perforce.
job = self.p4p[self.user].run('job -o')[0]
for k, v in getattr(self, config.dt_name + '_job').items():
job[k] = v
jobname = self.create_job(self.p4p[self.user], job)
# Replicate it.
self.poll()
self.expected([892, 894])
self.expected_only_in_range(800, 914,
[826, 892, 894, 911, 912])
# Check that the job is replicated.
self.check_consistency()
job = self.p4.run('job -o %s' % jobname)[0]
assert job['P4DTI-rid'] == config.rid
# 16. WINDOWS NT SERVICE AND EVENT LOG
#
# This is a regression test for job000046 and job000149.
class nt_service(p4dti_base):
# Service name in the Windows registry.
_svc_name_ = 'p4dti_service'
# If the service can't replicate its initial batch of issues in five
# minutes, we'll consider it to have failed (this depends on the
# test database not having too many issues).
timeout = 300
# Number of polls before we expect consistency to be achieved.
polls_for_consistency = 3
def setUp(self):
self.setup_everything()
self.initialize_replicator()
# Handle on Service Manager.
import win32service
access_level = win32service.SC_MANAGER_ALL_ACCESS
self.hscm = win32service.OpenSCManager(None, None, access_level)
# Handle on Event Log.
import win32evtlog
self.hevt = win32evtlog.OpenEventLog(None, 'Application')
# Establish configuration environment for service. We want the
# current configuration with the following changes: NT Event
# logging must be set; logging level is DEBUG; email from the
# replicator is supressed.
controls = (('P4DTI_CONFIG', os.path.abspath(config_filename)),
('P4DTI_EVTLOG', ''),
('P4DTI_LOGLEVEL', str(message.DEBUG)),
('P4DTI_ADMINADDR', ''),
)
for key, value in controls:
os.environ[key] = value
# If the service is already there, remove it.
if self.query_service():
self.remove_service()
def tearDown(self):
import win32service
import win32evtlog
win32service.CloseServiceHandle(self.hscm)
win32evtlog.CloseEventLog(self.hevt)
# Return None if service not currently installed. Otherwise
# return currentState field as reported by Service Manager.
def query_service(self):
import win32service
type_filter = win32service.SERVICE_WIN32
state_filter = win32service.SERVICE_STATE_ALL
query = win32service.EnumServicesStatus
# List all registered services
services = query(self.hscm, type_filter, state_filter)
# Is ours there?
for svc_name, description, status in services:
if svc_name == self._svc_name_:
# Looks like it was.
currentState = status[1]
return currentState
# Looks like it wasn't.
return None
def wait(self, function):
timeout = self.timeout
while timeout > 0:
if function():
return 1
time.sleep(1)
timeout = timeout - 1
return 0
def wait_for_status(self, status):
query = self.query_service
return self.wait(lambda query=query, status=status:
query() == status)
def read_event_log_first_time(self):
import win32evtlog
hevt = self.hevt
oldest_record = win32evtlog.GetOldestEventLogRecord(hevt)
record_count = win32evtlog.GetNumberOfEventLogRecords(hevt)
newest_record = oldest_record + record_count - 1
# Random access into the Event Log.
read_flags = (win32evtlog.EVENTLOG_FORWARDS_READ +
win32evtlog.EVENTLOG_SEEK_READ)
# Read newest event and ignore it. This has the required
# side-effect of setting the handle's reading position in the
# log, ready for future reads.
win32evtlog.ReadEventLog(hevt, read_flags, newest_record)
def wait_for_event_log(self, msg):
return self.wait(lambda self=self, msg=msg:
self.match_event_log(msg))
# Read all the log entries that have been written since last
# time. It's remotely possible that we might flush the message
# we're looking for twice in one call to ReadEventLog, but this is
# acceptable as it is harmless to pool again.
def match_event_log(self, msg):
import win32evtlog
hevt = self.hevt
# Sequential access from where we left off, last time we read
# on this handle.
read_flags = (win32evtlog.EVENTLOG_FORWARDS_READ +
win32evtlog.EVENTLOG_SEQUENTIAL_READ)
import catalog
# This is the string we're looking for.
wanted = str(catalog.msg(msg))
# We have to make an undeterminable number of reads to exhaust
# the log. Each read will return some undeterminable number of
# records. (I can't tell from documentation whether zero is a
# possible number in cases where there are records to be
# read. This doesn't matter, as we're prepared to come back
# several times.)
records = win32evtlog.ReadEventLog(hevt, read_flags, 0)
while records:
for record in records:
message = record.StringInserts[0]
if message == wanted:
return 1
records = win32evtlog.ReadEventLog(hevt, read_flags, 0)
return 0
# Simplify hook into service.main()
def main(self, *args):
import service
service.main([''] + list(args))
# The next four methods have to use service.py, as that's what
# we're testing.
def install_service(self):
self.main()
def remove_service(self):
self.main('remove')
def start_service(self):
self.main('start')
def halt_service(self):
self.main('stop')
def runTest(self):
"Manage NT service (test_p4dti.nt_service)"
import win32service
# Install the service and ensure that it's there.
self.install_service()
assert self.query_service()
# Start it running.
self.read_event_log_first_time()
self.start_service()
assert self.wait_for_status(win32service.SERVICE_RUNNING)
# Allow the replication to happen.
for i in range(self.polls_for_consistency):
# (message.DEBUG, "Poll finished.")
assert self.wait_for_event_log(912)
# Halt the service.
self.halt_service()
assert self.wait_for_status(win32service.SERVICE_STOPPED)
# Remove service and ensure that it's gone away.
self.remove_service()
assert not self.query_service()
# Check that replication succeeded.
self.check_consistency()
# 17. MIGRATION
#
# This tests that jobs in Perforce can be migrated from the default
# Perforce jobspec to the defect tracker and replicated thereafter.
# This is a regression test for job000022, job000249, job000422 and
# job000426.
class migrate(p4dti_base):
n_jobs = 20
# include a non-existent user for migration to create.
users = ['rb', 'nb', 'gdr', 'spong']
states = ['open', 'closed', 'suspended']
fixes = {}
def setUp(self):
self.setup_everything()
for param in ["migrate_p", "translate_jobspec",
"prepare_issue"]:
setattr(config, param,
getattr(self, config.dt_name + "_" + param))
self.setup_perforce(self.users)
self.initialize_replicator()
self.create_jobs()
# delete the replicator user from the Bugzilla database, so
# we can test that migration creates it.
self.dti.system('mysql -u "%s" "%s" -e "delete from profiles where login_name = \'%s\'"' %
(config.dbms_user,
config.dbms_database,
config.replicator_address))
def create_jobs(self):
self.fixes = {}
# Create some jobs in Perforce.
for i in range(self.n_jobs):
job = self.p4.run('job -o')[0]
user = self.users[i % len(self.users)]
status = self.states[i % len(self.states)]
changes = {
'Description': ("First line of job %d\n"
"Remainder of job %d\n"
"Blah blah blah...\n" % (i, i)),
'User': user,
self.r.config.job_status_field: status,
}
self.r.update_job(job, changes)
self.p4p[user].run('fix -s %s -c 1 %s'
% (status, job['Job']))
self.fixes[job['Job']] = status
def runTest(self):
"Migration from Perforce jobs (test_p4dti.migrate)"
self.clear_log()
self.r.migrate_users()
self.r.migrate()
self.expected([892, 895])
self.expected_only_in_range(800, 914, [802, 819, 820, 892, 895])
self.r.refresh_perforce_jobs()
self.check_consistency()
# Check that no fixes have been lost, changed or added.
# Regression test for job000271.
for f in self.p4.run('fixes'):
if not self.fixes.has_key(f['Job']):
self.addFailure("Found unexpected fix for job '%s'."
% f['Job'])
else:
if self.fixes[f['Job']] != f['Status']:
self.addFailure("Expected fix for job '%s' to have "
"status '%s', but found '%s'."
% (f['Job'], self.fixes[f['Job']],
f['Status']))
del self.fixes[f['Job']]
for j in self.fixes.keys():
self.addFailure("Expected a fix for job '%s', but didn't "
"find it." % j)
def Bugzilla_migrate_p(self, job):
return 1
def Bugzilla_prepare_issue(self, issue, job):
issue["product"] = "product 1"
issue["component"] = "component 1.1"
issue["version"] = "unspecified"
def Bugzilla_translate_jobspec(self, job):
desc = job.get("Description", "")
newline = string.find(desc, "\n")
job["Summary"] = desc[:newline]
job["Description"] = desc[newline+1:]
job["User"] = job.get("User", "")
if 'reporter' in config.replicated_fields:
job["Reporter"] = job.get("User", "")
if 'qa_contact' in config.replicated_fields:
job["QA_Contact"] = "None"
status_map = {
"open": ("assigned", ""),
"closed": ("closed", "FIXED"),
"suspended": ("closed", "LATER"),
}
(status, resolution) = status_map[job.get(self.r.config.job_status_field, "open")]
job["Status"] = status
job["Resolution"] = resolution
job["Severity"] = "blocker"
job["Priority"] = "P1"
return job
# 18. BUGZILLA PARAMETERS
#
# Check that editing Bugzilla parameters causes the
# p4dti_bugzilla_parameters table to be created in the Bugzilla
# database, and that the parameters we expect to be there really are.
class bugzilla_params(p4dti_base):
expected_parameters = [
'emailregexp',
'emailregexpdesc',
'emailsuffix',
'p4dti',
]
def runTest(self):
"Bugzilla parameters (test_p4dti.bugzilla_params)"
p4dti_param = whrandom.randint(1,1000000)
# Confirm that the replicator spots the absense of the
# p4dti_bugzilla_parameters table.
self.setup_everything({ 'bugzilla_mysqldump': None })
self.clear_log()
self.initialize_replicator()
self.expected([129])
# Follow the instructions in [RB 2000-08-10, 5.4.3]: create the
# p4dti_bugzilla_parameters table by editing the parameters. We
# take the opportunity to set the 'p4dti' parameter to a random
# number which we will read back later to check that the table
# is being updated correctly.
self.dti.edit_parameters({
'p4dti': str(p4dti_param),
})
# Confirm that replicator finds the p4dti_bugzilla_parameters
# table and the required parameters are present.
self.clear_log()
reset_configuration()
self.initialize_replicator()
self.expected_not([129, 130])
for p in self.expected_parameters:
assert config.bugzilla.params.has_key(p)
# Confirm that the p4dti_parameter matches.
assert int(config.bugzilla.params['p4dti']) == p4dti_param
# 19. ENUM KEYWORDS WITH SPACES
#
# This is a regression test for job000445. Bugzilla only.
class enum_spaces(p4dti_base):
def runTest(self):
"Bugzilla enums containing spaces (test_p4dti.enum_spaces)"
self.setup_everything({
'bugzilla_mysqldump': 'job000445-mysqldump',
})
self.check_startup()
self.check_consistency()
# 20. BUGZILLA EMAILSUFFIX
#
# This checks that when the Bugzilla "emailsuffix" parameter is set,
# that the P4DTI correctly translates users between Bugzilla and
# Perforce. Bugzilla only. Regression test for job000352.
class emailsuffix(p4dti_base):
users = ['gdr', 'nb', 'ndl', 'rb']
def runTest(self):
"Bugzilla 'emailsuffix' parameter (test_p4dti.emailsuffix)"
self.setup_everything({
'bugzilla_mysqldump': 'job000352-mysqldump',
})
self.setup_perforce(self.users)
self.dti.edit_parameters({
'p4dti': 1,
'emailregexp': '^.*$',
'emailsuffix': email_suffix,
})
self.check_startup()
self.expected_not([129, 130, 516, 536, 867])
# 21. PERFORCE JOB/FIX CONSISTENCY
#
# Check that Perforce updates the 'P4DTI-user' field whenever someone
# changes a job by fixing it. Regression test for job000086 and
# job000276.
class fix_update(p4dti_base):
def check_modifier(self, jobname, expected, case):
job = self.p4p['a'].run('job -o %s' % jobname)[0]
for found, desc in ((job['P4DTI-user'], "job['P4DTI-user']"),
(self.r.job_modifier(job), "job modifier")):
if found != expected:
self.addFailure("After %s, expected %s to be "
"'%s', but found '%s'."
% (case, desc, expected, found))
return job
def runTest(self):
"Perforce updates jobs when fix (test_p4dti.fix_update)"
self.setup_everything()
self.setup_perforce(['a', 'b', 'c', 'd', 'e', 'f'])
self.initialize_replicator()
self.r.prepare_to_run()
# Check that the "P4DTI-user" field has the correct value after
# a series of actions:
# 21.1. Job creation
job = self.p4p['a'].run('job -o')[0]
jobname = self.create_job(self.p4p['a'], job)
job = self.check_modifier(jobname, 'a', "job creation")
status = job[config.job_status_field]
# 21.2. Job editing
job['Description'] = 'test job 2'
self.p4p['b'].run('job -i', job)
job = self.check_modifier(jobname, 'b', "job editing")
# 21.3. Fixing (with change in status)
self.p4p['c'].run('fix -c 1 %s' % jobname)
job = self.check_modifier(jobname, 'c', "fixing (with change "
"in status)")
# 21.4. Fixing a submitted changelist (no change in status)
change = self.edit_file('d')
self.p4p['d'].run('submit -c %s' % change['Change'])
self.p4p['d'].run('fix -c %s %s' % (change['Change'], jobname))
job = self.check_modifier(jobname, 'd', "fixing (without "
"change in status)")
# 21.5. Changing the status of an existing fix
self.p4p['e'].run('fix -s %s -c 1 %s' % (status, jobname))
job = self.check_modifier(jobname, 'e', "changing fix status ")
# 21.6. Submitting pending changelist
change = self.edit_file('f')
self.p4p['f'].run('fix -c %s %s' % (change['Change'], jobname))
self.p4p['f'].run('submit -c %s' % change['Change'])
job = self.check_modifier(jobname, 'f', "submitting a pending "
"changelist")
# 22. FREQUENT EDITS
#
# Check that frequent edits (at least one per poll) in the defect
# tracker don't cause conflicts. Regression test for job000016,
# job000042.
class frequent_edits(lifecycle):
def runTest(self):
"Frequent edits cause no conflicts (test_p4dti.frequent_edits)"
# Submit an issue to the defect tracker and replicate it.
id = getattr(self, config.dt_name + '_submit')(self.user)
self.poll()
self.expectation([803, 804, 812, 911, 912])
# Check that it was replicated correctly.
issue = self.r.dt.issue(id)
jobname = issue.corresponding_id()
job = self.p4.run('job -o %s' % jobname)[0]
getattr(self, config.dt_name + '_submitted')(issue, job)
# Assign it in the defect tracker and replicate the assignment.
getattr(self, config.dt_name + '_assign')(issue, job, self.user)
self.poll()
self.expectation([804, 812, 911, 912])
# 23. BUGZILLA PERFORCE SECTION
#
# Check that the Bugzilla patch is correctly producing a Perforce
# section in the bug form.
class perforce_section(lifecycle):
def setUp(self):
self.setup_everything()
self.setup_perforce([self.user])
self.dti.edit_parameters({ 'p4dti': 1 })
self.check_startup()
def runTest(self):
"Perforce section in Bugzilla (test_p4dti.perforce_section)"
# Submit a new job to Bugzilla and fix it in Perforce.
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
self.p4.run('fix -c 1 %s' % job['Job'])
self.close(issue, job, [802, 805, 819, 820, 824, 826])
# Get the bug form.
bug_form = self.dti.run_script("show_bug.cgi", {
'bugzilla_login': config.bugzilla_admin_user,
'bugzilla_password': config.bugzilla_admin_password,
'id': issue.id(),
})
parser = self.bug_form_parser()
parser.feed(bug_form)
parser.close()
parser.check()
# 23.1. Parse Bugzilla bug form
class bug_form_parser(p4dti_html_parser):
def __init__(self):
self.seen_p4dti = 0
self.in_p4dti = 0
sgmllib.SGMLParser.__init__(self)
def check(self):
assert self.seen_p4dti == 1
def start_div(self, attrs_list):
attrs = self.attrs(attrs_list)
if attrs.get('class') == 'p4dti':
self.seen_p4dti = self.seen_p4dti + 1
self.in_p4dti = 1
def end_div(self):
self.in_p4dti = 0
# RUNNING THE TESTS
def tests():
suite = unittest.TestSuite()
tests = [start_1, bogus, configdb, existing, fix_update,
frequent_edits, inconsistencies, lifecycle, migrate,
new_p4_job, normal, project, race_385, start_2]
if os.name == 'nt':
tests.extend([nt_service])
if config.dt_name == 'Bugzilla':
tests.extend([bugzilla_params, emailsuffix, enum_spaces,
perforce_section, unconfirmed])
if config.dt_name == 'Tracker':
# Only required tests - so overwrite above lists
tests = [bogus, existing, fix_update,
frequent_edits, normal]
tests = [tracker_lifecycle]
## tests = [normal]
# optional tests - commented out for now
# test.extend([start_1, race_385, start_2, lifecycle, migrate]
for t in tests:
suite.addTest(t())
return suite
if __name__ == "__main__":
unittest.main(defaultTest="tests")
# A. REFERENCES
#
# [Barnson 2001-08-29] "Bugzilla Guide (revision 2.14.0)"; Matthew
# Barnson; 2001-08-29;
# <http://www.ravenbrook.com/project/p4dti/import/2001-08-29/bugzilla-2.14/bugzilla-2.14/docs/html/>.
#
# [GDR 2000-12-31] "Automated testing plan" (e-mail message); Gareth
# Rees; Ravenbrook Limited; 2000-12-31;
# <http://info.ravenbrook.com/mail/2000/12/31/14-42-26/0.txt>.
#
# [PyUnit] "PyUnit - a unit testing framework for Python"; Steve
# Purcell; <http://pyunit.sourceforge.net/>.
#
# [RB 2000-08-10] "Perforce Defect Tracking Integration Administrator's
# Guide"; Richard Brooksby; Ravenbrook Limited; 2000-08-10;
# <http://www.ravenbrook.com/project/p4dti/version/2.1/manual/ag/>.
#
# [RB 2000-12-08] "init.py -- Initialize replicator and defect tracker";
# Richard Brooksby; Ravenbrook Limited; 2000-12-08;
# <http://www.ravenbrook.com/project/p4dti/version/2.1/code/replicator/init.py>.
#
#
# B. DOCUMENT HISTORY
#
#
#
# C. COPYRIGHT AND LICENSE
#
# This file is copyright (c) 2001 Perforce Software, Inc. All rights
# reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDERS AND CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
# DAMAGE.
#
#
# $Id: //info.ravenbrook.com/project/p4dti/version/2.1/test/test_p4dti.py#2 $