#!/usr/bin/env python
# This script requires the pyotp module.
# You can get it with "pip install pyotp"
# Source: https://github.com/pyotp/pyotp
#
# This trigger first checks whether the userid is in the LOCAL_PASSWD_FILE and
# authenticates against that file if found. If the user isn't in the local file, it checks
# whether the user is a service user and, if so, authenticates against just the auth server.
# Finally, if neither of those conditions matches, it checks both the user's password and
# authenticator token.
#
# The trigger table entry is:
# authtrigger auth-check auth "/p4/common/bin/triggers/vip_authcheckTrigger.py %user% %serverport%"
import os
import re
import ldap
import sys
import subprocess
import pyotp
#### Configuration values
# Path to the p4 command-line client.
# NOTE(review): none of the functions visible in this file read P4; the
# subprocess calls run a bare "p4" resolved through PATH -- confirm intent.
P4="/p4/common/bin/p4"
# authtype can be p4 or ad
# "p4" routes the password check to checkP4Password; any other value routes
# it to checkLDAPPassword.
authtype="p4"
# Service users are users that bypass two-factor authentication.
# The file is read one user name per line and compared case-insensitively.
SVC_USER_FILE="/p4/common/bin/triggers/serviceusers.txt"
# Local users bypass two-factor and external auth.
# Format is username:password, one per line.
LOCAL_PASSWD_FILE="/p4/common/bin/triggers/local.passwd"
# These variables are for authtype ad
# TIMEOUT is applied to both OPT_NETWORK_TIMEOUT and OPT_TIMEOUT on the LDAP
# connection (presumably seconds -- confirm against the python-ldap docs).
TIMEOUT=20
AD_HOST="ldap://domain.company.com"
# DOMAIN is prepended verbatim to the user id to form the bind name.
DOMAIN="MYDOMAIN\\"
# These variables are for authtype p4
# You must set up a p4d server for user passwords and keys. If you
# are using ad for passwords, your auth server should be your production server.
# This user must have at least admin access and an unlimited login timeout.
# Read from the environment at import time: a missing P4USER raises KeyError
# before any authentication is attempted.
p4admin = os.environ['P4USER']
# This is the Perforce server where users' passwords and/or keys are stored.
authserver = "localhost:1667"
# These are user accounts you wish to temporarily block access.
# The incoming user id is lower-cased before the membership test, so list
# names in lower case.
block_users = []
#### End Configuration
# localUserExists
# checks to see if the user exists in the local password file. returns True or False
def localUserExists(username, passwd_file=None):
    """Return True when `username` has an entry in the local password file.

    username    -- account name to look for; an entry matches when a line
                   starts with "<username>:" (case-sensitive).
    passwd_file -- optional path that overrides LOCAL_PASSWD_FILE.

    A path that is None or does not name an existing regular file yields
    False rather than an error.
    """
    path = LOCAL_PASSWD_FILE if passwd_file is None else passwd_file
    if path is None or not os.path.isfile(path):
        return False
    prefix = "%s:" % username
    # "with" closes the handle even when reading raises part-way through.
    with open(path) as passwd:
        return any(line.startswith(prefix) for line in passwd)
# getLocalPassword
# retrieves the local password entry (if there is one) for the specified user
def getLocalPassword(username, passwd_file=None):
    """Return the stored password for `username`, or None when there is none.

    username    -- account name; an entry matches when the whitespace-stripped
                   line starts with "<username>:".
    passwd_file -- optional path that overrides LOCAL_PASSWD_FILE.

    The password is everything after the first ":" on the first matching
    line, so passwords that themselves contain ":" are returned intact
    (splitting on every colon would silently truncate them and let a prefix
    of the real password authenticate). None is returned when the file is
    absent or holds no entry for the user.
    """
    path = LOCAL_PASSWD_FILE if passwd_file is None else passwd_file
    if path is None or not os.path.isfile(path):
        return None
    prefix = "%s:" % username
    # "with" closes the handle even when reading raises part-way through.
    with open(path) as passwd:
        for raw in passwd:
            entry = raw.strip()
            if entry.startswith(prefix):
                return entry[len(prefix):]
    return None
# checkLDAPPassword
# checks the user and password against the LDAP server
def checkLDAPPassword(userid, password):
    """Verify `userid`/`password` with a simple bind against AD_HOST.

    Returns 0 when the bind succeeds and 1 on any failure (bad credentials,
    timeout, connection problem): every error means access is denied.

    An empty password is rejected before the server is contacted, because
    a simple bind with an empty password is an unauthenticated bind that
    many directory servers report as successful.
    """
    if not password:
        return 1
    con = None
    try:
        # the following is needed to allow Python to accept a non-CA cert
        #ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
        con = ldap.initialize(AD_HOST)
        con.set_option(ldap.OPT_NETWORK_TIMEOUT, TIMEOUT)
        con.set_option(ldap.OPT_TIMEOUT, TIMEOUT)
        # try to bind
        con.simple_bind_s(DOMAIN + userid, password)
        return 0
    except Exception:
        # if anything above throws an exception, then access is DENIED
        return 1
    finally:
        if con is not None:
            try:
                con.unbind_s()
            except Exception:
                # Best-effort cleanup only; the verdict is already decided.
                pass
# Check P4 Password
def checkP4Password(authserver, p4admin, userid, password, p4_bin="p4"):
    """Check a password by running "p4 login" against the auth server.

    authserver -- P4PORT of the server that stores the user's password.
    p4admin    -- unused here; kept so the signature matches otpCheck.
    userid     -- user to log in as.
    password   -- what the user typed: the password followed by the
                  6-character authenticator token. The last 6 characters
                  are dropped before the password is sent to p4.
    p4_bin     -- p4 executable to run (default: "p4" found via PATH).

    Returns the exit status of "p4 login" (0 on success), or 1 when the
    command could not be run at all.

    The command is executed as an argument list without a shell and the
    password is written to its stdin, so neither the user id nor the
    password is ever interpreted by a shell. Any character, including
    quotes, "$" and backticks, is passed through unchanged.
    """
    # Strip the trailing authenticator token.
    password = password[:-6]
    payload = password + "\n"
    if not isinstance(payload, bytes):
        # Python 3 text: pipes carry bytes, so encode explicitly.
        payload = payload.encode("utf-8")
    try:
        proc = subprocess.Popen(
            [p4_bin, "-p", authserver, "-u", userid, "login"],
            stdin=subprocess.PIPE,
        )
        proc.communicate(payload)
        return proc.returncode
    except Exception:
        # Deny on any failure to launch or talk to p4.
        print("Problem with call to auth server. Please contact IT")
        return 1
# Check token
def otpCheck(authserver, p4admin, userid, password):
    """Validate the authenticator token at the end of `password`.

    authserver -- P4PORT of the server holding the per-user secret key.
    p4admin    -- user that runs "p4 key".
    userid     -- user whose secret is stored under the key of the same name.
    password   -- what the user typed; its last 6 characters are the token.

    Returns 0 when the token matches the current time-based code, else 1.
    Exits the process with status 1 when the key server cannot be queried.
    """
    userkey = password[-6:]
    # A token that is not exactly six digits can never match; reject it
    # without contacting the server (and without int() raising on it).
    if len(userkey) != 6 or not userkey.isdigit():
        return 1
    # Get the user's secret key from Perforce. The argument list is run
    # without a shell so the user id cannot inject commands, and
    # universal_newlines yields text rather than bytes on Python 3.
    try:
        otpkey = subprocess.check_output(
            ["p4", "-p", authserver, "-u", p4admin, "key", userid],
            universal_newlines=True,
        ).strip()
    except (OSError, ValueError, subprocess.CalledProcessError):
        print("Error connecting to p4 key server %s." % authserver)
        sys.exit(1)
    # Create a time based otp object with the user's key and let pyotp
    # compare the token as a string, so leading zeros are significant.
    try:
        matched = pyotp.TOTP(otpkey).verify(userkey)
    except (TypeError, ValueError):
        # The stored key is not a valid base32 secret: deny.
        return 1
    return 0 if matched else 1
def getsvcusers(serviceusers, svc_user_file=None):
    """Append the lower-cased service user names to `serviceusers`.

    serviceusers  -- list that is extended in place.
    svc_user_file -- optional path that overrides SVC_USER_FILE.

    The file holds one user name per line; surrounding whitespace is
    removed and blank lines are skipped. A path that is None or does not
    name an existing regular file adds nothing, so every user then simply
    falls through to the two-factor check.

    Returns the same list object for convenience.
    """
    path = SVC_USER_FILE if svc_user_file is None else svc_user_file
    if path is None or not os.path.isfile(path):
        return serviceusers
    # "with" closes the handle even when reading raises part-way through.
    with open(path, "r") as svcuserfile:
        for line in svcuserfile:
            name = line.strip().lower()
            if name:
                serviceusers.append(name)
    return serviceusers
# doAuthenticate
# main routine for performing the authentication logic
def doAuthenticate(userid):
    """Authenticate `userid` using the password read from stdin, then exit.

    The password is passed on stdin by the Perforce server when the user
    runs "p4 login". Order of checks:

    1. A user listed in the local password file is checked against that
       file only.
    2. A service user is checked against the password back end only and
       supplies no authenticator token.
    3. Everyone else must supply password + 6-character token; both the
       password back end and the token check must succeed.

    Never returns: the process exits with 0 on success, non-zero otherwise.
    """
    # Number of trailing characters that form the authenticator token.
    token_len = 6
    # default behavior is to deny access (return code 1)
    result = 1
    svc_user = 0
    password = sys.stdin.read().strip()
    if password == "":
        print("Blank password not allowed.")
        sys.exit(1)

    if localUserExists(userid):
        # user exists in local password file, so no back end is consulted
        localPassword = getLocalPassword(userid)
        if localPassword is None or len(localPassword.strip()) == 0:
            print("Local password is missing.")
        elif password == localPassword:
            result = 0
    else:
        svcusers = []
        getsvcusers(svcusers)
        if userid.lower() in svcusers:
            svc_user = 1
            if authtype == "p4":
                # checkP4Password always drops the last token_len characters.
                # Service users type no token, so pad the input to make sure
                # their whole password reaches the auth server.
                result = checkP4Password(authserver, p4admin, userid,
                                         password + "0" * token_len)
            else:
                result = checkLDAPPassword(userid, password)
        else:
            secret = password[:-token_len]
            # Input that is only a token leaves no password at all. Deny it
            # here so an empty password is never offered to the back end.
            if secret:
                if authtype == "p4":
                    # checkP4Password strips the token itself.
                    passresult = checkP4Password(authserver, p4admin, userid, password)
                else:
                    # LDAP must see the password without the token appended.
                    passresult = checkLDAPPassword(userid, secret)
                otpresult = otpCheck(authserver, p4admin, userid, password)
                if otpresult or passresult:
                    result = 1
                else:
                    result = 0

    # now check the result, returning "authentication failed" error if result is not 0
    if result:
        if svc_user:
            print("Invalid login, please use only your password to login.")
        else:
            print("Invalid login, you must enter your password and Google Authenticator token to login.")
    # exit with the result code
    sys.exit(result)
if __name__ == '__main__':
    # Only the user name is required; the server port from the trigger
    # table entry is accepted but not used.
    if len(sys.argv) < 2:
        print("Usage: %s username serverport" % sys.argv[0])
        sys.exit(1)
    userid = sys.argv[1]
    # Normalise both sides so a mixed-case entry in block_users still blocks.
    blocked = set(name.lower() for name in block_users)
    if userid.lower() in blocked:
        print("Your account is currently blocked due to an automated process trying to log in.")
        sys.exit(1)
    doAuthenticate(userid)
| # | Change | User | Description | Committed | |
|---|---|---|---|---|---|
| #1 | 31397 | C. Thomas Tyler | Populate -b SDP_Classic_to_Streams -s //guest/perforce_software/sdp/...@31368. | ||
| //guest/perforce_software/sdp/dev/Unsupported/Samples/triggers/otpauthcheck.py | |||||
| #1 | 26652 | Robert Cowham | This is Tom's change: Introduced new 'Unsupported' directory to clarify that some files in the SDP are not officially supported. These files are samples for illustration, to provide examples, or are deprecated but not yet ready for removal from the package. The Maintenance and many SDP triggers have been moved under here, along with other SDP scripts and triggers. Added comments to p4_vars indicating that it should not be edited directly. Added reference to an optional site_global_vars file that, if it exists, will be sourced to provide global user settings without needing to edit p4_vars. As an exception to the refactoring, the totalusers.py Maintenance script will be moved to indicate that it is supported. Removed settings to support long-sunset P4Web from supported structure. Structure under new .../Unsupported folder is: Samples/bin Sample scripts. Samples/triggers Sample trigger scripts. Samples/triggers/tests Sample trigger script tests. Samples/broker Sample broker filter scripts. Deprecated/triggers Deprecated triggers. To Do in a subsequent change: Make corresponding doc changes. | ||
| //guest/perforce_software/sdp/dev/Server/Unix/p4/common/bin/triggers/otpauthcheck.py | |||||
| #1 | 17250 | Russell C. Jackson (Rusty) | Auth trigger and script to generate otp QR code for using Google Auth. Looks good to me. Not sure how to test in our existing test suite? | ||