# # Python module for common Perforce commands. # Right now, this uses just the command-line, but could be replaced with # p4api. This will be updated with commands as needed. # import os, sys, Setup, cutil, types, marshal UNITTEST_MOCK = None def createP4(): if UNITTEST_MOCK: return UNITTEST_MOCK elif Setup.P4API: return P4Commands_P4API() else: return P4Commands_Cmd() class P4Commands: ''' Interface for types of implementations. ''' def __init__(self): self.env = Setup.SERVER.P4ENV def retrieve(self, depotPath, revision = '#head'): ''' Retrieve the contents of a specific version of a depot-controlled file. Returns None if the depot file doesn't exist ''' cutil.log(cutil.ERR, 'retrieve not implemented') return None def updateGroup(self, groupSpec): ''' Updates a group from a group spec. groupSpec: plain text similar in format to the output of 'p4 group -o' returns True if the operation worked, or False if there was an error ''' cutil.log(cutil.ERR, 'updateGroup not implemented') return False def getCounter(self, counterName): ''' Retrieves the value for a specific named counter. Returns None if the operation didn't work, or if there is no such counter. ''' counters = self.getCounters() if counters is None: return None for name,value in counters.items(): if name == counterName: return value return None def getCounters(self): ''' Retrieves a dictionary of counter names to (integer) values. Returns None if the operation didn't work. ''' cutil.log(cutil.ERR, 'getCounters not implemented') return None class P4Commands_Cmd(P4Commands): ''' Command-line based implementation of P4Commands. ''' def __init__(self): P4Commands.__init__(self) c = Setup.SERVER.P4CMD if type(c) in types.StringTypes: self.p4cmd = [c] elif type(c) == types.ListType or type(c) == types.TupleType: self.p4cmd = [] self.p4cmd.extend(c) def retrieve(self, depotPath, revision = '#head'): ''' Retrieve the contents of a specific version of a depot-controlled file. Returns None if the depot file doesn't exist ''' if revision is None: revision = '' # retrieve the top line too, to detect if the requested file exists args = ('print', depotPath + revision) out = self.__p4(args) if out is None: return text = '' for x in out: if x.has_key('code') and x['code'] == 'text' and x.has_key('data'): text = "%s%s" % (text, x['data']) cutil.log(cutil.DEBUG, ("p4util: Retrieved file ", depotPath, revision)) return text def updateGroup(self, groupSpec): ''' Updates a group from a group spec. groupSpec: plain text similar in format to the output of 'p4 group -o' ''' # Because of the format of the spec file, we can't easily turn it into # a -G equivalent. cmd = [] cmd.extend(self.p4cmd) cmd.extend(('-s', 'group', '-i')) process = self.__popen(cmd, 'rwb') process.stdin.write(groupSpec) process.stdin.close() ret = True lastLine = None for line in process.stdout.readlines(): lastLine = line if line.startswith('error: '): cutil.log(cutil.ERR, "p4util: Error running [%s]: %s" % ( ' '.join(cmd), line)) ret = False if lastLine.strip() != 'exit: 0': cutil.log(cutil.ERR, "p4util: Error running [%s]: %s" % ( ' '.join(cmd), line)) ret = False process.wait() cutil.log(cutil.INFO, "p4util: Updated group meta-data") return ret def getCounters(self): ''' Retrieves a dictionary of counter names to (integer) values. Returns None if the operation didn't work. ''' rL = self.__p4(['counters']) if rL is None: return None dout = {} for d in rL: if d and d.has_key('counter') \ and d.has_key('value'): dout[d['counter']] = int(d['value']) return dout def __mkEnv(self): out = os.environ.copy() out.update(self.env) return out def __p4(self, args, inp = None): """ Execute the Perforce command with the given arguments, and return an array of dictionaries. """ cmd = [] cmd.extend(self.p4cmd) cmd.append('-G') cmd.extend(args) cutil.log(cutil.VERBOSE, ["Running command ", cmd]) ret = [] sout = None try: process = self.__popen(cmd) if inp is not None: process.stdin.write(inp) err = False process.stdin.close() while 1: d = marshal.load(process.stdout) if d.has_key('code') and d['code'] == 'error': err = True cutil.log(cutil.ERR, ["P4 reported error in command [", cmd, "]: ", d.has_key('data') and d['data'] or '']) ret.append(d) except EOFError: # marked as end of input # This is the correct end-of-read section pass except: err = True cutil.log_error() cutil.log(cutil.ERR, ["Failed to execute command ", cmd]) process.wait() if err: return None else: return ret def __popen(self, cmd, mode = 'rw'): ''' An abstraction of the os.popen4 command, so that unit testing is possible by overridding this method. It also allows for easier upgrade when a replacement for the standard operation is made, to allow for passing env values. ''' # Python 2.4+ #import subprocess #process = subprocess.Popen(cmd, shell=True, env=self.__mkEnv(), # stdout=subprocess.PIPE, stderr=subprocess.STDOUT) #return process class Proc: def __init__(self, cmd, mode, env): (sin, sout) = os.popen4(cmd, mode) self.stdout = sout self.stdin = sin def wait(self): try: self.stdin.close() except: cutil.log_error(cutil.WARN) try: self.stdout.readlines() except: cutil.log_error(cutil.WARN) try: self.stdout.close() except: cutil.log_error(cutil.WARN) return 0 def __del__(self): self.wait() env = None # Once usable: # env = self.__mkEnv() return Proc(cmd, mode, env) class P4Commands_P4API(P4Commands): ''' P4Python (API) based implementation of P4Commands. Not implemented yet ''' def __init__(self): P4Commands.__init__(self) cutil.log(cutil.ERR, 'P4Commands_P4API not implemented')