#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ==============================================================================
# Copyright and license info is available in the LICENSE file included with
# the Server Deployment Package (SDP), and also available online:
# https://swarm.workshop.perforce.com/projects/perforce-software-sdp/view/main/LICENSE
# ------------------------------------------------------------------------------
# tag::includeManual[]
"""
NAME:
WorkspaceCleanup.py
DESCRIPTION:
This script will generate a .csv file containing workspace report with useful cleanup information.
Included in the report are:
workspace name
last access date
owner email
command to offload workspace
command to reload workspace
command to remove workspace
"""
# end::includeManual[]
# Python 2.7/3.3 compatibility.
from __future__ import print_function
import sys
import argparse
import textwrap
import traceback
import P4
import os
import logging
import traceback
from datetime import datetime
from os.path import basename, splitext
script_name = basename(splitext(__file__)[0])
# If working on a server with the SDP, the 'LOGS' environment variable contains
# the path the standard logging directory. The '-L <logfile>' argument should
# be specified in non-SDP environments.
LOGDIR = os.getenv('LOGS', 'C:/temp')
DEFAULT_LOG_FILE = "WorkspaceCleanup.log"
if os.path.exists(LOGDIR):
DEFAULT_LOG_FILE = "%s/WorkspaceCleanup.log" % LOGDIR
DEFAULT_VERBOSITY = 'DEBUG'
LOGGER_NAME = 'WorkspaceCleanup'
OUTPUTDIR = os.getenv('OUTPUT', 'C:/temp')
DEFAULT_OUTPUT_FILE = "WorkspaceCleanup.csv"
if os.path.exists(OUTPUTDIR):
DEFAULT_OUTPUT_FILE = "%s/WorkspaceCleanup.csv" % OUTPUTDIR
DEFAULT_VERBOSITY = 'DEBUG'
OUTPUT_NAME = 'WorkspaceCleanup'
class WorkspaceCleanup():
"""See module doc string for details"""
def __init__(self, *args, **kwargs):
self.p4 = P4.P4(**kwargs)
self.options = None
self.parse_args(__doc__, args)
def parse_args(self, doc, args):
"""Common parsing and setting up of args"""
desc = textwrap.dedent(doc)
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=desc,
epilog="Copyright (c) 2022 Perforce Software, Inc."
)
self.add_parse_args(parser) # Should be implemented by subclass
self.options = parser.parse_args(args=args)
self.init_logger()
self.logger.debug("Command Line Options: %s\n" % self.options)
def add_parse_args(self, parser, default_log_file=None, default_verbosity=None, default_output_file=None):
"""Default arguments:
:param default_verbosity:
:param default_log_file:
:param default_output_file:
:param parser:
"""
if not default_log_file:
default_log_file = DEFAULT_LOG_FILE
if not default_output_file:
default_output_file = DEFAULT_OUTPUT_FILE
if not default_verbosity:
default_verbosity = DEFAULT_VERBOSITY
parser.add_argument('-O', '--output', default=default_output_file, help="Output file path.")
parser.add_argument('-p', '--port', default=None,
help="Perforce server port - set using %%serverport%%. Default: $P4PORT")
parser.add_argument('-u', '--p4user', default=None, help="Perforce user. Default: $P4USER")
parser.add_argument('-L', '--log', default=default_log_file, help="Default: " + default_log_file)
parser.add_argument('-T', '--tickets', help="P4TICKETS file full path")
parser.add_argument('-v', '--verbosity',
nargs='?',
const="INFO",
default=default_verbosity,
choices=('DEBUG', 'WARNING', 'INFO', 'ERROR', 'FATAL'),
help="Output verbosity level. Default is: " + default_verbosity)
# Specific args for this utility (optional)
def init_logger(self, logger_name=None):
if not logger_name:
logger_name = LOGGER_NAME
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(self.options.verbosity)
logformat = '%(levelname)s %(asctime)s %(filename)s %(lineno)d: %(message)s'
logging.basicConfig(format=logformat, filename=self.options.log, level=self.options.verbosity)
def setupP4(self):
if self.options.port:
self.p4.port = self.options.port
if self.options.p4user:
self.p4.user = self.options.p4user
if self.options.tickets:
self.p4.ticket_file = self.options.tickets
self.p4.logger = self.logger
self.logger.debug("P4 port: '%s', user: '%s'" % (self.p4.port, self.p4.user))
def message(self, msg):
"""Method to send a message to the user. Just writes to stdout, but it's
nice to encapsulate that here.
:param msg: """
print(msg)
def reportException(self):
"""Method to encapsulate error reporting to make sure
all errors are reported in a consistent way"""
exc_type, exc_value, exc_tb = sys.exc_info()
self.logger.error("Exception during script execution: %s %s %s" % (exc_type, exc_value, exc_tb))
self.reportP4Errors()
self.logger.error("called from:\n%s", "".join(traceback.format_exception(exc_type, exc_value, exc_tb)))
self.logger.error("port %s user %s tickets %s" % (self.p4.port, self.p4.user, self.p4.ticket_file))
return 1
def reportP4Errors(self):
lines = []
for e in self.p4.errors:
lines.append("P4 ERROR: %s" % e)
for w in self.p4.warnings:
lines.append("P4 WARNING: %s" % w)
if lines:
self.message("\n".join(lines))
def generateReport(self):
self.setupP4()
self.p4.connect()
users = self.p4.run("users")
workspaces = self.p4.run("workspaces")
lines = ["workspace name,last access date,owner,owner email,command to unload workspace,command to reload workspace,command to remove workspace"]
for workspace in workspaces:
last_access_epoch = int(workspace['Access'])
last_access_date = datetime.fromtimestamp(last_access_epoch)
email = "no_address_found@noone.org"
for user in users:
if user['User'] == workspace['Owner']:
email = user['Email']
userid = workspace['Owner']
lines.append(workspace['client']+","+str(last_access_date)+","+workspace['Owner']+","+email+","+"p4 unload -z -c "+workspace['client']+","+"p4 reload -c "+workspace['client']+","+"p4 client -d "+workspace['client'])
return "\n".join(lines)
def run(self):
"""Runs script"""
now = datetime.now()
try:
arguments = vars(self.options)
outputfilename = arguments["output"]
outputfilename = outputfilename.replace(".csv", "")
outputfilename = outputfilename+"."+str(now.year)+"."+str(now.month)+"."+str(now.day)+"."+str(now.hour)+"."+str(now.minute)+".csv"
outputfilehandle = open (outputfilename, "w")
outputfilehandle.truncate(0)
contents = self.generateReport()
print (contents, file=outputfilehandle)
outputfilehandle.close()
except Exception:
return self.reportException()
return 0
if __name__ == '__main__':
""" Main Program"""
obj = WorkspaceCleanup(*sys.argv[1:])
sys.exit(obj.run())