ReplicaTester.py #1

  • //
  • guest/
  • alan_petersen/
  • ReplicaTester/
  • ReplicaTester.py
  • View
  • Commits
  • Open Download .zip Download (17 KB)
#!/usr/bin/python

import os
import re
import sys
import stat
import time
import uuid
import socket
import thread
import getopt
import marshal
import logging
import threading
from subprocess import Popen, PIPE, STDOUT
import xml.etree.ElementTree as ET

# usage
#    prints the command-line help text and exits
def usage(exitCode=0):
	"""Print the command-line help text and terminate the program.

	exitCode -- status passed to sys.exit(); defaults to 0 so that callers
	            asking for help (-h) still exit successfully.
	"""
	PROGRAM_USAGE = """\
Usage:	ReplicaTester.py [-h] [-C] [-l logfile] [-v] [-t type] -c configFile
           -h = show help (this message)
           -C = generate sample configuration file
           -v = verbose (debug mode)
           -l logfile = specify logfile (default is STDERR)
           -c configFile = the XML configuration file
           -t type = type of test to perform [ping|pull|meta|file|all]
                        ping - simple p4 ping test to each replica
                        pull - journal sequence check (p4 pull -lj) on each replica
                        meta - metadata (p4 key) update
                        file - distribution of file
                         all - perform each of these in turn (default)
   ** NOTE: Tests are run in parallel and could result in increased network
            activity. Please use responsibly.
   ** NOTE: for file testing, a client workspace MUST be already defined.
            This script will not create one for you.
"""
	print(PROGRAM_USAGE)
	sys.exit(exitCode)

# genConfigFile
#    generates a sample XML configuration file, suitable as a template
def genConfigFile():
	"""Print a sample XML configuration file to stdout, then exit.

	The template lists the elements and attributes read by the configuration
	parser, including the optional per-server 'timeout' attribute, so its
	output can be redirected to a file and edited.
	"""
	cfg = """\
<?xml version="1.0" encoding="utf-8"?>
<config>
	<!-- path to p4 executable -->
	<p4>/usr/local/bin/p4</p4>
	<!-- 
		test file size is specified in bytes
		1KB = 1024
		1MB = 1048576
		10MB = 10485760
	 -->
	<fileSize>10485760</fileSize>
	<!-- path for the depot test file (will be created in the client workspace) -->
	<depotFile>//depotName/testfile</depotFile>
	<!-- name for key used in metadata test -->
	<metaTestKey>METADATA_TEST_KEY</metaTestKey>
	<!-- admin username and password -->
	<adminUser>admin</adminUser>
	<adminPass>passwd</adminPass>
	<!-- client workspace (must already exist) -->
	<client>repl-client</client>
	<!-- master server entry
			- name = display name
			- timeout = TCP connect timeout in seconds for the reachability check (defaults to 1)
			- text = P4PORT
	-->
	<master name="Master" timeout="1">master:1666</master>
	<!-- list of replica servers to check -->
	<replicaList>
		<!-- replica server entry
		       - name = display name
		       - attempts = maximum number of attempts to check the server (defaults to 10)
		       - timeout = TCP connect timeout in seconds for the reachability check (defaults to 1)
		       - text = P4PORT
		-->
		<replica name="Replica01" attempts="10" timeout="1">replica1:1666</replica>
	</replicaList>
</config>
"""
	print(cfg)
	sys.exit()

# _configText
#    returns an element's text with surrounding whitespace removed (None when the element is empty)
def _configText(elem):
	text = elem.text
	if text is None:
		return None
	return text.strip()

# parseConfigFile
#    parses the configuration file specified by the path, storing values in a global dictionary g_config
def parseConfigFile(path):
	"""Load the XML configuration at *path* into the global g_config.

	Keys written (only when the matching element is present):
	  p4, metaTestKey, depotFile, admin.user, client   -- element text, stripped
	  fileSize                                          -- int
	  admin.pass                                        -- element text, verbatim
	  master.port / master.name / master.timeout        -- from <master>
	g_config['replicas'] is always set: a list of dicts with keys
	'name', 'attempts', 'port' and 'timeout' (empty if there is no <replicaList>).
	A missing 'name' attribute falls back to the server's P4PORT.

	Raises whatever ET.parse raises for an unreadable or malformed file, and
	ValueError for non-numeric fileSize/attempts/timeout values.
	"""
	global g_config
	config = ET.parse(path).getroot()

	# (XML tag, g_config key) pairs whose value is just the element's text.
	# Stripping lets the values be written on their own indented lines.
	simpleSettings = (
		('p4', 'p4'),
		('metaTestKey', 'metaTestKey'),
		('depotFile', 'depotFile'),
		('adminUser', 'admin.user'),
		('client', 'client'),
	)
	for tag, key in simpleSettings:
		x = config.find(tag)
		if x is not None:
			g_config[key] = _configText(x)

	x = config.find('fileSize')
	if x is not None:
		g_config['fileSize'] = int(x.text)

	# the password is used verbatim: leading/trailing spaces may be part of it
	x = config.find('adminPass')
	if x is not None:
		g_config['admin.pass'] = x.text

	x = config.find('master')
	if x is not None:
		port = _configText(x)
		g_config['master.port'] = port
		g_config['master.name'] = x.get('name', port)
		g_config['master.timeout'] = int(x.get('timeout', DEFAULT_TCP_TIMEOUT))

	replicas = []
	x = config.find('replicaList')
	if x is not None:
		for r in x.findall('replica'):
			port = _configText(r)
			replicas.append({
				'name': r.get('name', port),
				'attempts': int(r.get('attempts', DEFAULT_ATTEMPTS)),
				'port': port,
				'timeout': int(r.get('timeout', DEFAULT_TCP_TIMEOUT)),
			})
	g_config['replicas'] = replicas
			
# global variables
#    defaults used by parseConfigFile when a server entry omits the attribute
DEFAULT_ATTEMPTS = 10      # replica 'attempts' (how many times a replica is polled)
DEFAULT_TCP_TIMEOUT = 1    # master/replica 'timeout', in seconds, handed to tcpPing
# runtime state shared across this module
g_config = {}              # settings parsed from the XML configuration file
g_ticket = None            # login ticket from the master; passed to p4 via -P once set

# error
#     logs the message(s) at ERROR level; does NOT exit
def error(*msg):
	"""Log at ERROR level, forwarding all arguments unchanged to logging.error."""
	emit = logging.error
	emit(*msg)

# errorExit
#     logs the message(s) at ERROR level, then exits with a non-zero status
def errorExit(*msg):
	"""Log at ERROR level and terminate the process with exit status 1.

	The arguments go straight to logging.error: a format string with optional
	%-style arguments, or a single object such as an exception instance.
	"""
	logging.error(*msg)
	# SystemExit() without a code reports success (status 0) to the shell,
	# which would hide the failure from scripts and schedulers running this tool
	raise SystemExit(1)

# exit
#     logs each message at INFO level, then ends the program
def exit(*msg):
	"""Log every argument at INFO level, then raise SystemExit with no code.

	Because no code is given, the resulting process status is 0. Note that
	this definition shadows the builtin exit() inside this module.
	"""
	for text in msg:
		logging.info(text)
	raise SystemExit

# p4MarshalCmd
#     executes the p4 command in -G (marshal) mode, results sent to a list
def p4MarshalCmd(cmd, p4user, p4port, quiet=False):
	"""Run ``p4 -G <cmd>`` against *p4port* as *p4user* and return its records.

	cmd   -- list of p4 command arguments, e.g. ["files", "//depot/x"]
	quiet -- when True, skip the debug log line for the command

	Returns a list with one unmarshalled object (a dict) per record p4
	printed, in output order; an empty list if p4 printed nothing.
	The global client (g_config['client']) and, once obtained, the login
	ticket (g_ticket, via -P) are added to the invocation.
	"""
	if not quiet:
		logging.debug("p4 {0}".format(" ".join(cmd)))
	records = []
	baseCmd = [g_config['p4'], "-p", p4port, "-u", p4user, "-c", g_config['client'], "-G"]
	if g_ticket is not None:
		baseCmd.extend(["-P", g_ticket])
	proc = Popen(baseCmd + cmd, stdout=PIPE)
	try:
		while True:
			records.append(marshal.load(proc.stdout))
	except EOFError:
		# end of p4's output
		pass
	finally:
		proc.stdout.close()
		# reap the child; the polling loops call this repeatedly from several
		# threads and would otherwise accumulate zombie processes
		proc.wait()
	return records

# p4InputCmd
#     executes the p4 command, feeding data to its standard input
def p4InputCmd(data, cmd, p4user, p4port, quiet=False):
	"""Run ``p4 <cmd>`` with *data* written to its standard input.

	Returns the (stdout, stderr) tuple from Popen.communicate(). stderr is
	redirected into stdout, so the second element is always None and any
	p4 error text appears in the first element -- callers must inspect that
	text rather than the second element to detect failures.
	"""
	if not quiet:
		logging.debug("p4 {0}".format(" ".join(cmd)))
	baseCmd = [g_config['p4'], "-p", p4port, "-u", p4user, "-c", g_config['client']]
	if g_ticket is not None:
		baseCmd.extend(["-P", g_ticket])
	proc = Popen(baseCmd + cmd, stdout=PIPE, stdin=PIPE, stderr=STDOUT)
	return proc.communicate(input=data)

# p4Cmd
#     runs a p4 command and returns the (combined output, None) pair
def p4Cmd(cmd, p4user, p4port, quiet=False):
	"""Run ``p4 <cmd>`` against *p4port* as *p4user*; return Popen.communicate()'s tuple.

	stderr is folded into stdout, so the tuple is (combined output, None).
	The global client (g_config['client']) and, once obtained, the login
	ticket (g_ticket, passed with -P) are added to the invocation.
	"""
	if not quiet:
		logging.debug("p4 {0}".format(" ".join(cmd)))
	prefix = [g_config['p4'], "-p", p4port, "-u", p4user, "-c", g_config['client']]
	ticketArgs = [] if g_ticket is None else ["-P", g_ticket]
	child = Popen(prefix + ticketArgs + cmd, stdout=PIPE, stderr=STDOUT)
	return child.communicate()

# containsError
#     utility function to check for any error code in the results array
def containsError(results=None, logError=True):
	"""Return True if any p4 -G record in *results* has code 'error'.

	results  -- iterable of record dicts (as returned by p4MarshalCmd); None or
	            empty means "no errors"
	logError -- when True, each error record's 'data' text is logged via error()

	Records with code 'info' are logged at DEBUG level, since their text can
	help when troubleshooting.
	"""
	foundError = False
	for r in results or ():
		if 'code' not in r:
			continue
		if r['code'] == 'error':
			foundError = True
			if logError:
				error(r['data'])
		elif r['code'] == 'info':
			logging.debug(r['data'])
	return foundError

# generateRandomFile
#     creates a random binary file in the specified location
def generateRandomFile(file, size):
	"""Write *size* random bytes to *file*, replacing any existing file.

	Missing parent directories are created. The content is written in 1 MB
	chunks so large test files are never held in memory all at once.

	Raises Exception("Cannot create random file <path>: <reason>") if the
	directory or file cannot be created or written.
	"""
	logging.debug("creating {0} with size {1}...".format(file, size))
	chunkSize = 1024 * 1024
	try:
		# create the parent directory if needed; a bare filename has no
		# directory component and simply goes in the current directory
		parent = os.path.dirname(file)
		if parent and not os.path.isdir(parent):
			os.makedirs(parent)
		# delete the file if it already exists (to get around read-only settings on Windows)
		if os.path.exists(file):
			os.chmod(file, stat.S_IWRITE)
			os.unlink(file)
		with open(file, 'wb') as fout:
			remaining = size
			while remaining > 0:
				n = min(chunkSize, remaining)
				fout.write(os.urandom(n))
				remaining -= n
	except (OSError, IOError) as err:
		raise Exception("Cannot create random file " + file + ": " + str(err))

# createChange
#    creates a changelist with the specified description and returns the changelist number
def createChange(description):
	"""Create a pending changelist on the master and return its number (int).

	The change is owned by the admin user and the configured client workspace.
	Raises Exception("Error creating changelist: ...") if p4 does not report
	"Change N created".
	"""
	spec = """\
Change:	new
Client:	{0}
User:	{1}
Status:	new
Description: {2}
""".format(g_config['client'], g_config['admin.user'], description)
	cmd = ["change", "-i"]
	output = p4InputCmd(spec, cmd, g_config['admin.user'], g_config['master.port'])[0] or ""
	# p4InputCmd merges stderr into stdout (its second element is always None),
	# so success is judged from the text p4 printed
	matchObj = re.search(r"Change (\d+) created", output)
	if matchObj is None:
		raise Exception("Error creating changelist: {0}".format(output.strip()))
	return int(matchObj.group(1))

# depotFileExists
#    checks to see if the specified path exists on the server
def depotFileExists(depotFile):
	"""Return True if ``p4 files <depotFile>`` on the master returns records and no errors.

	A missing file is the normal first-run case, so p4's error text is
	logged at DEBUG level rather than ERROR.
	"""
	cmd = ["files", depotFile]
	result = p4MarshalCmd(cmd, g_config['admin.user'], g_config['master.port'])
	if not result:
		# p4 printed nothing at all; there is no evidence the file exists
		return False
	if containsError(result, False):
		for r in result:
			if r.get('code') == 'error':
				logging.debug("[files] %s" % r.get('data', '').strip())
		return False
	return True

# getTicket
#    logs in to the master as the admin user (login -a -p) and returns the ticket text
def getTicket():
	"""Run ``p4 login -a -p`` on the master, answering the prompt with the admin password.

	Returns whatever p4 printed after the "Enter password:" prompt, stripped.
	Raises Exception("Unable to get ticket") if the output does not begin
	with that prompt.
	"""
	prompt = "Enter password:"
	loginOutput = p4InputCmd(g_config['admin.pass'], ['login', '-a', '-p'], g_config['admin.user'], g_config['master.port'])[0]
	if not loginOutput.startswith(prompt):
		raise Exception("Unable to get ticket")
	return loginOutput.replace(prompt, "").strip()

# transport prefixes that may precede host:port in a P4PORT value
_P4PORT_PROTOCOLS = frozenset(["tcp", "tcp4", "tcp6", "tcp46", "tcp64",
                               "ssl", "ssl4", "ssl6", "ssl46", "ssl64"])

# tcpPing
#    simple utility that tries to make a TCP/IP socket connection to the specified server:port
def tcpPing(p4port, timeout=1):
	"""Return True if a TCP connection to the server named by *p4port* succeeds.

	Accepted forms: "port", "host:port" and "protocol:host:port"
	(e.g. "ssl:host:1666", "tcp6:[::1]:1666"). A value without a host
	means localhost. *timeout* is the connect timeout in seconds.
	Never raises for an unreachable server or a malformed port.
	"""
	hostPart, _, portText = p4port.strip().rpartition(":")
	prefix, sep, rest = hostPart.partition(":")
	if sep and prefix.lower() in _P4PORT_PROTOCOLS:
		host = rest
	elif hostPart.lower() in _P4PORT_PROTOCOLS:
		# "ssl:1666" style: protocol and port only
		host = ""
	else:
		host = hostPart
	# IPv6 literals are written in brackets in P4PORT
	host = host.strip("[]") or "localhost"
	try:
		conn = socket.create_connection((host, int(portText)), timeout)
	except (socket.error, ValueError, OverflowError) as err:
		logging.debug("[tcp ping]: %s unreachable (%s)" % (p4port, err))
		return False
	conn.close()
	return True

# p4Ping
#    runs a p4 ping on the specified replica and logs the result
def p4Ping(replica):
	"""Run ``p4 ping`` on *replica* and log its count/time, or the error.

	replica -- replica dict from the configuration ('name', 'port').
	Intended as a thread target; all output goes to the log.
	"""
	logging.debug("pinging %s" % replica['name'])
	cmd = ["ping"]
	result = p4MarshalCmd(cmd, g_config["admin.user"], replica['port'], True)
	if containsError(result):
		messages = [r.get('data', '').strip() for r in result if r.get('code') == 'error']
		logging.info("%s: %s" % (replica['name'], "; ".join(messages)))
	elif not result:
		logging.error("%s: no output from p4 ping" % replica['name'])
	else:
		r = result[0]
		logging.info("{0}: count:{1} time: {2}".format(replica['name'], r.get('pingCount'), r.get('pingTimes')))

# pingTest
#    check responsiveness of each of the Perforce replica servers using p4 ping
def pingTest():
	"""Run p4Ping concurrently on every reachable replica; return when all are done."""
	logging.info("--- pingTest start")
	workers = []
	for replica in g_config['replicas']:
		if replica.get('reachable'):
			rt = threading.Thread(name=replica['name'], target=p4Ping, args=(replica,))
			workers.append(rt)
			rt.start()
	# join only the threads started here rather than waiting for every
	# thread in the process to finish
	logging.debug("waiting for active threads to complete...")
	for rt in workers:
		rt.join()
	logging.info("--- pingTest complete")

# p4Pull
#    runs "p4 pull -lj" on the specified replica and reports how far its journal lags the master
def p4Pull(replica):
	"""Log whether *replica*'s journal sequence matches the master's.

	Uses the 'masterJournalSequence' and 'replicaJournalSequence' fields of
	the first ``p4 pull -lj`` record. Intended as a thread target; all output
	goes to the log.
	"""
	logging.debug("checking journal sequence on %s" % replica['name'])
	cmd = ["pull", "-lj"]
	result = p4MarshalCmd(cmd, g_config["admin.user"], replica['port'], True)
	if containsError(result):
		messages = [r.get('data', '').strip() for r in result if r.get('code') == 'error']
		logging.info("%s: %s" % (replica['name'], "; ".join(messages)))
		return
	if not result:
		logging.error("%s: no output from p4 pull -lj" % replica['name'])
		return
	r = result[0]
	try:
		diff = int(r['masterJournalSequence']) - int(r['replicaJournalSequence'])
	except (KeyError, ValueError):
		logging.error("%s: unexpected p4 pull -lj output" % replica['name'])
		return
	if diff == 0:
		logging.info("{0}: in sync with master".format(replica['name']))
	else:
		logging.info("{0}: sequence difference: {1}".format(replica['name'], diff))

# pullTest
#    performs a p4 pull -lj on each of the Perforce replica servers
def pullTest():
	"""Run p4Pull concurrently on every reachable replica; return when all are done."""
	logging.info("--- pullTest start")
	workers = []
	for replica in g_config['replicas']:
		if replica.get('reachable'):
			rt = threading.Thread(name=replica['name'], target=p4Pull, args=(replica,))
			workers.append(rt)
			rt.start()
	# join only the threads started here rather than waiting for every
	# thread in the process to finish
	logging.debug("waiting for active threads to complete...")
	for rt in workers:
		rt.join()
	logging.info("--- pullTest complete")
	
# p4CheckKey
#    polls a replica until the test key carries the expected value
def p4CheckKey(replica, key, value, begin, delay=1):
	"""Poll *replica* until p4 key *key* reads back as *value*, logging the lag.

	replica -- replica dict from the configuration ('name', 'port', 'attempts')
	key     -- name of the p4 key to read
	value   -- value that was written on the master
	begin   -- time.time() taken just after the master was updated
	delay   -- seconds to wait between attempts (default 1, as in p4VerifyFile)

	Polling stops early if p4 reports an error. An ERROR is logged if the
	value is not seen within replica['attempts'] reads.
	"""
	logging.debug("checking for key on %s" % replica['name'])
	cmd = ["key", key]
	attempts = replica['attempts']
	found = False
	for count in range(attempts):
		result = p4MarshalCmd(cmd, g_config["admin.user"], replica['port'], True)
		if containsError(result):
			# containsError() has already logged p4's error text
			logging.info("%s: unable to read key %s" % (replica['name'], key))
			break
		if result and result[0].get('value') == value:
			found = True
			diff = time.time() - begin
			logging.info("%s: key updated %f sec" % (replica['name'], diff))
			break
		if count < attempts - 1:
			# without a pause every attempt fires immediately and the
			# replica has no time to receive the update
			time.sleep(delay)
	if not found:
		logging.error("%s: key not updated in %d attempts " % (replica['name'], attempts))

# metaTest
#    modify a piece of metadata and verify that it is sent to each of the replicas
def metaTest():
	"""Set the test key to a fresh random value on the master, then time its arrival on each replica.

	One p4CheckKey thread is started per reachable replica; the function
	returns once they have all finished.
	"""
	logging.info("--- metaTest start")
	key = g_config['metaTestKey']
	value = uuid.uuid4().hex
	cmd = ["key", key, value]
	# -G mode reports failures as records with code 'error', which gives a
	# reliable success test (combined text output never comes back as None)
	result = p4MarshalCmd(cmd, g_config['admin.user'], g_config['master.port'])
	begin = time.time()
	errors = [r.get('data', '').strip() for r in result if r.get('code') == 'error']
	if errors:
		logging.error("unable to set key %s on master" % key)
		for msg in errors:
			logging.error(msg)
	else:
		logging.debug("[TIMESTAMP %f]: key set on master" % begin)
		workers = []
		for replica in g_config['replicas']:
			if replica.get('reachable'):
				rt = threading.Thread(name=replica['name'], target=p4CheckKey, args=(replica, key, value, begin,))
				workers.append(rt)
				rt.start()
		logging.debug("waiting for active threads...")
		for rt in workers:
			rt.join()
	logging.info("--- metaTest complete")

# p4VerifyFile
#    polls a replica until it holds a verified copy of the master's head revision
def p4VerifyFile(replica, fstat, begin, delay=1):
	"""Poll *replica* with ``p4 verify -s -m 1`` until the submitted revision is verified.

	replica -- replica dict from the configuration ('name', 'port', 'attempts')
	fstat   -- first ``p4 fstat`` record for the test file on the master ('headRev' is used)
	begin   -- time.time() taken just after the submit
	delay   -- seconds to wait between attempts

	p4 errors are retried, because a newly added file is unknown on the
	replica until its metadata has replicated. The last error is reported if
	the revision never verifies within replica['attempts'] attempts.
	"""
	logging.debug("verifying file on %s..." % replica['name'])
	cmd = ["verify", "-s", "-m", "1", g_config['depotFile']]
	headRev = fstat.get('headRev')
	attempts = replica['attempts']
	found = False
	lastError = None
	for count in range(attempts):
		result = p4MarshalCmd(cmd, g_config["admin.user"], replica['port'], True)
		if containsError(result, False):
			lastError = "; ".join(r.get('data', '').strip() for r in result if r.get('code') == 'error')
			logging.debug("%s: %s" % (replica['name'], lastError))
		elif result:
			r = result[0]
			# a 'status' field is treated as "not verified yet" (verify uses it to
			# flag problems such as content that has not arrived)
			if headRev is not None and r.get('rev') == headRev and "status" not in r:
				found = True
				diff = time.time() - begin
				logging.info("%s: file updated %f sec" % (replica['name'], diff))
				break
		if count < attempts - 1:
			time.sleep(delay)
	if not found:
		if lastError:
			logging.error("%s: last p4 verify error: %s" % (replica['name'], lastError))
		logging.error("%s: file not updated in %d attempts " % (replica['name'], attempts))

# fileTest
#    submit a file of a known size and verify that the file is sent to each of the replicas
def fileTest():
	"""Submit a random test file on the master and time its arrival on each reachable replica.

	Uses the existing client workspace g_config['client'] and the depot path
	g_config['depotFile'] (added on first use, edited afterwards). If any
	step fails before the submit, the pending changelist is reverted and
	deleted, and the exception propagates to the caller.
	"""
	logging.info("--- fileTest start")
	user = g_config['admin.user']
	port = g_config['master.port']
	depotFile = g_config['depotFile']
	change = 0
	submitted = False
	try:
		# update the have list without transferring content. p4Cmd merges
		# stderr into stdout (its second element is always None), so outcomes
		# below are judged from the text p4 prints.
		p4Cmd(["sync", "-k"], user, port)

		result = p4MarshalCmd(["where", depotFile], user, port)
		if containsError(result):
			raise Exception("error while retrieving client spec")
		if not result or 'path' not in result[0]:
			raise Exception("unable to determine file path")
		localFile = result[0]['path']

		change = createChange("this is a test")
		adding = not depotFileExists(depotFile)
		if not adding:
			output = p4Cmd(["edit", "-k", "-c", str(change), depotFile], user, port)[0]
			if "opened for edit" not in output:
				raise Exception("error while editing file")
		generateRandomFile(localFile, g_config['fileSize'])
		if adding:
			output = p4Cmd(["add", "-c", str(change), "-t", "binary+S", depotFile], user, port)[0]
			if "opened for add" not in output:
				raise Exception("error while adding file")

		output = p4Cmd(["submit", "-c", str(change)], user, port)[0]
		if "submitted" not in output:
			raise Exception("error while submitting changelist {0}".format(change))
		submitted = True
		begin = time.time()

		result = p4MarshalCmd(["fstat", depotFile], user, port)
		if containsError(result) or not result:
			raise Exception("Error getting fstat on file from master")
		fstat = result[0]

		workers = []
		for replica in g_config['replicas']:
			if replica.get('reachable'):
				rt = threading.Thread(name=replica['name'], target=p4VerifyFile, args=(replica, fstat, begin,))
				workers.append(rt)
				rt.start()
		logging.debug("waiting for active threads...")
		for rt in workers:
			rt.join()
	finally:
		# only a pending (never submitted) changelist can be cleaned up;
		# try/finally also keeps the original exception and traceback intact
		if change > 0 and not submitted:
			# revert the changelist
			p4Cmd(["revert", "-c", str(change), "//..."], user, port)
			# delete the pending changelist
			p4Cmd(["change", "-d", str(change)], user, port)
	logging.info("--- fileTest complete")

# _exitWithUsage
#     prints the usage text, then exits with the given status
def _exitWithUsage(code):
	try:
		usage()
	except SystemExit:
		# usage() terminates with status 0 on its own; swallow that so the
		# status requested by the caller is the one actually reported
		pass
	sys.exit(code)

# main
#     command-line entry point: parse options, load the configuration and run the selected tests
def main(argv=None):
	"""Run the replica tests selected on the command line.

	argv -- argument list without the program name; defaults to sys.argv[1:]

	Exits with status 0 on success, 2 for an unrecognised option and 3 when
	no configuration file is given. Other failures (bad configuration,
	unreachable master, login failure, unknown test type) are raised as
	exceptions for the caller to report.
	"""
	global g_ticket

	if argv is None:
		argv = sys.argv[1:]

	verbose = False
	logFile = None
	configFile = None
	scantype = "all"

	try:
		opts, _ = getopt.getopt(argv, "hl:c:vCt:")
	except getopt.GetoptError:
		print("ERROR: unknown argument\n")
		_exitWithUsage(2)

	for opt, arg in opts:
		if opt == "-v":
			verbose = True
		elif opt == "-h":
			usage()
		elif opt == "-l":
			logFile = arg
		elif opt == "-c":
			configFile = arg
		elif opt == "-C":
			genConfigFile()
		elif opt == "-t":
			scantype = arg

	if configFile is None:
		print("ERROR: configFile is required\n")
		_exitWithUsage(3)

	logLevel = logging.DEBUG if verbose else logging.INFO
	if logFile is not None:
		logging.basicConfig(filename=logFile, format='%(asctime)s [%(levelname)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logLevel)
		logging.info("------ STARTING RUN ------")
	else:
		logging.basicConfig(format='[%(levelname)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logLevel)

	# test functions run for each -t value, in order
	testPlan = {
		"ping": (pingTest,),
		"pull": (pullTest,),
		"meta": (metaTest,),
		"file": (fileTest,),
		"all": (pingTest, pullTest, metaTest, fileTest),
	}
	# reject an unknown test type before any network activity happens
	if scantype not in testPlan:
		raise Exception("Scan type not recognized")

	parseConfigFile(configFile)
	if 'master.port' not in g_config:
		raise Exception("no master server defined in the configuration file")
	# a configuration without <replicaList> simply has no replicas to test
	g_config.setdefault('replicas', [])

	if not tcpPing(g_config['master.port'], g_config.get('master.timeout', DEFAULT_TCP_TIMEOUT)):
		raise Exception("Cannot ping master server")

	g_ticket = getTicket()
	# getTicket() returns the text after the password prompt; presumably p4's
	# complaint on a bad password contains "invalid"
	if "invalid" in g_ticket:
		raise Exception("Password invalid")

	for replica in g_config['replicas']:
		# if we can't even get to the TCP/IP port, no bother trying to use p4 ping
		replica['reachable'] = tcpPing(replica['port'], replica['timeout'])
		if not replica['reachable']:
			# log the unreachable server as an error
			logging.error("%s: unreachable" % replica['name'])

	for test in testPlan[scantype]:
		test()

	sys.exit(0)

if __name__ == '__main__':
	# script entry: any exception that escapes main() is logged and ends the run
	try:
		main(argv=sys.argv[1:])
	except Exception as exc:
		errorExit(exc)
# Change User Description Committed
#1 12970 alan_petersen A utility for testing replication