'''
Created on Nov 20, 2017
@author: Charlie McLouth
'''
import unittest
import os.path
from p4rest.util import osenviron, mkdtemp, rmtree, osremovefile, oslistdir
from p4rest.session import P4RestSession, P4RestSessionAdmin, P4RestSessionP4, \
SESSION_HEADER_NAME
from p4rest.exceptions import P4RestSessionError
from p4rest.flask.config import P4RestAppConfig
from P4Server import P4Testcase, testConfiguration
from P4 import P4Exception
from datetime import datetime
from p4rest.flask.application import create_app, APP_CONFIG_KEY
from werkzeug.datastructures import Headers
from base64 import b64encode
from flask import current_app, json
import logging
# Module-level logger for these tests.  Records of level DEBUG and above are
# passed to a dedicated stream handler that prefixes every message with its
# level, source file name, line number and function name.
debugHandler = logging.StreamHandler()
debugHandler.setFormatter(
    logging.Formatter(
        "%(levelname)s:%(filename)s:%(lineno)d:%(funcName)s:%(message)s"
    )
)
logger = logging.getLogger(__name__)
logger.addHandler(debugHandler)
logger.setLevel(logging.DEBUG)
class TestP4RestSession(unittest.TestCase):
    '''Testing P4RestSession and P4RestSessionP4'''

    def setUp(self):
        '''Snapshot the P4* environment variables and create a scratch dir.

        The scratch directory is created below the current working
        directory and removed again in tearDown().
        '''
        self.orgEnviron = {k: v for k, v in osenviron.items()
                           if k.startswith("P4")}
        self.testdir = os.path.abspath(".")
        if not os.path.isdir(self.testdir):
            raise Exception("Test directory ({}) does not exist.".format(self.testdir))
        self.testdir = mkdtemp(suffix=None, prefix=__name__, dir=self.testdir)

    def tearDown(self):
        '''Remove the scratch directory and restore the P4* environment.

        The environment is restored in a ``finally`` clause so that a
        failure while deleting the directory cannot leak P4* variables
        into later tests.
        '''
        try:
            if os.path.isdir(self.testdir):
                rmtree(self.testdir)
            else:
                logger.warning('not a directory?:' + self.testdir)
        finally:
            osenviron.update(self.orgEnviron)
            # Drop every P4* variable that did not exist before the test.
            stale = [k for k in list(osenviron.keys())
                     if k.startswith("P4") and k not in self.orgEnviron]
            for k in stale:
                del osenviron[k]

    def _assertSessionError(self, expected, func, *args):
        '''Call func(*args) and require a P4RestSessionError.

        The first exception argument must equal ``expected``.
        '''
        with self.assertRaises(P4RestSessionError) as cm:
            func(*args)
        self.assertEqual(expected, cm.exception.args[0])

    def _assertSessionIdentity(self, session, count, filename, sessiontype):
        '''Check entry count, "sessionfile" and "SESSIONTYPE" of a session.'''
        self.assertEqual(count, len(session))
        self.assertEqual(filename, session["sessionfile"])
        self.assertEqual(sessiontype, session["SESSIONTYPE"])

    def _assertSameContent(self, session, testdict):
        '''Require that session holds exactly the entries of testdict.'''
        self.assertEqual(len(testdict), len(session))
        self.assertEqual(0, len(set(testdict.keys()) - set(session.keys())))
        for k in session:
            self.assertEqual(session[k], testdict[k])

    @staticmethod
    def _addMarkerEntries(session):
        '''Add the four extra entries ("A", "#A", "a", "#a") used by the tests.'''
        session["A"] = "A"
        session["#A"] = "A"
        session["a"] = "a"
        session["#a"] = "a"

    def testdump(self):
        '''Exercise load()/dump() round trips on a plain P4RestSession.'''
        session = P4RestSession()
        self.assertEqual(0, len(session))

        # Neither a file name argument nor a "sessionfile" entry is set.
        self._assertSessionError("undefined session", session.load)

        # The file does not exist yet, whether it is named explicitly or
        # through the "sessionfile" entry.
        filename = os.path.join(self.testdir, ".conf")
        readerror = "error reading from file ({})".format(filename)
        self._assertSessionError(readerror, session.load, filename)
        session["sessionfile"] = filename
        self._assertSessionError(readerror, session.load)

        # Dump to an explicit file and load it back into a cleared session.
        session.dump(filename)
        session.clear()
        session.load(filename, True)
        self._assertSessionIdentity(session, 2, filename, "P4RestSession")
        testdict = {}
        testdict.update(session)
        session.validate(testdict)

        # Same again, but the file name comes from the "sessionfile" entry.
        session.clear()
        session["sessionfile"] = filename
        session.load(None, True)
        self._assertSessionIdentity(session, 2, filename, "P4RestSession")

        # Extra entries stay in the (uncleared) session across dump + load.
        self._addMarkerEntries(session)
        session.dump()
        session.load()
        self._assertSessionIdentity(session, 6, filename, "P4RestSession")

        # Dumping an empty session to a second file, then loading it.
        session.clear()
        filename2 = os.path.join(
            mkdtemp(suffix=None, prefix=__name__, dir=self.testdir), ".conf")
        session.dump(filename2)
        session.load(filename2)
        self._assertSessionIdentity(session, 2, filename2, "P4RestSession")

    def testdumpP4(self):
        '''Exercise validate()/dump()/load() on a P4RestSessionP4.'''
        session = P4RestSession()
        self.assertEqual(0, len(session))
        filename = os.path.join(self.testdir, ".session")
        session.dump(filename)
        testdict = {}
        testdict.update(session)

        session = P4RestSessionP4()
        self._assertSessionError("undefined session", session.validate, testdict)
        testdict["sessionfile"] = filename

        # Each setting is checked in turn while the previously checked ones
        # already hold a valid value, so the order of this list matters.
        validvalues = [
            ("P4PORT", "something:something:something"),
            ("P4USER", "someuser"),
            ("P4CLIENT", "someclient"),
            ("P4TICKETS", os.path.join(self.testdir, ".file")),
            ("P4TRUST", os.path.join(self.testdir, ".file")),
        ]
        badpath = os.path.join(self.testdir, "baddir", ".file")
        for name, valid in validvalues:
            invalid = "Invalid {}".format(name)
            # missing entry
            testdict.pop(name, None)
            self._assertSessionError(invalid, session.validate, testdict)
            # None, wrong type, empty string
            badvalues = [None, 35, ""]
            if name in ("P4TICKETS", "P4TRUST"):
                # only the file settings are tried with a path whose
                # directory ("baddir") is never created by this test
                badvalues.append(badpath)
            for bad in badvalues:
                testdict[name] = bad
                self._assertSessionError(invalid, session.validate, testdict)
            testdict[name] = valid

        session.validate(testdict)
        session.update(testdict)
        session.validate()
        session.dump()

        session.clear()
        session.load(filename, True)
        self._assertSessionIdentity(session, 7, filename, "P4RestSessionP4")

        session.clear()
        session["sessionfile"] = filename
        session.load(None, True)
        self._assertSessionIdentity(session, 7, filename, "P4RestSessionP4")

        self._addMarkerEntries(session)
        session.dump()
        session.load()
        self._assertSessionIdentity(session, 11, filename, "P4RestSessionP4")

        filename2 = os.path.join(
            mkdtemp(suffix=None, prefix=__name__, dir=self.testdir), ".session")
        session.dump(filename2)
        session.load(filename2, True)
        self._assertSessionIdentity(session, 11, filename2, "P4RestSessionP4")

    def teststatics(self):
        '''Exercise the createFromFile()/createFromDict() factories.'''
        filename = os.path.join(self.testdir, ".conf")
        self._assertSessionError(
            "error reading from file ({})".format(filename),
            P4RestSession.createFromFile, filename)

        # create from empty file
        with open(filename, 'w'):
            pass
        session = P4RestSession.createFromFile(filename)
        self._assertSessionIdentity(session, 2, filename, "P4RestSession")
        self.assertEqual("P4RestSession", getattr(session, "__class__").__name__)
        self.assertIsInstance(session, P4RestSession)

        self._addMarkerEntries(session)
        session.validate()
        session.dump(filename)
        testdict = {}
        testdict.update(session)

        session = P4RestSession.createFromFile(filename)
        # dict contains 2 lower-case entries
        self.assertEqual(len(testdict) - 2, len(session))
        self.assertEqual(2, len(set(testdict.keys()) - set(session.keys())))
        for k in session:
            self.assertEqual(session[k], testdict[k])
        osremovefile(filename)

        session = P4RestSession.createFromDict(testdict)
        self._assertSameContent(session, testdict)
        osremovefile(filename)

        testdict.clear()
        testdict["sessionfile"] = filename
        testdict["SESSIONTYPE"] = "P4RestSessionP4"
        testdict["P4PORT"] = "something:something:something"
        testdict["P4USER"] = "someuser"
        testdict["P4CLIENT"] = "someclient"
        testdict["P4TICKETS"] = os.path.join(self.testdir, ".p4tickets")
        testdict["P4TRUST"] = os.path.join(self.testdir, ".p4trust")

        # The factories must pick the class named by "SESSIONTYPE", both when
        # building from the dict and when re-reading the resulting file.
        expectedtypes = (("P4RestSessionP4", P4RestSessionP4),
                         ("P4RestSessionAdmin", P4RestSessionAdmin))
        for typename, sessionclass in expectedtypes:
            testdict["SESSIONTYPE"] = typename
            for session in (P4RestSession.createFromDict(testdict),
                            P4RestSession.createFromFile(filename)):
                self.assertIsInstance(session, sessionclass)
                self._assertSameContent(session, testdict)
                self.assertEqual(typename, session["SESSIONTYPE"])
            osremovefile(filename)
class TestP4RestSessionP4(P4Testcase):
    '''Test P4 environment within the P4RestSessionP4'''

    def setUp(self):
        '''Prepare a scratch directory and application config, then start P4.

        The current values of P4CONFIG, P4TICKETS and P4TRUST are remembered
        so that tearDown() can put them back.
        '''
        self.orgP4CONFIG = osenviron.get("P4CONFIG", None)
        self.orgP4TICKETS = osenviron.get("P4TICKETS", None)
        self.orgP4TRUST = osenviron.get("P4TRUST", None)
        self.testdir = os.path.abspath(".")
        if not os.path.isdir(self.testdir):
            raise Exception("Test directory ({}) does not exist."
                            "".format(self.testdir))
        self.testdir = mkdtemp(suffix=None, prefix=__name__, dir=self.testdir)
        self.configKey = "TestP4RestAppConfig_key"
        configdefaults = {
            self.configKey: os.path.join(self.testdir, "test.json"),
            "WORKDIR": os.path.join(self.testdir, "workdir"),
        }
        self.appconfig = P4RestAppConfig(os.path.abspath("."), configdefaults)
        configfile = self.appconfig.get_config_file(self.configKey)
        # add/update program defaults
        self.appconfig.set_program_defaults(os.path.dirname(configfile))
        self.appconfig.from_json(configfile, silent=True)
        # validate application configuration
        self.appconfig.validate(self.configKey)
        # dump config file
        self.appconfig.dump_to_json(configfile, silent=True)
        testConfiguration["TESTDIR"] = self.testdir
        P4Testcase.setUp(self)

    @staticmethod
    def _restoreEnviron(name, original):
        '''Set environment variable ``name`` back to ``original``.

        ``original`` of None means the variable did not exist before the
        test, so it is removed if present.
        '''
        if original is not None:
            osenviron[name] = original
        elif name in osenviron:
            del osenviron[name]

    def tearDown(self):
        '''Stop P4, remove the scratch directory and restore the environment.

        Each of the three variables is restored (or removed) under its own
        name; the restore runs in a ``finally`` clause so that it also
        happens when the base tearDown or the directory removal raises.
        '''
        try:
            P4Testcase.tearDown(self)
            if os.path.isdir(self.testdir):
                rmtree(self.testdir)
            else:
                logger.warning('not a directory?:' + self.testdir)
        finally:
            self._restoreEnviron("P4CONFIG", self.orgP4CONFIG)
            self._restoreEnviron("P4TICKETS", self.orgP4TICKETS)
            self._restoreEnviron("P4TRUST", self.orgP4TRUST)

    def initTestData(self, p4api):
        '''Create the five password-protected users needed by testP4Sessions.

        The ``p4api`` argument is ignored; the server's own connection
        (self.p4server.p4api) is used instead.  After the users exist the
        server's ``security`` configurable is set to 3.
        '''
        testname = self.id().rsplit(".", 1)[1]
        if testname != "testP4Sessions":
            return
        p4api = self.p4server.p4api
        p4user = p4api.user
        for x in range(0, 5):
            newuser = "{}{!s}".format(testname, x)
            userdata = p4api.fetch_user(newuser)
            p4api.save_user(userdata, "-f")
            # Switch to the new user to set its password, then switch back.
            p4api.user = newuser
            p4api.run_password("Perf0rce", "Perf0rce")
            p4api.user = p4user
        p4api.run_configure("set", "security=3")

    def countTickets(self):
        '''Return the number of non-blank lines in the configured P4TICKETS file.'''
        with open(self.appconfig["P4TICKETS"]) as f:
            return sum(1 for line in f if len(line.strip()) > 0)

    def countSessionDirs(self):
        '''Return the number of directories directly below the configured WORKDIR.'''
        workdir = self.appconfig["WORKDIR"]
        return sum(1 for entry in oslistdir(workdir)
                   if os.path.isdir(os.path.join(workdir, entry)))

    def _assertSessionUser(self, session, user):
        '''Require that the session's P4 connection runs as ``user``.

        Also checks that the reported client working directory equals the
        session's directory.
        '''
        p4api = session.getP4API()
        cmdresult = p4api.fetch_user()
        self.assertEqual(user, cmdresult["User"])
        cmdresult = p4api.run_info()
        self.assertEqual(user, cmdresult[0]["userName"])
        self.assertEqual(session.getdirname(), cmdresult[0]["clientCwd"])

    def _assertCounts(self, sessions, tickets, dirs):
        '''Check the session list length plus ticket and directory counts.'''
        self.assertEqual(5, len(sessions))
        self.assertEqual(tickets, self.countTickets())
        self.assertEqual(dirs, self.countSessionDirs())

    def testP4Sessions(self):
        '''Walk five user sessions through login, expiry, re-login and disposal.'''
        testname = self.id().rsplit(".", 1)[1]
        self.assertEqual(0, self.countTickets())
        self.assertEqual(0, self.countSessionDirs())

        # Create and log in one session per test user.
        sessions = []
        users = []
        for cmdresult in self.p4server.p4api.run_users():
            if not cmdresult["User"].startswith(testname):
                continue
            user = cmdresult["User"]
            sessionData = {"P4USER": user,
                           "P4PORT": self.p4server.p4api.port,
                           "P4CLIENT": self.appconfig["P4CLIENT"],
                           "P4TICKETS": self.appconfig["P4TICKETS"],
                           "P4TRUST": self.appconfig["P4TRUST"],
                           "SESSIONTYPE": "P4RestSessionP4"}
            sessionKey = P4RestSession.hashkey(sessionData)
            sessionData["sessionfile"] = os.path.join(
                self.appconfig["WORKDIR"], sessionKey,
                self.appconfig["P4CONFIG"])
            session = P4RestSession.createFromDict(sessionData)
            self.assertIsNotNone(session)
            session.run_login("Perf0rce")
            sessions.append(session)
            users.append(user)
            self._assertSessionUser(session, user)
            self.assertFalse(session.isExpired())
            session.dispose()
        self._assertCounts(sessions, len(sessions), len(sessions))

        # Expire every session; commands must then fail for lack of a ticket.
        for session, user in zip(sessions, users):
            self._assertSessionUser(session, user)
            self.assertFalse(session.isExpired())
            session.expire()
            p4api = session.getP4API()
            with self.assertRaises(P4Exception) as cm:
                p4api.fetch_user()
            self.assertEqual(1, len(cm.exception.errors))
            expected = "Perforce password (P4PASSWD) invalid or unset."
            actual = cm.exception.errors[0]
            self.assertEqual(expected, actual)
        self._assertCounts(sessions, 0, len(sessions))

        # Log the expired sessions in again.
        for session, user in zip(sessions, users):
            self.assertTrue(session.isExpired())
            session.run_login("Perf0rce")
            # NOTE(review): assumes "EXPIRES" is a naive UTC datetime -- confirm.
            expiresin = (session.get("EXPIRES", datetime.utcnow()) - datetime.utcnow()).total_seconds()
            self.assertGreater(expiresin, 0)
            self.assertLess(expiresin, 300)
            self._assertSessionUser(session, user)
            session.dispose()
        self._assertCounts(sessions, len(sessions), len(sessions))

        # The sessions remain usable after dispose() while still logged in.
        for session, user in zip(sessions, users):
            self._assertSessionUser(session, user)
            session.dispose()
        self._assertCounts(sessions, len(sessions), len(sessions))

        # Re-create each session from its file, then expire and dispose it;
        # afterwards neither the session file nor its directory may exist.
        for stored, user in zip(sessions, users):
            session = P4RestSession.createFromFile(stored.getfilename())
            self._assertSessionUser(session, user)
            session.expire()
            session.dispose()
            self.assertFalse(os.path.exists(session.getfilename()))
            self.assertFalse(os.path.exists(session.getdirname()))
        self._assertCounts(sessions, 0, 0)
class TestWWWP4Sessions(P4Testcase):
    '''Test WWW environment within the P4RestSessionP4'''

    def setUp(self):
        '''Prepare a scratch directory and the web application, then start P4.

        The current values of P4CONFIG, P4TICKETS and P4TRUST are remembered
        so that tearDown() can put them back.
        '''
        self.orgP4CONFIG = osenviron.get("P4CONFIG", None)
        self.orgP4TICKETS = osenviron.get("P4TICKETS", None)
        self.orgP4TRUST = osenviron.get("P4TRUST", None)
        self.testdir = os.path.abspath(".")
        if not os.path.isdir(self.testdir):
            raise Exception("Test directory ({}) does not exist."
                            "".format(self.testdir))
        self.testdir = mkdtemp(suffix=None, prefix=__name__, dir=self.testdir)
        self.configKey = "TestP4RestAppConfig_key"
        configdefaults = {
            self.configKey: os.path.join(self.testdir, "test.json"),
            "WORKDIR": os.path.join(self.testdir, "workdir"),
            APP_CONFIG_KEY: self.configKey,
            "DEBUG": True,
        }
        self.wwwapp = create_app(configdefaults)
        self.wwwapp.testing = True
        self.workdir = self.wwwapp.config["WORKDIR"]
        self.p4tickets = self.wwwapp.config["P4TICKETS"]
        testConfiguration["TESTDIR"] = self.testdir
        P4Testcase.setUp(self)

    @staticmethod
    def _restoreEnviron(name, original):
        '''Set environment variable ``name`` back to ``original``.

        ``original`` of None means the variable did not exist before the
        test, so it is removed if present.
        '''
        if original is not None:
            osenviron[name] = original
        elif name in osenviron:
            del osenviron[name]

    def tearDown(self):
        '''Stop P4, remove the scratch directory and restore the environment.

        Each of the three variables is restored (or removed) under its own
        name; the restore runs in a ``finally`` clause so that it also
        happens when the base tearDown or the directory removal raises.
        '''
        try:
            P4Testcase.tearDown(self)
            if os.path.isdir(self.testdir):
                rmtree(self.testdir)
            else:
                logger.warning('not a directory?:' + self.testdir)
        finally:
            self._restoreEnviron("P4CONFIG", self.orgP4CONFIG)
            self._restoreEnviron("P4TICKETS", self.orgP4TICKETS)
            self._restoreEnviron("P4TRUST", self.orgP4TRUST)

    def genAuthHeader(self, user, password):
        '''Return the bytes value of a Basic Authorization header.

        A user or password that is None or empty contributes nothing, so
        the encoded payload is always ``<user>:<password>`` with either side
        possibly empty.
        '''
        userpart = user.encode() if user else b""
        passpart = password.encode() if password else b""
        return b"Basic " + b64encode(userpart + b":" + passpart)

    def initTestData(self, p4api):
        '''Create the password-protected users the current test needs.

        testP4SessionsAuthFail gets one user, testP4Sessions five; any other
        test gets none.  The ``p4api`` argument is ignored; the server's own
        connection (self.p4server.p4api) is used instead.  After the users
        exist the server's ``security`` configurable is set to 3.
        '''
        usercounts = {"testP4SessionsAuthFail": 1, "testP4Sessions": 5}
        testname = self.id().rsplit(".", 1)[1]
        if testname not in usercounts:
            return
        p4api = self.p4server.p4api
        p4user = p4api.user
        for x in range(0, usercounts[testname]):
            newuser = "{}{!s}".format(testname, x)
            userdata = p4api.fetch_user(newuser)
            p4api.save_user(userdata, "-f")
            # Switch to the new user to set its password, then switch back.
            p4api.user = newuser
            p4api.run_password("Perf0rce", "Perf0rce")
            p4api.user = p4user
        p4api.run_configure("set", "security=3")

    def _postRaw(self, client, user, password, payload,
                 contenttype="application/json"):
        '''POST to /sessions/ with full control over headers and body.

        ``payload`` is JSON-encoded unless it is None, in which case no body
        is sent.  ``contenttype`` of None omits the Content-Type header.
        '''
        headers = Headers()
        headers.add('Authorization', self.genAuthHeader(user, password))
        if contenttype is not None:
            headers.add("Content-Type", contenttype)
        json_data = None if payload is None else json.dumps(payload)
        return client.post(path="/sessions/",
                           data=json_data,
                           headers=headers)

    @staticmethod
    def _sessionHeaders(sessionId):
        '''Return request headers that authorize with an existing session id.'''
        headers = Headers()
        headers.add('Authorization', "{}={}".format(SESSION_HEADER_NAME, sessionId))
        return headers

    def _assertSessionResponse(self, response, user):
        '''Check a successful JSON session response; return the decoded body.'''
        self.assertEqual(200, response.status_code)
        self.assertEqual("application/json", response.headers.get("Content-Type"))
        response_data = json.loads(response.get_data())
        self.assertIsInstance(response_data, dict)
        self.assertEqual(user, response_data["userName"])
        return response_data

    def _assertSessionId(self, response_data):
        '''Return the session id of a response body after checking it.

        The id is read from the SESSION_HEADER_NAME entry, must be present,
        and must equal the "SESSIONID" entry.
        '''
        sessionId = response_data.get(SESSION_HEADER_NAME, None)
        self.assertIsNotNone(sessionId)
        self.assertEqual(sessionId, response_data.get("SESSIONID", None))
        return sessionId

    def testP4SessionsAuthFail(self):
        '''One successful session life cycle followed by rejected logins.'''
        testname = self.id().rsplit(".", 1)[1]
        users = [cmdresult["User"]
                 for cmdresult in self.p4server.p4api.run_users()
                 if cmdresult["User"].startswith(testname)]
        self.assertEqual(1, len(users))
        user = users[0]
        port = self.p4server.p4api.port

        with self.wwwapp.app_context(), current_app.test_client() as client:
            # success
            response = self.postSession(client, port, user, "Perf0rce")
            sessionId = self._assertSessionId(
                self._assertSessionResponse(response, user))
            # test get
            response = self.getSession(client, sessionId)
            sessionId = self._assertSessionId(
                self._assertSessionResponse(response, user))
            # test delete
            response = self.deleteSession(client, sessionId)
            self.assertEqual(204, response.status_code)

        with self.wwwapp.app_context(), current_app.test_client() as client:
            # test no Content-Type header and no body
            response = self._postRaw(client, user, "Perf0rce", None,
                                     contenttype=None)
            self.assertEqual(401, response.status_code)
            # test JSON Content-Type but no body
            response = self._postRaw(client, user, "Perf0rce", None)
            self.assertEqual(400, response.status_code)
            # test body without a P4PORT entry
            response = self._postRaw(client, user, "Perf0rce",
                                     {"BADP4PORT": port})
            self.assertEqual(401, response.status_code)
            # test no password
            response = self._postRaw(client, user, None, {"P4PORT": port})
            self.assertEqual(401, response.status_code)
            # test no USER
            response = self._postRaw(client, None, "Perf0rce",
                                     {"P4PORT": port})
            self.assertEqual(401, response.status_code)

    def postSession(self, client, P4PORT, P4USER, P4PASSWORD):
        '''POST a login request for P4USER on P4PORT; return the response.'''
        return self._postRaw(client, P4USER, P4PASSWORD, {"P4PORT": P4PORT})

    def getSession(self, client, sessionId):
        '''GET /sessions/<sessionId>, authorized by that id; return the response.'''
        return client.get(path="/sessions/{}".format(sessionId),
                          headers=self._sessionHeaders(sessionId))

    def deleteSession(self, client, sessionId):
        '''DELETE /sessions/<sessionId>, authorized by that id; return the response.'''
        return client.delete(path="/sessions/{}".format(sessionId),
                             headers=self._sessionHeaders(sessionId))

    def countTickets(self):
        '''Return the number of non-blank lines in the application's P4TICKETS file.'''
        with open(self.p4tickets) as f:
            return sum(1 for line in f if len(line.strip()) > 0)

    def countSessionDirs(self):
        '''Return the number of directories directly below the application's WORKDIR.'''
        return sum(1 for entry in oslistdir(self.workdir)
                   if os.path.isdir(os.path.join(self.workdir, entry)))

    def _assertCounts(self, sessions):
        '''Require one ticket and one session directory per open session.'''
        self.assertEqual(len(sessions), self.countTickets())
        self.assertEqual(len(sessions), self.countSessionDirs())

    def testP4Sessions(self):
        '''Open, query and delete one web session for each of five users.'''
        testname = self.id().rsplit(".", 1)[1]
        users = []
        sessions = {}
        self.assertEqual(0, len(users))
        self.assertEqual(0, len(sessions))
        self._assertCounts(sessions)

        # Open a session per test user; tickets and directories grow in step.
        for cmdresult in self.p4server.p4api.run_users():
            if not cmdresult["User"].startswith(testname):
                continue
            user = cmdresult["User"]
            users.append(user)
            with self.wwwapp.app_context(), current_app.test_client() as client:
                response = self.postSession(client, self.p4server.p4api.port,
                                            user, "Perf0rce")
                sessions[user] = self._assertSessionId(
                    self._assertSessionResponse(response, user))
            self._assertCounts(sessions)
        self.assertEqual(5, len(users))
        self.assertEqual(len(users), len(sessions))

        # Query then delete each session; a deleted id must be rejected.
        for user in users:
            sessionId = sessions.pop(user)
            with self.wwwapp.app_context(), current_app.test_client() as client:
                response = self.getSession(client, sessionId)
                response_data = self._assertSessionResponse(response, user)
                self.assertEqual(sessionId, response_data.get("SESSIONID", None))
            with self.wwwapp.app_context(), current_app.test_client() as client:
                response = self.deleteSession(client, sessionId)
                self.assertEqual(204, response.status_code)
            self._assertCounts(sessions)
            with self.wwwapp.app_context(), current_app.test_client() as client:
                response = self.getSession(client, sessionId)
                self.assertEqual(401, response.status_code)
                response = self.deleteSession(client, sessionId)
                self.assertEqual(401, response.status_code)
        self.assertEqual(5, len(users))
        self.assertEqual(0, len(sessions))
        self._assertCounts(sessions)
if __name__ == "__main__":
    # Executed as a script: discover and run every test case in this module.
    unittest.main()