'''
Created on Dec 2, 2017
@author: Charlie McLouth
'''
import unittest
from p4rest.p4 import P4
from P4Server import P4TestcaseShared
import os
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
debugHandler = logging.StreamHandler()
debugHandler.setFormatter(logging.Formatter("%(levelname)s:%(filename)s:"
"%(lineno)d:%(funcName)s:"
"%(message)s"))
logger.addHandler(debugHandler)
class TestP4(P4TestcaseShared):
def initTestData(self, _p4api):
# create 10 of every object
p4api = P4()
p4api.port = _p4api.port
p4api.user = _p4api.user
p4api.client = _p4api.client
p4api.exception_level = P4.RAISE_ERROR
p4api.connect()
for plural in ["users", "depots", "streams", "clients", "repos",
"labels", "branches", "changes", "jobs", "groups",
"ldaps", "remotes", "servers"]:
single, key = p4api.specfields.get(plural)
for i in range(0,10):
args = ["{}_{!s}".format(single, i)]
p4api.user = "user_{!s}".format(i)
if single == "depot":
if i == 0:
args = ["-t" "stream", "{}_{!s}".format(single, i)]
elif i == 1:
args = ["-t" "graph", "{}_{!s}".format(single, i)]
elif single == "change":
args = []
p4api.client = "client_{!s}".format(i)
elif single == "user":
args.clear()
elif single == "stream":
args = ["-t" "mainline", "//depot_0/{}{}".format(single, i)]
elif single == "repo":
args = ["//depot_1/{}{}".format(single, i)]
func = getattr(p4api, "fetch_{}".format(single))
spec = func(args)
args = [spec]
if single == "branch":
spec["View"][0] = "//depot_2/a/... //depot_2/b/..."
elif single == "client":
spec["Root"] = self.p4server.clientroot
options = spec["Options"].split()
options[-1] = "rmdir"
spec["Options"] = " ".join(options)
elif single in ["change", "job"]:
spec["Description"] = "description {!s}".format(i)
elif single == "ldap":
spec["SimplePattern"] = "%user%"
elif single == "group":
spec["Owners"] = [p4api.user]
spec["Users"] = [p4api.user]
func = getattr(p4api, "save_{}".format(single))
spec = func(args)
logger.debug("{}:({},{})".format(plural, single, key))
p4api.user = _p4api.user
p4api.client = _p4api.client
for i in range(0,10):
p4api.user = "user_{!s}".format(i)
p4api.client = "client_{!s}".format(i)
spec = p4api.fetch_change()
spec["Description"] = "submitted change"
changeno = p4api.save_change(spec)[0].split(maxsplit=2)[1]
for ii in range(0,10):
filename = p4api.run_where("//depot_2/{!s}/file{!s}.txt".format(i, ii))[0]["path"]
if not os.path.isdir(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
with open(filename, mode="a") as f:
f.write("a line {!s}".format(ii))
p4api.run_add("-c", changeno, filename)
for line in p4api.run_submit("-c", changeno):
if isinstance(line, dict) and "submittedChange" in line:
changeno = line["submittedChange"]
break
spec = p4api.fetch_change()
spec["Description"] = "shelved change"
changeno = p4api.save_change(spec)[0].split(maxsplit=2)[1]
p4api.run_edit("-c", changeno, "//depot_2/{!s}/...".format(i))
p4api.run_shelve("-r", "-c", changeno)
spec = p4api.fetch_change()
spec["Description"] = "pending change"
changeno = p4api.save_change(spec)[0].split(maxsplit=2)[1]
# p4api.run_reopen("-c", changeno, "//depot_2/{!s}/...".format(i))
p4api.run_revert("//...")
p4api.disconnect()
def assertLimitAndNext(self, testname, warnings, results, limit, offset,
currenttotal, expectedtotal):
'''assert the current values and advance to next offset'''
if not isinstance(results, list):
results = [results]
logger.debug(testname)
logger.debug("len(warnings):{!s}".format(len(warnings)))
logger.debug("len(results):{!s}".format(len(results)))
logger.debug("limit):{!s}".format(limit))
logger.debug("offset):{!s}".format(offset))
logger.debug("currenttotal):{!s}".format(currenttotal))
logger.debug("expectedtotal):{!s}".format(expectedtotal))
self.assertGreaterEqual(limit, len(results), testname)
if len(results) < 1:
self.assertEqual(2, len(warnings), testname)
self.assertEqual(expectedtotal, warnings[-1]["count"], testname)
self.assertEqual(offset, warnings[-1]["offset"], testname)
newtotal = currenttotal + len(results)
logger.debug("newtotal):{!s}".format(newtotal))
self.assertGreaterEqual(expectedtotal, newtotal, testname)
if newtotal < expectedtotal:
self.assertEqual(limit, len(results), testname)
self.assertEqual(2, len(warnings), testname)
self.assertEqual(limit, len(results), testname)
# advance offset
newoffset = warnings[-1]["next_offset"]
self.assertGreaterEqual(expectedtotal, newoffset, testname)
elif len(results) > 0:
self.assertEqual(expectedtotal, newtotal, testname)
self.assertEqual(0, len(warnings), testname)
# advance offset
newoffset = offset + len(results)
self.assertEqual(expectedtotal, newoffset, testname)
else:
# advance offset out of range
newoffset = offset + 1
self.assertLessEqual(expectedtotal, newoffset, testname)
logger.debug("newoffset):{!s}".format(newoffset))
return newtotal, newoffset
def testassert(self):
testname = "testassert"
data = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]
# fits in limit
results = list(data)
limit = len(data)
warnings = []
currentoffset = 0
currenttotal = 0
expectedtotal = len(data)
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
self.assertEqual(expectedtotal, currenttotal, testname)
self.assertEqual(expectedtotal, currentoffset, testname)
warnings = ["", {"count": expectedtotal, "offset": currentoffset}]
results = []
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
self.assertGreater(currentoffset, expectedtotal, testname)
# even across limits
results = data[0:5]
limit = 5
warnings = ["", {"next_offset": limit}]
currentoffset = 0
currenttotal = 0
expectedtotal = len(data)
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
self.assertEqual(limit, currenttotal, testname)
self.assertEqual(limit, currentoffset, testname)
results = data[5:]
warnings = []
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
self.assertEqual(expectedtotal, currenttotal, testname)
self.assertEqual(expectedtotal, currentoffset, testname)
warnings = ["", {"count": expectedtotal, "offset": currentoffset}]
results = []
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
self.assertGreater(currentoffset, expectedtotal, testname)
# odd across limits
results = data[0:4]
limit = 4
warnings = ["", {"next_offset": limit}]
currentoffset = 0
currenttotal = 0
expectedtotal = len(data)
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
self.assertEqual(limit, currenttotal, testname)
self.assertEqual(limit, currentoffset, testname)
results = data[4:8]
warnings = ["", {"next_offset": 8}]
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
self.assertEqual(8, currenttotal, testname)
self.assertEqual(8, currentoffset, testname)
results = data[8:]
warnings = []
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
self.assertEqual(expectedtotal, currenttotal, testname)
self.assertEqual(expectedtotal, currentoffset, testname)
warnings = ["", {"count": expectedtotal, "offset": currentoffset}]
results = []
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
self.assertGreater(currentoffset, expectedtotal, testname)
# odd across limits
results = data[0:3]
limit = 3
warnings = ["", {"next_offset": limit}]
currentoffset = 0
currenttotal = 0
expectedtotal = len(data)
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
self.assertEqual(limit, currenttotal, testname)
self.assertEqual(limit, currentoffset, testname)
results = data[3:6]
warnings = ["", {"next_offset": 6}]
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
self.assertEqual(6, currenttotal, testname)
self.assertEqual(6, currentoffset, testname)
results = data[6:9]
warnings = ["", {"next_offset": 9}]
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
self.assertEqual(9, currenttotal, testname)
self.assertEqual(9, currentoffset, testname)
results = data[9:]
warnings = []
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
self.assertEqual(expectedtotal, currenttotal, testname)
self.assertEqual(expectedtotal, currentoffset, testname)
warnings = ["", {"count": expectedtotal, "offset": currentoffset}]
results = []
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
self.assertGreater(currentoffset, expectedtotal, testname)
# all possibilities
expectedtotal = len(data)
for limit in range(1, expectedtotal + 1):
currentoffset = 0
currenttotal = 0
expectedCounter = expectedtotal // limit + 1
if expectedtotal % limit > 0:
expectedCounter = expectedCounter + 1
self.assertGreaterEqual(expectedCounter, 2, testname)
actualCounter = 0
while currentoffset <= expectedtotal:
if currentoffset == expectedtotal:
results = []
warnings = ["", {"count": expectedtotal, "offset": currentoffset}]
elif currentoffset + limit >= expectedtotal:
results = data[currentoffset:]
warnings = []
else:
results = data[currentoffset:currentoffset + limit]
warnings = ["", {"next_offset": currentoffset + limit}]
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
actualCounter = actualCounter + 1
self.assertEqual(expectedCounter, actualCounter, testname)
def testLimits(self):
testname = "testLimits"
logger.debug(testname)
# first pass with no limits
p4api = P4()
p4api.port = self.p4server.p4api.port
p4api.user = "user_0"
p4api.client = "client_0"
p4api.exception_level = P4.RAISE_ERROR
p4api.connect()
p4api.run_sync("//...")
results = p4api.run_have()
expectedtotal = len(results)
self.assertEqual(100, expectedtotal)
self.assertEqual(0, len(p4api.warnings))
# limit on the command
for limit in [1, 2, 3, 5, 7, 11]:
currentoffset = 0
currenttotal = 0
expectedCounter = expectedtotal // limit + 1
if expectedtotal % limit > 0:
expectedCounter = expectedCounter + 1
self.assertGreaterEqual(expectedCounter, 2, testname)
actualCounter = 0
while currentoffset <= expectedtotal:
if currentoffset < 1:
results = p4api.run_have(limit=limit)
else:
results = p4api.run_have(limit=limit, offset=currentoffset)
warnings = p4api.warnings
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
actualCounter = actualCounter + 1
self.assertEqual(expectedCounter, actualCounter, testname)
# confirm we ignore limits on sync
p4files = p4api.run_sync("//...#0", limit=limit)
self.assertGreater(len(p4files), limit)
p4files = p4api.run_sync("//...", limit=limit)
self.assertGreater(len(p4files), limit)
p4api.disconnect()
del(p4api)
# limit on the object
for limit in [1, 2, 3, 5, 7, 11]:
p4api = P4(limit=limit)
p4api.port = self.p4server.p4api.port
p4api.user = "user_0"
p4api.client = "client_0"
p4api.exception_level = P4.RAISE_ERROR
p4api.connect()
p4api.run_sync("//...")
currentoffset = 0
currenttotal = 0
expectedCounter = expectedtotal // limit + 1
if expectedtotal % limit > 0:
expectedCounter = expectedCounter + 1
self.assertGreaterEqual(expectedCounter, 2, testname)
actualCounter = 0
while currentoffset <= expectedtotal:
if currentoffset < 1:
results = p4api.run_have()
else:
results = p4api.run_have(offset=currentoffset)
warnings = p4api.warnings
(currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal)
actualCounter = actualCounter + 1
self.assertEqual(expectedCounter, actualCounter, testname)
# confirm we ignore limits on sync
p4files = p4api.run_sync("//...#0")
self.assertGreater(len(p4files), limit)
p4files = p4api.run_sync("//...")
self.assertGreater(len(p4files), limit)
p4api.disconnect()
del(p4api)
# cleanup
p4api = P4()
p4api.port = self.p4server.p4api.port
p4api.user = "user_0"
p4api.client = "client_0"
p4api.exception_level = P4.RAISE_ERROR
p4api.connect()
p4api.run_sync("//...#0")
p4api.disconnect()
del(p4api)
def testIterator(self):
testname = "testIterator"
logger.debug(testname)
# first pass with no limits
p4api = P4()
p4api.port = self.p4server.p4api.port
p4api.user = "user_0"
p4api.client = "client_0"
p4api.exception_level = P4.RAISE_ERROR
p4api.connect()
# first pass with no limits
expectedMap = {"branches": 10, "changes": 40, "clients": 10,
"depots": 12, "groups": 10, "jobs": 10, "labels": 10,
"ldaps": 10, "remotes": 10, "repos": 10, "servers": 10,
"streams": 10, "users": 11 }
plurals = list(p4api.specfields.keys())
plurals.sort()
for plural in plurals:
func = getattr(p4api, "run_{}".format(plural))
p4results = func()
actualcount = len(p4results)
self.assertEqual(expectedMap[plural], actualcount, plural)
self.assertEqual(0, len(p4api.warnings), plural)
# limit on the command
for limit in [1, 2, 3, 5, 7, 11]:
for plural in plurals:
func = getattr(p4api, "iterate_{}".format(plural))
testinfo = "{}-{}".format(testname, plural)
currentoffset = 0
currenttotal = 0
expectedCounter = expectedMap[plural] // limit + 1
if expectedMap[plural] % limit > 0:
expectedCounter = expectedCounter + 1
self.assertGreaterEqual(expectedCounter, 2, testname)
actualCounter = 0
while currentoffset <= expectedMap[plural]:
results = []
if currentoffset < 1:
for spec in func(limit=limit):
results.append(spec)
else:
for spec in func(limit=limit, offset=currentoffset):
results.append(spec)
warnings = p4api.warnings
(currenttotal, currentoffset) = self.assertLimitAndNext(testinfo, warnings, results, limit, currentoffset, currenttotal, expectedMap[plural])
actualCounter = actualCounter + 1
self.assertEqual(expectedCounter, actualCounter, testname)
p4api.disconnect()
del(p4api)
# limit on the object
for limit in [1, 2, 3, 5, 7, 11]:
p4api = P4(limit=limit)
p4api.port = self.p4server.p4api.port
p4api.user = "user_0"
p4api.client = "client_0"
p4api.exception_level = P4.RAISE_ERROR
p4api.connect()
for plural in plurals:
testinfo = "{}-{}".format(testname, plural)
func = getattr(p4api, "iterate_{}".format(plural))
currentoffset = 0
currenttotal = 0
expectedCounter = expectedMap[plural] // limit + 1
if expectedMap[plural] % limit > 0:
expectedCounter = expectedCounter + 1
self.assertGreaterEqual(expectedCounter, 2, testname)
actualCounter = 0
while currentoffset <= expectedMap[plural]:
results = []
if currentoffset < 1:
for spec in func():
results.append(spec)
else:
for spec in func(offset=currentoffset):
results.append(spec)
warnings = p4api.warnings
(currenttotal, currentoffset) = self.assertLimitAndNext(testinfo, warnings, results, limit, currentoffset, currenttotal, expectedMap[plural])
actualCounter = actualCounter + 1
self.assertEqual(expectedCounter, actualCounter, testname)
p4api.disconnect()
del(p4api)
def testMLimits(self):
testname = "testMLimits"
logger.debug(testname)
# first pass with no limits
p4api = P4()
p4api.port = self.p4server.p4api.port
p4api.user = "user_0"
p4api.client = "client_0"
p4api.exception_level = P4.RAISE_ERROR
p4api.connect()
# first pass with no limits
expectedMap = {"branches": 10, "changes": 40, "clients": 10,
"depots": 12, "groups": 10, "jobs": 10, "labels": 10,
"ldaps": 10, "remotes": 10, "repos": 10, "servers": 10,
"streams": 10, "users": 11 }
plurals = list(p4api.specfields.keys())
plurals.sort()
for plural in plurals:
func = getattr(p4api, "run_{}".format(plural))
p4results = func()
actualcount = len(p4results)
self.assertEqual(expectedMap[plural], actualcount, plural)
self.assertEqual(0, len(p4api.warnings), plural)
# limit on the command
for limit in [1, 2, 3, 5, 7, 11]:
for plural in plurals:
func = getattr(p4api, "run_{}".format(plural))
testinfo = "{}-{}".format(testname, plural)
currentoffset = 0
currenttotal = 0
expectedCounter = expectedMap[plural] // limit + 1
if expectedMap[plural] % limit > 0:
expectedCounter = expectedCounter + 1
self.assertGreaterEqual(expectedCounter, 2, testname)
actualCounter = 0
while currentoffset <= expectedMap[plural]:
results = []
if currentoffset < 1:
results = func(limit=limit)
else:
results = func(limit=limit, offset=currentoffset)
warnings = p4api.warnings
(currenttotal, currentoffset) = self.assertLimitAndNext(testinfo, warnings, results, limit, currentoffset, currenttotal, expectedMap[plural])
actualCounter = actualCounter + 1
self.assertEqual(expectedCounter, actualCounter, testname)
p4api.disconnect()
del(p4api)
# limit on the object
for limit in [1, 2, 3, 5, 7, 11]:
p4api = P4(limit=limit)
p4api.port = self.p4server.p4api.port
p4api.user = "user_0"
p4api.client = "client_0"
p4api.exception_level = P4.RAISE_ERROR
p4api.connect()
for plural in plurals:
testinfo = "{}-{}".format(testname, plural)
func = getattr(p4api, "run_{}".format(plural))
currentoffset = 0
currenttotal = 0
expectedCounter = expectedMap[plural] // limit + 1
if expectedMap[plural] % limit > 0:
expectedCounter = expectedCounter + 1
self.assertGreaterEqual(expectedCounter, 2, testname)
actualCounter = 0
while currentoffset <= expectedMap[plural]:
results = []
if currentoffset < 1:
results = func()
else:
results = func(offset=currentoffset)
warnings = p4api.warnings
(currenttotal, currentoffset) = self.assertLimitAndNext(testinfo, warnings, results, limit, currentoffset, currenttotal, expectedMap[plural])
actualCounter = actualCounter + 1
self.assertEqual(expectedCounter, actualCounter, testname)
p4api.disconnect()
del(p4api)
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName']
unittest.main()