'''
Created on Dec 28, 2017
@author: Charlie McLouth
'''
import unittest
from P4Server import P4TestcaseShared
import os.path
from p4rest.util import osenviron, mkdtemp, rmtree
from p4rest.flask.application import create_app, APP_CONFIG_KEY
from p4rest.session import SESSION_HEADER_NAME
from werkzeug.datastructures import Headers
from base64 import b64encode
from flask import current_app, json
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
debugHandler = logging.StreamHandler()
debugHandler.setFormatter(logging.Formatter("%(levelname)s:%(filename)s:"
"%(lineno)d:%(funcName)s:"
"%(message)s"))
logger.addHandler(debugHandler)
class TestWWWCommands(P4TestcaseShared):
'''Test P4 environment within the P4RestSessionP4'''
def initTestData(self, _p4api):
_p4api.run_password("", "Perf0rce")
_p4api.run_configure("set", "security=3")
_p4api.run_login(password="Perf0rce")
name = "p4testadmin"
spec = _p4api.fetch_user(name)
_p4api.save_user(spec, "-f")
_p4api.run_password("", "Perf0rce", name)
_p4api.delete_depot("repo")
_p4api.delete_depot("depot")
def setUp(self):
self.orgP4CONFIG = osenviron.get("P4CONFIG", None)
self.orgP4TICKETS = osenviron.get("P4TICKETS", None)
self.orgP4TRUST = osenviron.get("P4TRUST", None)
self.testdir = os.path.abspath(".")
if not os.path.isdir(self.testdir):
raise Exception("Test directory ({}) does not exist."
"".format(self.testdir))
self.testdir = mkdtemp(suffix=None, prefix=__name__, dir=self.testdir)
self.configKey = "TestWWWCommandsConfig_key"
configdefaults = {self.configKey: os.path.join(self.testdir,
"test.json"),
"WORKDIR": os.path.join(self.testdir, "workdir"),
APP_CONFIG_KEY: self.configKey,
"DEBUG": True}
self.wwwapp = create_app(configdefaults)
self.wwwapp.testing = True
self.workdir = self.wwwapp.config["WORKDIR"]
self.p4tickets = self.wwwapp.config["P4TICKETS"]
P4TestcaseShared.setUp(self)
def tearDown(self):
if os.path.isdir(self.testdir): rmtree(self.testdir)
else: logger.warn('not a directory?:' + self.testdir)
if self.orgP4CONFIG is not None:
osenviron["P4CONFIG"] = self.orgP4CONFIG
elif "P4CONFIG" in osenviron:
del(osenviron["P4CONFIG"])
if self.orgP4TICKETS is not None:
osenviron["P4TICKETS"] = self.orgP4TICKETS
elif "P4TICKETS" in osenviron:
del(osenviron["P4TICKETS"])
if self.orgP4TRUST is not None:
osenviron["P4TRUST"] = self.orgP4TRUST
elif "P4CONFIG" in osenviron:
del(osenviron["P4CONFIG"])
def __genAuthHeader(self, user, password):
data = b""
if user is not None and len(user) > 0:
data = user.encode()
data = data + b":"
if password is not None and len(password) > 0:
data = data + password.encode()
return b"Basic " + b64encode(data)
def createSession(self, P4PORT, P4USER, P4PASSWORD):
command = "createSession"
with self.wwwapp.app_context():
with current_app.test_client() as client:
headers = Headers()
headers.add('Authorization', self.__genAuthHeader(P4USER, P4PASSWORD))
headers.add("Content-Type", "application/json")
data = {"P4PORT": P4PORT}
json_data = json.dumps(data)
response = client.post(path="/sessions/",
data=json_data,
headers=headers)
self.assertEqual(200, response.status_code, command)
self.assertEqual("application/json", response.headers.get("Content-Type"), command)
response_data = json.loads(response.get_data())
self.assertIsInstance(response_data, dict, command)
self.assertEqual(P4USER, response_data["userName"], command)
sessionId = response_data.get(SESSION_HEADER_NAME, None)
self.assertIsNotNone(sessionId, command)
return sessionId
def deleteSession(self, sessionId):
requestPath = "/sessions/{}".format(sessionId)
self.wwwrequest("DELETE", sessionId, requestPath, None, None, 204)
def wwwrequest(self, method, sessionId, path, data, headers, httpstatuscode):
# clone headers
requestHeaders = Headers(headers)
# add the sessionId
requestHeaders.add('Authorization', "{}={}".format(SESSION_HEADER_NAME, sessionId))
# convert data to json
json_data = None
if data is not None:
json_data = json.dumps(data)
# set the header that we have json data
requestHeaders.add("Content-Type", "application/json")
# issue the request
with self.wwwapp.app_context():
with current_app.test_client() as client:
if method.upper() == "POST":
response = client.post(path=path,
data=json_data,
headers=requestHeaders)
elif method.upper() == "GET":
response = client.get(path=path,
data=json_data,
headers=requestHeaders)
elif method.upper() == "PUT":
response = client.put(path=path,
data=json_data,
headers=requestHeaders)
elif method.upper() == "DELETE":
response = client.delete(path=path,
data=json_data,
headers=requestHeaders)
self.assertEqual(httpstatuscode, response.status_code, response.get_data())
return response
def assert_limit_and_next(self, testname, response, limit, offset,
currenttotal, expectedtotal):
'''assert the current values and advance to next offset'''
results = []
meta = None
warnings = None
errors = None
if isinstance(response, list):
results = response
logger.debug(results)
elif isinstance(response, (str, int)):
results = [response]
logger.debug(results)
elif isinstance(response, dict):
if "results" in response and response["results"] is not None:
if isinstance(response["results"], list):
results = response["results"]
else:
results = [response["results"]]
if "meta" in response and response["meta"] is not None:
meta = response["meta"]
if "warnings" in response and response["warnings"] is not None:
warnings = response["warnings"]
if "errors" in response and response["errors"] is not None:
errors = response["errors"]
if "results" not in response \
and "meta" not in response \
and "warnings" not in response \
and "errors" not in response:
results = [response]
logger.debug(testname)
#a if condition else b
logger.debug("len(warnings):{!s}".format(len(warnings) if warnings else None))
logger.debug("len(results):{!s}".format(len(results) if results else None))
logger.debug("len(meta):{!s}".format(len(meta) if meta else None))
logger.debug("len(errors):{!s}".format(len(errors) if errors else None))
logger.debug("limit):{!s}".format(limit))
logger.debug("offset):{!s}".format(offset))
logger.debug("currenttotal):{!s}".format(currenttotal))
logger.debug("expectedtotal):{!s}".format(expectedtotal))
self.assertGreaterEqual(limit, len(results), testname)
if len(results) < 1:
self.assertEqual(2, len(warnings), testname)
self.assertEqual(expectedtotal, warnings[-1]["count"], testname)
self.assertEqual(offset, warnings[-1]["offset"], testname)
newtotal = currenttotal + len(results)
logger.debug("newtotal):{!s}".format(newtotal))
if newtotal > expectedtotal:
logger.debug(results)
logger.debug(response)
self.assertGreaterEqual(expectedtotal, newtotal, testname)
if newtotal < expectedtotal:
self.assertEqual(limit, len(results), testname)
self.assertEqual(2, len(meta), testname)
self.assertEqual(limit, len(results), testname)
self.assertTrue(meta["more_results"], testname)
# advance offset
newoffset = meta["next_offset"]
self.assertGreaterEqual(expectedtotal, newoffset, testname)
elif len(results) > 0:
self.assertEqual(expectedtotal, newtotal, testname)
self.assertIsNone(warnings, testname)
# advance offset
newoffset = offset + len(results)
self.assertEqual(expectedtotal, newoffset, testname)
else:
# advance offset out of range
newoffset = offset + 1
self.assertLessEqual(expectedtotal, newoffset, testname)
logger.debug("newoffset):{!s}".format(newoffset))
return newtotal, newoffset
def assert_fetch_user(self, response, name):
command = "fetch_user"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, dict, command)
self.assertIn("User", responseData, command)
self.assertEqual(name, responseData["User"], command)
# TODO: assert types
return responseData
def assert_save_user(self, response, name):
command = "save_user"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(2, len(responseData), command)
self.assertIsInstance(responseData[0], str, command)
self.assertIsInstance(responseData[1], dict, command)
self.assertEqual("User {} saved.".format(name), responseData[0], command)
# TODO: assert types
return responseData
def assert_fetch_depot(self, response, name, depotType):
command = "fetch_depot"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, dict, command)
self.assertIn("Depot", responseData, command)
self.assertEqual(name, responseData["Depot"], command)
self.assertEqual(depotType, responseData["Type"], command)
# TODO: assert types
return responseData
def assert_save_depot(self, response, name, depotType):
command = "save_depot"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(2, len(responseData), command)
self.assertIsInstance(responseData[0], str, command)
self.assertIsInstance(responseData[1], dict, command)
self.assertEqual("Depot {} saved.".format(name), responseData[0], command)
self.assertEqual(depotType, responseData[1]["Type"], command)
# TODO: assert types
return responseData
def assert_fetch_group(self, response, name):
command = "fetch_group"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, dict, command)
self.assertIn("Group", responseData, command)
self.assertEqual(name, responseData["Group"], command)
# TODO: assert types
return responseData
def assert_save_group(self, response, name):
command = "save_group"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(2, len(responseData), command)
self.assertIsInstance(responseData[0], str, command)
self.assertIsInstance(responseData[1], dict, command)
self.assertEqual("Group {} created.".format(name), responseData[0], command)
# TODO: assert types
return responseData
def assert_fetch_server(self, response, name):
command = "fetch_server"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, dict, command)
self.assertIn("ServerID", responseData, command)
self.assertEqual(name, responseData["ServerID"], command)
# TODO: assert types
return responseData
def assert_save_server(self, response, name):
command = "save_server"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(2, len(responseData), command)
self.assertIsInstance(responseData[0], str, command)
self.assertIsInstance(responseData[1], dict, command)
self.assertEqual("Server {} saved.".format(name), responseData[0], command)
# TODO: assert types
return responseData
def assert_fetch_remote(self, response, name):
command = "fetch_remote"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, dict, command)
self.assertIn("RemoteID", responseData, command)
self.assertEqual(name, responseData["RemoteID"], command)
# TODO: assert types
return responseData
def assert_save_remote(self, response, name):
command = "save_remote"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(2, len(responseData), command)
self.assertIsInstance(responseData[0], str, command)
self.assertIsInstance(responseData[1], dict, command)
self.assertEqual("Remote {} saved.".format(name), responseData[0], command)
# TODO: assert types
return responseData
def assert_fetch_ldap(self, response, name):
command = "fetch_ldap"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, dict, command)
self.assertIn("Name", responseData, command)
self.assertEqual(name, responseData["Name"], command)
# TODO: assert types
return responseData
def assert_save_ldap(self, response, name):
command = "save_ldap"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(2, len(responseData), command)
self.assertIsInstance(responseData[0], str, command)
self.assertIsInstance(responseData[1], dict, command)
self.assertEqual("LDAP configuration {} saved.".format(name), responseData[0], command)
# TODO: assert types
return responseData
def assert_fetch_job(self, response, name):
command = "fetch_job"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, dict, command)
self.assertIn("Job", responseData, command)
self.assertEqual(name, responseData["Job"], command)
# TODO: assert types
return responseData
def assert_save_job(self, response, name):
command = "save_job"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(2, len(responseData), command)
self.assertIsInstance(responseData[0], str, command)
self.assertIsInstance(responseData[1], dict, command)
self.assertEqual("Job {} saved.".format(name), responseData[0], command)
# TODO: assert types
return responseData
def assert_fetch_branch(self, response, name):
command = "fetch_branch"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, dict, command)
self.assertIn("Branch", responseData, command)
self.assertEqual(name, responseData["Branch"], command)
# TODO: assert types
return responseData
def assert_save_branch(self, response, name):
command = "save_branch"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(2, len(responseData), command)
self.assertIsInstance(responseData[0], str, command)
self.assertIsInstance(responseData[1], dict, command)
self.assertEqual("Branch {} saved.".format(name), responseData[0], command)
# TODO: assert types
return responseData
def assert_fetch_label(self, response, name):
command = "fetch_label"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, dict, command)
self.assertIn("Label", responseData, command)
self.assertEqual(name, responseData["Label"], command)
# TODO: assert types
return responseData
def assert_save_label(self, response, name):
command = "save_label"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(2, len(responseData), command)
self.assertIsInstance(responseData[0], str, command)
self.assertIsInstance(responseData[1], dict, command)
self.assertEqual("Label {} saved.".format(name), responseData[0], command)
# TODO: assert types
return responseData
def assert_fetch_repo(self, response, name):
command = "fetch_repo"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, dict, command)
self.assertIn("Repo", responseData, command)
self.assertEqual(name, responseData["Repo"], command)
# TODO: assert types
return responseData
def assert_save_repo(self, response, name):
command = "save_repo"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(2, len(responseData), command)
self.assertIsInstance(responseData[0], str, command)
self.assertIsInstance(responseData[1], dict, command)
self.assertEqual("Repo {} saved.".format(name), responseData[0], command)
# TODO: assert types
return responseData
def assert_fetch_stream(self, response, name):
command = "fetch_stream"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, dict, command)
self.assertIn("Stream", responseData, command)
self.assertEqual(name, responseData["Stream"], command)
# TODO: assert types
return responseData
def assert_save_stream(self, response, name):
command = "save_stream"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(2, len(responseData), command)
self.assertIsInstance(responseData[0], str, command)
self.assertIsInstance(responseData[1], dict, command)
self.assertEqual("Stream {} saved.".format(name), responseData[0], command)
# TODO: assert types
return responseData
def assert_fetch_client(self, response, name):
command = "fetch_client"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, dict, command)
self.assertIn("Client", responseData, command)
self.assertEqual(name, responseData["Client"], command)
# TODO: assert types
return responseData
def assert_save_client(self, response, name):
command = "save_client"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(2, len(responseData), command)
self.assertIsInstance(responseData[0], str, command)
self.assertIsInstance(responseData[1], dict, command)
self.assertEqual("Client {} saved.".format(name), responseData[0], command)
# TODO: assert types
return responseData
def assert_fetch_change(self, response, changeno):
command = "fetch_change"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, dict, command)
self.assertIn("Change", responseData, command)
self.assertEqual(changeno, responseData["Change"], command)
# TODO: assert types
return responseData
def assert_save_change(self, response):
command = "save_change"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(2, len(responseData), command)
self.assertIsInstance(responseData[0], str, command)
self.assertIsInstance(responseData[1], dict, command)
parts = responseData[0].split()
self.assertEqual("Change", parts[0], command)
self.assertEqual("created.", parts[2], command)
self.assertEqual(parts[1], responseData[1]["Change"], command)
# TODO: assert types
return responseData
def assert_run_where(self, response, filecount=None, depotFileMap=None):
command = "run_where"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
if filecount is not None:
self.assertEqual(filecount, len(responseData), command)
self.assertIsInstance(responseData[0], dict, command)
self.assertEqual(3, len(responseData[0]), command)
if depotFileMap is not None:
for revision in responseData:
self.assertIn(revision["depotFile"], depotFileMap, command)
# TODO: assert types
return responseData
def assert_run_add(self, response, changeno, depotFileMap):
command = "run_add"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(len(depotFileMap), len(responseData), command)
for revision in responseData:
self.assertIn(revision["depotFile"], depotFileMap, command)
# TODO: assert types
return responseData
def assert_run_submit(self, response, changeno, depotFileMap):
command = "run_submit"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(len(depotFileMap) + 2, len(responseData), command)
self.assertEqual(changeno, responseData[0]["change"], command)
for revision in responseData[1:len(responseData)-2]:
self.assertIn(revision["depotFile"], depotFileMap, command)
self.assertIn("submittedChange", responseData[-1], command)
# TODO: assert types
return responseData
def assert_run_sync(self, response, filecount=None, depotFileMap=None):
command = "run_sync"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
if filecount is not None:
self.assertEqual(filecount, len(responseData), command)
if depotFileMap is not None:
for revision in responseData:
self.assertIn(revision["depotFile"], depotFileMap, command)
# TODO: assert types
return responseData
def assert_run_edit(self, response, changeno, filecount=None, depotFileMap=None):
command = "run_edit"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
if filecount is not None:
self.assertEqual(filecount, len(responseData), command)
if depotFileMap is not None:
for revision in responseData:
self.assertIn(revision["depotFile"], depotFileMap, command)
# TODO: assert types
return responseData
def assert_run_shelve(self, response, changeno, depotFileMap=None):
command = "run_shelve"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
if depotFileMap is not None:
for revision in responseData:
self.assertIn(revision["depotFile"], depotFileMap, command)
# TODO: assert types
return responseData
def assert_run_revert(self, response, filecount=None, depotFileMap=None):
command = "run_revert"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
if filecount is not None:
self.assertEqual(filecount, len(responseData), command)
if depotFileMap is not None:
for revision in responseData:
self.assertIn(revision["depotFile"], depotFileMap, command)
# TODO: assert types
return responseData
def assert_run_have(self, response, filecount=None, depotFileMap=None):
command = "run_have"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
# list or dict
if isinstance(responseData, dict):
self.assertIn("results", responseData, command)
if filecount is not None:
self.assertEqual(filecount, len(responseData["results"]), command)
if depotFileMap is not None:
for revision in responseData["results"]:
self.assertIn(revision["depotFile"], depotFileMap, command)
else:
self.assertIsInstance(responseData, list, command)
if filecount is not None:
self.assertEqual(filecount, len(responseData), command)
if depotFileMap is not None:
for revision in responseData:
self.assertIn(revision["depotFile"], depotFileMap, command)
# TODO: assert types
return responseData
def assert_run_info(self, response, **kargs):
command = "run_info"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(1, len(responseData), command)
self.assertIsInstance(responseData[0], dict, command)
for k,v in kargs.items():
self.assertIn(k, responseData[0], command)
self.assertEqual(v, responseData[0][k], command)
# TODO: assert types
return responseData
def assert_run_counter(self, response, name):
command = "run_counter"
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
# validate response
self.assertIsInstance(responseData, list, command)
self.assertEqual(1, len(responseData), command)
self.assertIn("counter", responseData[0], command)
self.assertEqual(name, responseData[0]["counter"], command)
# TODO: assert types
return responseData
def assert_iterate(self, response, name, itemcount=None):
command = "iterate_{}".format(name)
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
logger.debug(responseData)
# validate response
# list or dict
if isinstance(responseData, dict):
self.assertIn("results", responseData, command)
if itemcount is not None:
self.assertEqual(itemcount, len(responseData["results"]), command)
else:
self.assertIsInstance(responseData, list, command)
if itemcount is not None:
self.assertEqual(itemcount, len(responseData), command)
return responseData
def rest_increment_counter(self, counterName, sessionId):
command = "run_counter"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = ["-i", counterName]
response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200)
counterValue = int(self.assert_run_counter(response, counterName)[0]["value"])
return counterValue
def p4_increment_counter(self, counterName, p4api):
counterValue = int(p4api.assert_run_counter("-i", counterName)[0]["value"])
return counterValue
def rest_create_users(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
counterName = "rest_create_depots"
if self.rest_increment_counter(counterName, superSessionId) < 2:
# create users
for i in range(0,10):
name = "user_{!s}".format(i)
command = "fetch_user"
requestPath = "/commands/{}?arg={}".format(command, name)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", superSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_user(response, name)
# save the user
command = "save_user"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = [spec, "-f"]
response = self.wwwrequest("POST", superSessionId, requestPath, requestData, requestHeaders, 200)
self.assert_save_user(response, name)
# set the password
self.p4server.p4api.run_password("", "Perf0rce", name)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def rest_create_depots(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
# depends upon users
self.rest_create_users(superSessionId)
counterName = "create_depots"
if self.rest_increment_counter(counterName, superSessionId) < 2:
# create depots
for i in range(0,10):
userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce")
name = "depot_{!s}".format(i)
command = "fetch_depot"
requestHeaders = None
requestData = None
if i == 0:
depotType = "stream"
name = depotType
elif i == 1:
depotType = "graph"
name = depotType
elif i == 2:
depotType = "local"
name = depotType
elif i == 3:
depotType = "spec"
name = depotType
elif i == 4:
depotType = "archive"
name = depotType
elif i == 4:
depotType = "unload"
name = depotType
else:
depotType = "local"
requestPath = "/commands/{}?arg=-t&arg={}&arg={}".format(command, depotType, name)
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_depot(response, name, depotType)
# save the depot
command = "save_depot"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = [spec]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
self.assert_save_depot(response, name, depotType)
self.deleteSession(userSessionId)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def rest_create_groups(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
# depends upon users
self.rest_create_users(superSessionId)
counterName = "create_groups"
if self.rest_increment_counter(counterName, superSessionId) < 2:
# create groups
command = "iterate_users"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", superSessionId, requestPath, requestData, requestHeaders, 200)
responseData = json.loads(response.get_data())
self.assertIsInstance(responseData, list, command)
userList = []
for userSpec in responseData:
if userSpec["User"].startswith("user_"):
userList.append(userSpec["User"])
for i in range(0,10):
userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce")
name = "group_{!s}".format(i)
command = "fetch_group"
requestPath = "/commands/{}?arg={}".format(command, name)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_group(response, name)
spec["Owners"] = ["user_{!s}".format(i)]
spec["Users"] = userList
# save the group
command = "save_group"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = [spec]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_save_group(response, name)
self.deleteSession(userSessionId)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def rest_create_servers(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
# depends upon users
self.rest_create_users(superSessionId)
counterName = "create_servers"
if self.rest_increment_counter(counterName, superSessionId) < 2:
for i in range(0,10):
userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce")
name = "server_{!s}".format(i)
command = "fetch_server"
requestPath = "/commands/{}?arg={}".format(command, name)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_server(response, name)
# save the server
command = "save_server"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = [spec]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
self.assert_save_server(response, name)
self.deleteSession(userSessionId)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def rest_create_remotes(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
# depends upon users
self.rest_create_users(superSessionId)
counterName = "create_remotes"
if self.rest_increment_counter(counterName, superSessionId) < 2:
for i in range(0,10):
userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce")
name = "remote_{!s}".format(i)
command = "fetch_remote"
requestPath = "/commands/{}?arg={}".format(command, name)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_remote(response, name)
# save the remote
command = "save_remote"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = [spec]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
self.assert_save_remote(response, name)
self.deleteSession(userSessionId)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def rest_create_ldaps(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
# depends upon users
self.rest_create_users(superSessionId)
counterName = "create_ldaps"
if self.rest_increment_counter(counterName, superSessionId) < 2:
for i in range(0,10):
userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce")
name = "ldap_{!s}".format(i)
command = "fetch_ldap"
requestPath = "/commands/{}?arg={}".format(command, name)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_ldap(response, name)
spec["SimplePattern"] = "%user%"
# save the LDAP
command = "save_ldap"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = [spec]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
self.assert_save_ldap(response, name)
self.deleteSession(userSessionId)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def rest_create_jobs(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
# depends upon users
self.rest_create_users(superSessionId)
counterName = "create_jobs"
if self.rest_increment_counter(counterName, superSessionId) < 2:
for i in range(0,10):
userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce")
name = "job_{!s}".format(i)
command = "fetch_job"
requestPath = "/commands/{}?arg={}".format(command, name)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_job(response, name)
spec["Description"] = "description {!s}".format(i)
# save the job
command = "save_job"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = [spec]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
self.assert_save_job(response, name)
self.deleteSession(userSessionId)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def rest_create_branches(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
# depends upon depots and indirectly users
self.rest_create_depots(superSessionId)
counterName = "create_branches"
if self.rest_increment_counter(counterName, superSessionId) < 2:
for i in range(0,10):
userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce")
name = "branch_{!s}".format(i)
command = "fetch_branch"
requestPath = "/commands/{}?arg={}".format(command, name)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_branch(response, name)
spec["View"].clear()
spec["View"].append("//local/a/... //local/b/...")
# save the branch
command = "save_branch"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = [spec]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
self.assert_save_branch(response, name)
self.deleteSession(userSessionId)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def rest_create_labels(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
# depends upon depots and indirectly users
self.rest_create_depots(superSessionId)
counterName = "create_labels"
if self.rest_increment_counter(counterName, superSessionId) < 2:
for i in range(0,10):
userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce")
name = "label_{!s}".format(i)
command = "fetch_label"
requestPath = "/commands/{}?arg={}".format(command, name)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_label(response, name)
spec["View"].clear()
spec["View"].append("//local/...")
# save the label
command = "save_label"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = [spec]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
self.assert_save_label(response, name)
self.deleteSession(userSessionId)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def rest_create_repos(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
# depends upon depots and indirectly users
self.rest_create_depots(superSessionId)
counterName = "create_repos"
if self.rest_increment_counter(counterName, superSessionId) < 2:
for i in range(0,10):
userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce")
name = "//graph/repo_{!s}".format(i)
command = "fetch_repo"
requestPath = "/commands/{}?arg={}".format(command, name)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_repo(response, name)
# save the repo
command = "save_repo"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = [spec]
logger.debug(self.p4server.p4api.run_depots())
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
self.assert_save_repo(response, name)
self.deleteSession(userSessionId)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def rest_create_streams(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
# depends upon depots and indirectly users
self.rest_create_depots(superSessionId)
counterName = "create_streams"
if self.rest_increment_counter(counterName, superSessionId) < 2:
for i in range(0,10):
userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce")
name = "//stream/stream_{!s}".format(i)
command = "fetch_stream"
requestPath = "/commands/{}?arg=-t&arg=mainline&arg={}".format(command, name)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_stream(response, name)
# save the stream
command = "save_stream"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = [spec]
logger.debug(self.p4server.p4api.run_depots())
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
self.assert_save_stream(response, name)
self.deleteSession(userSessionId)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def rest_create_clients(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
# depends upon depots
self.rest_create_depots(superSessionId)
counterName = "create_clients"
if self.rest_increment_counter(counterName, superSessionId) < 2:
for i in range(0,10):
userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce")
name = "client_{!s}".format(i)
command = "fetch_client"
requestPath = "/commands/{}?arg={}".format(command, name)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_client(response, name)
spec["Root"] = self.p4server.clientroot
options = spec["Options"].split()
options[-1] = "rmdir"
spec["Options"] = " ".join(options)
spec["View"].clear()
spec["View"].append("//local/... //{}/...".format(name))
# save the client
command = "save_client"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = [spec]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
self.assert_save_client(response, name)
self.deleteSession(userSessionId)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def rest_create_pending_changes(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
# depends upon clients
self.rest_create_clients(superSessionId)
counterName = "create_pending_changes"
if self.rest_increment_counter(counterName, superSessionId) < 2:
for i in range(0,10):
userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce")
name = "new"
client = "client_{!s}".format(i)
command = "fetch_change"
requestPath = "/commands/{}?arg={}&client={}".format(command, name, client)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_change(response, name)
spec["Description"] = "pending change {!s}".format(i)
# save the change
command = "save_change"
requestPath = "/commands/{}?client={}".format(command, client)
requestHeaders = None
requestData = [spec]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
self.assert_save_change(response)
self.deleteSession(userSessionId)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def rest_create_submitted_changes(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
# depends upon clients
self.rest_create_clients(superSessionId)
counterName = "create_submitted_changes"
if self.rest_increment_counter(counterName, superSessionId) < 2:
for i in range(0,10):
userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce")
name = "new"
client = "client_{!s}".format(i)
command = "fetch_change"
requestPath = "/commands/{}?arg={}&client={}".format(command, name, client)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_change(response, name)
spec["Description"] = "submitted change {!s}".format(i)
# save the change
command = "save_change"
requestPath = "/commands/{}?client={}".format(command, client)
requestHeaders = None
requestData = [spec]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_save_change(response)
changeno = responseData[1]["Change"]
fileList = []
whereMap = {}
for ii in range(0,10):
command = "run_where"
depotFile = "//local/{!s}/file{!s}.txt".format(i, ii)
requestPath = "/commands/{}?arg={}&client={}".format(command, depotFile, client)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_run_where(response, filecount=1, depotFileMap={depotFile: True})[0]
filename = responseData["path"]
fileList.append(filename)
whereMap[responseData["depotFile"]] = responseData
if not os.path.isdir(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
with open(filename, mode="a") as f:
f.write("a line {!s}".format(ii))
command = "run_add"
requestPath = "/commands/{}?client={}".format(command, client)
requestHeaders = None
requestData = ["-c", changeno] + fileList
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_run_add(response, changeno, whereMap)
command = "run_submit"
requestPath = "/commands/{}?client={}".format(command, client)
requestHeaders = None
requestData = ["-c", changeno]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_run_submit(response, changeno, whereMap)
command = "run_sync"
requestPath = "/commands/{}?client={}".format(command, client)
requestHeaders = None
requestData = ["//...#0"]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_run_sync(response, filecount=10, depotFileMap=whereMap)
self.deleteSession(userSessionId)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def rest_create_shelved_changes(self, sessionId=None):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
if sessionId is None or len(sessionId) < 1:
superSessionId = self.createSession(p4port, p4user, "Perf0rce")
else:
superSessionId = sessionId
# depends upon submitted changes
self.rest_create_submitted_changes(superSessionId)
counterName = "create_shelved_changes"
if self.rest_increment_counter(counterName, superSessionId) < 2:
for i in range(0,10):
userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce")
name = "new"
client = "client_{!s}".format(i)
command = "fetch_change"
requestPath = "/commands/{}?arg={}&client={}".format(command, name, client)
requestHeaders = None
requestData = None
response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200)
spec = self.assert_fetch_change(response, name)
spec["Description"] = "shelved change {!s}".format(i)
# save the change
command = "save_change"
requestPath = "/commands/{}?client={}".format(command, client)
requestHeaders = None
requestData = [spec]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_save_change(response)
changeno = responseData[1]["Change"]
command = "run_sync"
requestPath = "/commands/{}?client={}".format(command, client)
requestHeaders = None
requestData = ["//local/{!s}/...".format(i)]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_run_sync(response, filecount=10)
depotFileMap = {}
for revision in responseData:
depotFileMap[revision["depotFile"]] = revision
command = "run_edit"
requestPath = "/commands/{}?client={}".format(command, client)
requestHeaders = None
requestData = ["-c", changeno, "//local/{!s}/...".format(i)]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_run_edit(response, changeno, filecount=10, depotFileMap=depotFileMap)
command = "run_shelve"
requestPath = "/commands/{}?client={}".format(command, client)
requestHeaders = None
requestData = ["-r", "-c", changeno]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_run_shelve(response, changeno, depotFileMap=depotFileMap)
command = "run_revert"
requestPath = "/commands/{}?client={}".format(command, client)
requestHeaders = None
requestData = ["//..."]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_run_revert(response, filecount=10, depotFileMap=depotFileMap)
command = "run_sync"
requestPath = "/commands/{}?client={}".format(command, client)
requestHeaders = None
requestData = ["//...#0"]
response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_run_sync(response, filecount=10, depotFileMap=depotFileMap)
self.deleteSession(userSessionId)
if sessionId is None or len(sessionId) < 1:
self.deleteSession(superSessionId)
def create_all(self):
self.rest_create_users()
self.rest_create_depots()
self.rest_create_groups()
self.rest_create_servers()
self.rest_create_remotes()
self.rest_create_ldaps()
self.rest_create_jobs()
self.rest_create_branches()
self.rest_create_labels()
self.rest_create_repos()
self.rest_create_streams()
self.rest_create_clients()
self.rest_create_pending_changes()
self.rest_create_submitted_changes()
self.rest_create_shelved_changes()
def test_basic(self):
self.rest_create_clients()
p4port = self.p4server.p4api.port
p4user = "user_0"
sessionId = self.createSession(p4port, p4user, "Perf0rce")
# run_info
command = "run_info"
requestPath = "/commands/{}".format(command)
requestData = None
requestHeaders = None
response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200)
self.assert_run_info(response, clientName="*unknown*")
# set a client
requestPath = "/commands/{}?client=client_0".format(command)
response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200)
self.assert_run_info(response, clientName="client_0")
# delete
self.deleteSession(sessionId)
def testLimits01(self):
# testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
expectedtotal = 100
self.rest_create_submitted_changes()
p4port = self.p4server.p4api.port
p4user = "user_0"
p4client = "client_0"
sessionId = self.createSession(p4port, p4user, "Perf0rce")
command = "run_sync"
requestPath = "/commands/{}?client={}".format(command, p4client)
requestData = ["-k", "//..."]
requestHeaders = None
response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_run_sync(response, filecount=expectedtotal)
depotFileMap = {}
for revision in responseData:
depotFileMap[revision["depotFile"]] = revision
# have with no limits
command = "run_have"
requestPath = "/commands/{}?client={}".format(command, p4client)
requestData = None
requestHeaders = None
response = self.wwwrequest("GET", sessionId, requestPath, requestData, requestHeaders, 200)
self.assert_run_have(response, filecount=expectedtotal, depotFileMap=depotFileMap)
# cleanup
command = "run_sync"
requestPath = "/commands/{}?client={}".format(command, p4client)
requestData = ["-k", "//...#0"]
requestHeaders = None
response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200)
self.assert_run_sync(response, filecount=100, depotFileMap=depotFileMap)
# delete
self.deleteSession(sessionId)
def testLimits02(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
expectedtotal = 100
self.rest_create_submitted_changes()
p4port = self.p4server.p4api.port
p4user = "user_0"
p4client = "client_0"
sessionId = self.createSession(p4port, p4user, "Perf0rce")
command = "run_sync"
requestPath = "/commands/{}?client={}".format(command, p4client)
requestData = ["-k", "//..."]
requestHeaders = None
response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_run_sync(response, filecount=expectedtotal)
depotFileMap = {}
for revision in responseData:
depotFileMap[revision["depotFile"]] = revision
# limit on the command
command = "run_have"
for limit in [1, 2, 3, 5, 7, 11]:
currentoffset = 0
currenttotal = 0
expectedCounter = expectedtotal // limit + 1
if expectedtotal % limit > 0:
expectedCounter = expectedCounter + 1
self.assertGreaterEqual(expectedCounter, 2, testname)
actualCounter = 0
while currentoffset <= expectedtotal:
if currentoffset < 1:
requestPath = "/commands/{}?client={}&limit={!s}".format(command, p4client, limit)
else:
requestPath = "/commands/{}?client={}&limit={!s}&offset={!s}".format(command, p4client, limit, currentoffset)
response = self.wwwrequest("GET", sessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_run_have(response, depotFileMap=depotFileMap)
(currenttotal, currentoffset) = self.assert_limit_and_next(testname, responseData, limit, currentoffset, currenttotal, expectedtotal)
actualCounter = actualCounter + 1
self.assertEqual(expectedCounter, actualCounter, testname)
# cleanup
command = "run_sync"
requestPath = "/commands/{}?client={}".format(command, p4client)
requestData = ["-k", "//...#0"]
requestHeaders = None
response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200)
self.assert_run_sync(response, filecount=100, depotFileMap=depotFileMap)
# delete
self.deleteSession(sessionId)
def testLimits03(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
expectedtotal = 100
self.rest_create_submitted_changes()
p4port = self.p4server.p4api.port
p4user = "user_0"
p4client = "client_0"
sessionId = self.createSession(p4port, p4user, "Perf0rce")
command = "run_sync"
requestPath = "/commands/{}?client={}".format(command, p4client)
requestData = ["-k", "//..."]
requestHeaders = None
response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_run_sync(response, filecount=expectedtotal)
depotFileMap = {}
for revision in responseData:
depotFileMap[revision["depotFile"]] = revision
# delete
self.deleteSession(sessionId)
# limit on the object
command = "run_have"
orgAppLimit = self.wwwapp.config["MAXRESULTS"]
for limit in [1, 2, 3, 5, 7, 11]:
self.wwwapp.config["MAXRESULTS"] = limit
sessionId = self.createSession(p4port, p4user, "Perf0rce")
currentoffset = 0
currenttotal = 0
expectedCounter = expectedtotal // limit + 1
if expectedtotal % limit > 0:
expectedCounter = expectedCounter + 1
self.assertGreaterEqual(expectedCounter, 2, testname)
actualCounter = 0
while currentoffset <= expectedtotal:
if currentoffset < 1:
requestPath = "/commands/{}?client={}".format(command, p4client)
else:
requestPath = "/commands/{}?client={}&offset={!s}".format(command, p4client, currentoffset)
response = self.wwwrequest("GET", sessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_run_have(response, depotFileMap=depotFileMap)
(currenttotal, currentoffset) = self.assert_limit_and_next(testname, responseData, limit, currentoffset, currenttotal, expectedtotal)
actualCounter = actualCounter + 1
self.assertEqual(expectedCounter, actualCounter, testname)
# delete
self.deleteSession(sessionId)
# cleanup
self.wwwapp.config["MAXRESULTS"] = orgAppLimit
sessionId = self.createSession(p4port, p4user, "Perf0rce")
command = "run_sync"
requestPath = "/commands/{}?client={}".format(command, p4client)
requestData = ["-k", "//...#0"]
requestHeaders = None
response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200)
self.assert_run_sync(response, filecount=100, depotFileMap=depotFileMap)
# delete
self.deleteSession(sessionId)
def run_iterator01(self, testname, plural, expectedtotal):
p4port = self.p4server.p4api.port
p4user = "user_0"
sessionId = self.createSession(p4port, p4user, "Perf0rce")
# first pass with no limits
requestData = None
requestHeaders = None
command = "iterate_{}".format(plural)
requestPath = "/commands/{}".format(command)
response = self.wwwrequest("GET", sessionId, requestPath, requestData, requestHeaders, 200)
self.assert_iterate(response, plural, itemcount=expectedtotal)
# delete
self.deleteSession(sessionId)
def run_iterator02(self, testname, plural, expectedtotal):
p4port = self.p4server.p4api.port
p4user = "user_0"
sessionId = self.createSession(p4port, p4user, "Perf0rce")
# first pass with no limits
requestData = None
requestHeaders = None
command = "iterate_{}".format(plural)
for limit in [1, 2, 3, 5, 7, 11]:
currentoffset = 0
currenttotal = 0
expectedCounter = expectedtotal // limit + 1
if expectedtotal % limit > 0:
expectedCounter = expectedCounter + 1
self.assertGreaterEqual(expectedCounter, 2, testname)
actualCounter = 0
while currentoffset <= expectedtotal:
if currentoffset < 1:
requestPath = "/commands/{}?limit={!s}".format(command, limit)
else:
requestPath = "/commands/{}?limit={!s}&offset={!s}".format(command, limit, currentoffset)
response = self.wwwrequest("GET", sessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_iterate(response, plural)
(currenttotal, currentoffset) = self.assert_limit_and_next(testname, responseData, limit, currentoffset, currenttotal, expectedtotal)
actualCounter = actualCounter + 1
self.assertEqual(expectedCounter, actualCounter, testname)
# delete
self.deleteSession(sessionId)
def run_iterator03(self, testname, plural, expectedtotal):
p4port = self.p4server.p4api.port
p4user = "user_0"
# first pass with no limits
requestData = None
requestHeaders = None
command = "iterate_{}".format(plural)
orgAppLimit = self.wwwapp.config["MAXRESULTS"]
for limit in [1, 2, 3, 5, 7, 11]:
self.wwwapp.config["MAXRESULTS"] = limit
sessionId = self.createSession(p4port, p4user, "Perf0rce")
currentoffset = 0
currenttotal = 0
expectedCounter = expectedtotal // limit + 1
if expectedtotal % limit > 0:
expectedCounter = expectedCounter + 1
self.assertGreaterEqual(expectedCounter, 2, testname)
actualCounter = 0
while currentoffset <= expectedtotal:
if currentoffset < 1:
requestPath = "/commands/{}".format(command)
else:
requestPath = "/commands/{}?offset={!s}".format(command, currentoffset)
response = self.wwwrequest("GET", sessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_iterate(response, plural)
(currenttotal, currentoffset) = self.assert_limit_and_next(testname, responseData, limit, currentoffset, currenttotal, expectedtotal)
actualCounter = actualCounter + 1
self.assertEqual(expectedCounter, actualCounter, testname)
# delete
self.deleteSession(sessionId)
# cleanup
self.wwwapp.config["MAXRESULTS"] = orgAppLimit
def testIteratorbranches01(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_branches()
plural = "branches"
expectedtotal = 10
self.run_iterator01(testname, plural, expectedtotal)
def testIteratorbranches02(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_branches()
plural = "branches"
expectedtotal = 10
self.run_iterator02(testname, plural, expectedtotal)
def testIteratorbranches03(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_branches()
plural = "branches"
expectedtotal = 10
self.run_iterator03(testname, plural, expectedtotal)
def testIteratorrepos01(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_repos()
plural = "repos"
expectedtotal = 10
self.run_iterator01(testname, plural, expectedtotal)
def testIteratorrepos02(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_repos()
plural = "repos"
expectedtotal = 10
self.run_iterator02(testname, plural, expectedtotal)
def testIteratorrepos03(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_repos()
plural = "repos"
expectedtotal = 10
self.run_iterator03(testname, plural, expectedtotal)
def testIteratorchanges01(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_pending_changes()
self.rest_create_submitted_changes()
self.rest_create_shelved_changes()
plural = "changes"
expectedtotal = 30
self.run_iterator01(testname, plural, expectedtotal)
def testIteratorchanges02(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_pending_changes()
self.rest_create_submitted_changes()
self.rest_create_shelved_changes()
plural = "changes"
expectedtotal = 30
self.run_iterator02(testname, plural, expectedtotal)
def testIteratorchanges03(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_pending_changes()
self.rest_create_submitted_changes()
self.rest_create_shelved_changes()
plural = "changes"
expectedtotal = 30
self.run_iterator03(testname, plural, expectedtotal)
def testIteratorclients01(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_clients()
plural = "clients"
expectedtotal = 10
self.run_iterator01(testname, plural, expectedtotal)
def testIteratorclients02(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_clients()
plural = "clients"
expectedtotal = 10
self.run_iterator02(testname, plural, expectedtotal)
def testIteratorclients03(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_clients()
plural = "clients"
expectedtotal = 10
self.run_iterator03(testname, plural, expectedtotal)
def testIteratordepots01(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_depots()
plural = "depots"
expectedtotal = 10
self.run_iterator01(testname, plural, expectedtotal)
def testIteratordepots02(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_depots()
plural = "depots"
expectedtotal = 10
self.run_iterator02(testname, plural, expectedtotal)
def testIteratordepots03(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_depots()
plural = "depots"
expectedtotal = 10
self.run_iterator03(testname, plural, expectedtotal)
def testIteratorgroups01(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_groups()
plural = "groups"
expectedtotal = 100
self.run_iterator01(testname, plural, expectedtotal)
def testIteratorgroups02(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_groups()
plural = "groups"
expectedtotal = 100
self.run_iterator02(testname, plural, expectedtotal)
def testIteratorgroups03(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_groups()
plural = "groups"
expectedtotal = 100
self.run_iterator03(testname, plural, expectedtotal)
def testIteratorjobs01(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_jobs()
plural = "jobs"
expectedtotal = 10
self.run_iterator01(testname, plural, expectedtotal)
def testIteratorjobs02(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_jobs()
plural = "jobs"
expectedtotal = 10
self.run_iterator02(testname, plural, expectedtotal)
def testIteratorjobs03(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_jobs()
plural = "jobs"
expectedtotal = 10
self.run_iterator03(testname, plural, expectedtotal)
def testIteratorlabels01(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_labels()
plural = "labels"
expectedtotal = 10
self.run_iterator01(testname, plural, expectedtotal)
def testIteratorlabels02(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_labels()
plural = "labels"
expectedtotal = 10
self.run_iterator02(testname, plural, expectedtotal)
def testIteratorlabels03(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_labels()
plural = "labels"
expectedtotal = 10
self.run_iterator03(testname, plural, expectedtotal)
def testIteratorldaps01(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_ldaps()
plural = "ldaps"
expectedtotal = 10
self.run_iterator01(testname, plural, expectedtotal)
def testIteratorldaps02(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_ldaps()
plural = "ldaps"
expectedtotal = 10
self.run_iterator02(testname, plural, expectedtotal)
def testIteratorldaps03(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_ldaps()
plural = "ldaps"
expectedtotal = 10
self.run_iterator03(testname, plural, expectedtotal)
def testIteratorremotes01(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_remotes()
plural = "remotes"
expectedtotal = 10
self.run_iterator01(testname, plural, expectedtotal)
def testIteratorremotes02(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_remotes()
plural = "remotes"
expectedtotal = 10
self.run_iterator02(testname, plural, expectedtotal)
def testIteratorremotes03(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_remotes()
plural = "remotes"
expectedtotal = 10
self.run_iterator03(testname, plural, expectedtotal)
def testIteratorservers01(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_servers()
plural = "servers"
expectedtotal = 10
self.run_iterator01(testname, plural, expectedtotal)
def testIteratorservers02(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_servers()
plural = "servers"
expectedtotal = 10
self.run_iterator02(testname, plural, expectedtotal)
def testIteratorservers03(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_servers()
plural = "servers"
expectedtotal = 10
self.run_iterator03(testname, plural, expectedtotal)
def testIteratorstreams01(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_streams()
plural = "streams"
expectedtotal = 10
self.run_iterator01(testname, plural, expectedtotal)
def testIteratorstreams02(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_streams()
plural = "streams"
expectedtotal = 10
self.run_iterator02(testname, plural, expectedtotal)
def testIteratorstreams03(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_streams()
plural = "streams"
expectedtotal = 10
self.run_iterator03(testname, plural, expectedtotal)
def testIteratorusers01(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_users()
plural = "users"
expectedtotal = 12
self.run_iterator01(testname, plural, expectedtotal)
def testIteratorusers02(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_users()
plural = "users"
expectedtotal = 12
self.run_iterator02(testname, plural, expectedtotal)
def testIteratorusers03(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_users()
plural = "users"
expectedtotal = 12
self.run_iterator03(testname, plural, expectedtotal)
def test_update(self):
testname = self.id().rsplit(sep=".", maxsplit=1)[-1]
self.rest_create_labels()
p4port = self.p4server.p4api.port
p4user = "p4testadmin"
sessionId = self.createSession(p4port, p4user, "Perf0rce")
# run_info
command = "run_info"
requestPath = "/commands/{}".format(command)
requestData = None
requestHeaders = None
response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200)
self.assert_run_info(response, clientName="*unknown*")
command = "iterate_labels"
requestPath = "/commands/{}".format(command)
response = self.wwwrequest("GET", sessionId, requestPath, requestData, requestHeaders, 200)
responseData = self.assert_iterate(response, "labels", itemcount=10)
for label in responseData:
name = label["Label"]
label["Description"] = label["Description"] + " updated by {}".format(p4user)
command = "update_label"
requestPath = "/commands/{}".format(command)
requestData = [name, label]
requestHeaders = None
response = self.wwwrequest("PUT", sessionId, requestPath, requestData, requestHeaders, 200)
self.assert_save_label(response, name)
command = "delete_label"
requestPath = "/commands/{}?arg={}".format(command, name)
requestData = None
requestHeaders = None
response = self.wwwrequest("DELETE", sessionId, requestPath, requestData, requestHeaders, 200)
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
logger.debug(responseData)
counterName = "create_labels"
command = "run_counter"
requestPath = "/commands/{}".format(command)
requestHeaders = None
requestData = ["-d", counterName]
response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200)
self.assertEqual("application/json", response.headers.get("Content-Type"))
responseData = json.loads(response.get_data())
logger.debug(responseData)
# delete
self.deleteSession(sessionId)
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName']
unittest.main()