#!/usr/bin/env python3
"""
p4verify.py - Perforce verification script (refactored)

Performs 'p4 verify' of submitted and shelved versioned files in depots
of all types except 'remote', 'archive', and 'unload' (for depot pass),
plus shelved changelists.
"""
from __future__ import annotations

import argparse
import fcntl
import os
import shlex
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Iterable, List, Optional

VERSION = "7.0.0"


# =====================================================================
# Utilities
# =====================================================================
def run_shell(
    cmd: str,
    *,
    cwd: Optional[str] = None,
    capture: bool = False,
    check: bool = False,
    env: Optional[dict] = None,
    executable: Optional[str] = None,
) -> subprocess.CompletedProcess:
    """Small helper for shell=True commands.

    Args:
        cmd: The full shell command line to execute.
        cwd: Working directory for the child process, if any.
        capture: When True, capture stdout/stderr into the result.
        check: When True, raise CalledProcessError on non-zero exit.
        env: Replacement environment for the child, if any.
        executable: Shell binary to use (e.g. "/bin/bash"). Needed by
            callers that 'source' SDP shell scripts, a bash-ism that
            /bin/sh may not support.

    Returns:
        The subprocess.CompletedProcess (text mode).
    """
    # BUGFIX: callers pass executable="/bin/bash" (see _load_sdp_environment
    # and _maybe_mail_log); previously this keyword was not accepted and
    # those calls raised TypeError at runtime.
    return subprocess.run(
        cmd,
        shell=True,
        cwd=cwd,
        capture_output=capture,
        text=True,
        check=check,
        env=env,
        executable=executable,
    )


def timestamp() -> str:
    """Return a human-readable 'now' string for log headers.

    NOTE(review): %Z renders empty for a naive datetime, so the timezone
    field is typically blank here — confirm if a tz-aware stamp is wanted.
    """
    return datetime.now().strftime("%a %Y-%m-%d %H:%M:%S %Z")


# =====================================================================
# Config / state
# =====================================================================
@dataclass
class VerifyConfig:
    """Command-line options controlling a verification run."""

    instance: str                          # SDP instance name (required)
    threads: int = 1                       # max parallel verify threads
    debug: bool = False                    # extra verbosity (lightly used)
    shelves_only: bool = False             # skip depot pass when True
    min_changelist: Optional[int] = None   # only verify files changed after this CL

    @property
    def revision_spec(self) -> str:
        """Perforce revision suffix implementing the changelist filter ('' if unset)."""
        return f"@>{self.min_changelist}" if self.min_changelist else ""


@dataclass
class SDPEnv:
    """Snapshot of the SDP environment variables this script relies on."""

    p4bin: str
    p4port: str
    p4user: str
    p4cbin: Path
    logs_dir: Path
    server_type: str = ""
    p4replica: str = "FALSE"
    shareddata: str = "FALSE"
    p4server: str = "unknown"
    hostname: str = "unknown"
    p4tmp: Path = Path("/tmp")

    @classmethod
    def from_environment(cls) -> "SDPEnv":
        """Build an SDPEnv from os.environ.

        Raises:
            RuntimeError: if P4PORT/P4USER, P4CBIN, or LOGS are unset.
        """
        p4bin = os.environ.get("P4BIN", "p4")
        p4port = os.environ.get("P4PORT")
        p4user = os.environ.get("P4USER")
        p4cbin = os.environ.get("P4CBIN")
        logs = os.environ.get("LOGS")
        if not p4port or not p4user:
            raise RuntimeError("P4PORT or P4USER environment variables not set")
        if not p4cbin:
            raise RuntimeError("P4CBIN environment variable not set")
        if not logs:
            raise RuntimeError("LOGS environment variable not set")
        return cls(
            p4bin=p4bin,
            p4port=p4port,
            p4user=p4user,
            p4cbin=Path(p4cbin),
            logs_dir=Path(logs),
            server_type=os.environ.get("SERVER_TYPE", ""),
            p4replica=os.environ.get("P4REPLICA", "FALSE"),
            shareddata=os.environ.get("SHAREDDATA", "FALSE"),
            p4server=os.environ.get("P4SERVER", "unknown"),
            hostname=os.environ.get("HOSTNAME", "unknown"),
            p4tmp=Path(os.environ.get("P4TMP", "/tmp")),
        )


# =====================================================================
# Logging
# =====================================================================
class FileLogger:
    """Simple file logger with flock-based append, plus optional stdout echo."""

    def __init__(self, path: Path, echo_to_stdout: bool = True) -> None:
        self.path = path
        self.echo_to_stdout = echo_to_stdout

    def log(self, msg: str) -> None:
        """Append *msg* (newline-terminated) to the log file, echoing if enabled."""
        line = msg.rstrip("\n") + "\n"
        if self.echo_to_stdout:
            sys.stdout.write(line)
        self._append(line)

    def _append(self, text: str) -> None:
        # flock serializes appends from the background verify shells that
        # write into the same files.
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.path, "a", encoding="utf-8", errors="replace") as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            f.write(text)
            fcntl.flock(f, fcntl.LOCK_UN)


# =====================================================================
# Perforce wrapper
# =====================================================================
class P4Session:
    """Thin wrapper around 'p4' with convenient helpers."""

    def __init__(self, env: SDPEnv) -> None:
        self.env = env
        # Pre-quoted "p4 -p PORT -u USER" prefix reused for every command.
        self.base_cmd = (
            f"{shlex.quote(env.p4bin)} -p {shlex.quote(env.p4port)} "
            f"-u {shlex.quote(env.p4user)}"
        )

    # --- core runner -------------------------------------------------
    def run(self, args: Iterable[str], *, capture: bool = True) -> str:
        """Run a p4 command and return stdout (stripped).

        *args* may be a pre-built string or an iterable of arguments that
        are joined onto the base command.
        """
        if isinstance(args, str):
            cmd_str = f"{self.base_cmd} {args}"
        else:
            cmd_list = [self.base_cmd]
            for a in args:
                cmd_list.append(str(a))
            cmd_str = " ".join(cmd_list)
        proc = run_shell(cmd_str, capture=capture)
        return proc.stdout.strip()

    # --- higher-level helpers ----------------------------------------
    def list_depots(self) -> List[str]:
        """Return depot names excluding remote/archive/unload."""
        out = self.run(["-ztag", "-F", "%name%,%type%", "depots"])
        depots: List[str] = []
        for line in out.splitlines():
            if not line:
                continue
            name, d_type = line.split(",", 1)
            if d_type not in {"remote", "archive", "unload"}:
                depots.append(name)
        return depots

    def unload_depot_name(self) -> Optional[str]:
        """Return the name of the unload depot, or None if there isn't one."""
        out = self.run(["-ztag", "-F", "%name%,%type%", "depots"])
        for line in out.splitlines():
            if not line:
                continue
            name, d_type = line.split(",", 1)
            if d_type == "unload":
                return name
        return None

    def list_dirs(self, depot: str) -> List[str]:
        """Return first-level directories under //<depot>/."""
        out = self.run(["-ztag", "-F", "%dir%", "dirs", f"//{depot}/*"])
        return [l for l in out.splitlines() if l]

    def shelved_changes(self) -> List[str]:
        """Return changelist numbers (as strings) of all shelved changes."""
        out = self.run(["-ztag", "-F", "%change%", "changes", "-s", "shelved"])
        return [c for c in out.splitlines() if c]

    def monitor_verify_count(self) -> int:
        """Count running 'verify' commands via 'p4 monitor show -a' (heuristic)."""
        out = self.run(["monitor", "show", "-a"], capture=True)
        # simple heuristic: count "verify" commands
        return sum(
            1
            for line in out.splitlines()
            if " verify" in line or line.strip().startswith("verify")
        )

    def files_in_range(self, pathspec: str) -> int:
        """Return number of files in a filespec by counting lines."""
        out = self.run(["-ztag", "-F", "%depotFile%", "files", pathspec])
        if not out:
            return 0
        return sum(1 for _ in out.splitlines())


# =====================================================================
# Main verifier
# =====================================================================
class P4Verifier:
    """Orchestrates the full verification run: setup, shelves, depots, report."""

    def __init__(self, cfg: VerifyConfig) -> None:
        self.cfg = cfg
        self.exit_code = 0
        self.status_message = "OK: All scanned depots verified OK."
        # populated in run()
        self.sdp_env: Optional[SDPEnv] = None
        self.p4: Optional[P4Session] = None
        self.log: Optional[FileLogger] = None
        self.summary_log: Optional[FileLogger] = None
        self.main_log_path: Optional[Path] = None
        self.summary_log_path: Optional[Path] = None
        self.verify_failed = False

    # -----------------------------------------------------------------
    # High-level orchestration
    # -----------------------------------------------------------------
    def run(self) -> None:
        """Run the whole verification; exits the process with the final code."""
        os.environ["SDP_INSTANCE"] = self.cfg.instance
        self._load_sdp_environment()
        self._load_backup_functions()
        self.sdp_env = SDPEnv.from_environment()
        self.p4 = P4Session(self.sdp_env)

        # set up logs + rotation
        self._prepare_logs()
        self.log.log(f"p4verify.py v{VERSION} Starting verify at {timestamp()}.")
        if self.cfg.min_changelist:
            self.log.log(
                f"Running with changelist filter: only verifying files changed after changelist {self.cfg.min_changelist}."
            )

        # p4login
        self._ensure_p4login()

        if self.sdp_env.server_type in ("p4d_edge", "p4d_edgerep"):
            self.log.log(
                f"Exiting because server type is {self.sdp_env.server_type}, "
                "and we do not verify those since they should be running in cache mode."
            )
            sys.exit(0)

        self.log.log("If there are errors in this log, contact your local support team.")
        verify_cmd = self._build_verify_cmd()

        # unload depot first (unless shelves-only)
        if not self.cfg.shelves_only:
            self._verify_unload_depot()

        # shelves
        shelves_log_path = self.sdp_env.logs_dir / "p4verify-shelves.txt"
        self._verify_shelves(verify_cmd, shelves_log_path)

        # depot content
        if not self.cfg.shelves_only:
            self._verify_depots(verify_cmd)

        # merge logs and scan for BAD/MISSING
        self._finalize_logs(shelves_log_path)
        self._scan_for_errors()

        self.log.log(f"Completed verifications at {datetime.now()}.")
        self._maybe_mail_log()
        sys.exit(self.exit_code)

    # -----------------------------------------------------------------
    # SDP / env helpers
    # -----------------------------------------------------------------
    def _load_sdp_environment(self) -> None:
        """Source /p4/common/bin/p4_vars in bash and import its env into os.environ."""
        sdp_env = "/p4/common/bin/p4_vars"
        if not Path(sdp_env).exists():
            self._bail(f"SDP environment file not found: {sdp_env}")
        # 'source' is a bash-ism, hence the explicit bash executable.
        cmd = f"source {shlex.quote(sdp_env)} {shlex.quote(self.cfg.instance)} && env"
        proc = run_shell(cmd, capture=True, executable="/bin/bash")
        if proc.returncode != 0:
            self._bail(f"Failed to load SDP environment for instance {self.cfg.instance}.")
        for line in proc.stdout.splitlines():
            if "=" not in line:
                continue
            key, value = line.split("=", 1)
            # Skip bash functions and weird keys
            if value.startswith("() {") or " " in key:
                continue
            os.environ[key] = value

    def _load_backup_functions(self) -> None:
        """Verify backup_functions.sh exists (it is sourced later for mailing)."""
        p4cbin = os.environ.get("P4CBIN")
        if not p4cbin:
            self._bail("P4CBIN environment variable not set")
        backup_functions = Path(p4cbin) / "backup_functions.sh"
        if not backup_functions.exists():
            self._bail(f"backup_functions.sh not found: {backup_functions}")

    # -----------------------------------------------------------------
    # Logging / rotation
    # -----------------------------------------------------------------
    def _prepare_logs(self) -> None:
        """Clean stale logs, rotate the old summary log, and open fresh loggers."""
        assert self.sdp_env is not None
        logs = self.sdp_env.logs_dir

        # main Perforce verify output
        main_log = logs / "verify_output.log"
        summary_log = logs / "p4verify.log"

        # clean old per-depot logs
        for old in logs.glob("p4verify-*"):
            old.unlink()

        # rotate old summary log (compressed, timestamped copy)
        if summary_log.exists():
            backup = summary_log.with_suffix(f".log.{int(time.time())}.gz")
            run_shell(f"gzip -c {shlex.quote(str(summary_log))} > {shlex.quote(str(backup))}")
            summary_log.unlink()

        # delete existing main log
        if main_log.exists():
            main_log.unlink()

        self.main_log_path = main_log
        self.summary_log_path = summary_log
        self.log = FileLogger(main_log, echo_to_stdout=True)
        self.summary_log = FileLogger(summary_log, echo_to_stdout=False)

    # -----------------------------------------------------------------
    # p4login, verify command selection
    # -----------------------------------------------------------------
    def _ensure_p4login(self) -> None:
        """Run the SDP p4login script; bail if it fails."""
        assert self.sdp_env is not None
        p4login_path = self.sdp_env.p4cbin / "p4login"
        proc = run_shell(str(p4login_path))
        if proc.returncode != 0:
            self._bail(f"p4login failed with exit code {proc.returncode}")

    def _build_verify_cmd(self) -> str:
        """Choose the verify command: '-t' schedules archive transfers on replicas."""
        assert self.sdp_env is not None
        p4replica = self.sdp_env.p4replica
        shareddata = self.sdp_env.shareddata
        # Replica vs shareddata controls whether to schedule transfer
        if p4replica == "FALSE" or shareddata == "TRUE":
            return "verify --only MISSING -q"
        return "verify --only MISSING -qt"

    # -----------------------------------------------------------------
    # Unload depot
    # -----------------------------------------------------------------
    def _verify_unload_depot(self) -> None:
        """Verify the unload depot (with -U), if one exists."""
        assert self.p4 is not None
        assert self.sdp_env is not None
        unload_name = self.p4.unload_depot_name()
        if not unload_name:
            self.log.log("No unload depot found; skipping unload verification.")
            return
        self.log.log("Starting verify of unload depot.")
        flags = (
            "-qU"
            if (self.sdp_env.p4replica == "FALSE" or self.sdp_env.shareddata == "TRUE")
            else "-qUt"
        )
        spec = f"//{unload_name}/...{self.cfg.revision_spec}"
        cmd = (
            f"{self.p4.base_cmd} -s verify {flags} --only MISSING {shlex.quote(spec)} "
            f">> {shlex.quote(str(self.main_log_path))} 2>&1"
        )
        # Runs synchronously (no trailing '&'); output goes to the main log.
        run_shell(cmd)

    # -----------------------------------------------------------------
    # Shelves
    # -----------------------------------------------------------------
    def _verify_shelves(self, verify_cmd: str, shelves_log: Path) -> None:
        """Verify every shelved change, collecting per-shelf output into *shelves_log*."""
        assert self.p4 is not None
        assert self.sdp_env is not None

        # initialize shelves log
        with open(shelves_log, "w", encoding="utf-8", errors="replace") as f:
            f.write("Starting verify of shelves.\n")
            f.write(f"{datetime.now()}\n")
        self.log.log("Starting verify of shelves.")

        changes = self.p4.shelved_changes()
        shelf_temp_logs: List[Path] = []
        for change in changes:
            temp_log = self.sdp_env.p4tmp / f"shelf_{change}.log"
            shelf_temp_logs.append(temp_log)
            # log start
            with open(shelves_log, "a", encoding="utf-8", errors="replace") as f:
                fcntl.flock(f, fcntl.LOCK_EX)
                f.write(f"Verifying shelf {change}\n")
                fcntl.flock(f, fcntl.LOCK_UN)
            # '-S' appended to the flag cluster targets shelved files.
            cmd = (
                f"{self.p4.base_cmd} {verify_cmd}S @={change} > {shlex.quote(str(temp_log))} 2>&1"
            )
            # Runs synchronously; the monitor-based throttle below also guards
            # against verifies launched elsewhere on the server.
            run_shell(cmd)
            self._wait_for_threads(max_threads=self.cfg.threads, sleep_secs=5)

        # wait for any remaining verify threads
        self._wait_for_threads(max_threads=0, sleep_secs=5)

        # merge shelf temp logs
        for temp_log in shelf_temp_logs:
            if not temp_log.exists():
                continue
            with open(temp_log, "r", encoding="utf-8", errors="replace") as src, open(
                shelves_log, "a", encoding="utf-8", errors="replace"
            ) as dst:
                fcntl.flock(dst, fcntl.LOCK_EX)
                dst.write(src.read())
                fcntl.flock(dst, fcntl.LOCK_UN)
            temp_log.unlink()

        with open(shelves_log, "a", encoding="utf-8", errors="replace") as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            f.write("End verify of shelves.\n")
            f.write(f"{datetime.now()}\n")
            fcntl.flock(f, fcntl.LOCK_UN)

    # -----------------------------------------------------------------
    # Depots / depot directories
    # -----------------------------------------------------------------
    def _verify_depots(self, verify_cmd: str) -> None:
        """Verify all eligible depots directory-by-directory, then merge the logs."""
        assert self.p4 is not None
        assert self.sdp_env is not None
        assert self.main_log_path is not None

        depots = self.p4.list_depots()
        depot_dirs: List[str] = []
        for depot in depots:
            depot_dirs.extend(self.p4.list_dirs(depot))

        log_base = self.sdp_env.logs_dir / "p4verify"
        for depot_dir in depot_dirs:
            self._verify_single_depot_dir(verify_cmd, depot_dir, log_base)

        self.log.log("Waiting for all verification threads to complete...")
        self._wait_for_threads(max_threads=0, sleep_secs=60)

        # merge per-depot logs
        for depot in depots:
            depot_log_path = log_base.with_name(f"{log_base.name}-{depot}.log")
            if not depot_log_path.exists():
                continue
            with open(depot_log_path, "r", encoding="utf-8", errors="replace") as src, open(
                self.main_log_path, "a", encoding="utf-8", errors="replace"
            ) as dst:
                dst.write(src.read())
            depot_log_path.unlink()

    def _verify_single_depot_dir(self, verify_cmd: str, depot_dir: str, log_base: Path) -> None:
        """Verify one depot directory: quick top-level pass, then chunked file lists
        verified by throttled background jobs."""
        assert self.p4 is not None
        assert self.sdp_env is not None

        parts = depot_dir.strip("/").split("/")
        depot_name = parts[0] if parts else "unknown"
        cmd_log = log_base.with_name(f"{log_base.name}-{depot_name}.log")

        # header in per-depot log
        with open(cmd_log, "a", encoding="utf-8", errors="replace") as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            f.write(f"Verifying {depot_dir}\n")
            fcntl.flock(f, fcntl.LOCK_UN)

        rev_spec = self.cfg.revision_spec
        spec_files = f"{depot_dir}/*{rev_spec}"
        spec_list = f"{depot_dir}/...{rev_spec}"

        # initial verify of path/* (for speed)
        run_shell(
            f"{self.p4.base_cmd} {verify_cmd} {shlex.quote(spec_files)} >> {shlex.quote(str(cmd_log))} 2>&1",
            cwd=str(self.sdp_env.p4tmp),
        )

        # build temporary file list for chunked verifies
        with tempfile.NamedTemporaryFile(
            mode="w",
            prefix=f"{depot_name}.",
            delete=False,
            dir=self.sdp_env.p4tmp,
        ) as tmp:
            tfile_path = Path(tmp.name)
        run_shell(
            f"{self.p4.base_cmd} -ztag -F %depotFile% files {shlex.quote(spec_list)} > {shlex.quote(str(tfile_path))}",
            cwd=str(self.sdp_env.p4tmp),
        )

        # line count and split size: bigger depots get bigger chunks
        try:
            with open(tfile_path, "r", encoding="utf-8", errors="replace") as f:
                line_count = sum(1 for _ in f)
        except OSError:
            line_count = 0
        if 0 <= line_count <= 1_000_000:
            split_lines = 500
        elif 1_000_000 < line_count <= 10_000_000:
            split_lines = 1000
        else:
            split_lines = 3000

        run_shell(
            f"split -l{split_lines} {shlex.quote(str(tfile_path))} {shlex.quote(str(tfile_path))}_",
            cwd=str(self.sdp_env.p4tmp),
        )
        tfile_path.unlink(missing_ok=True)  # py3.8+: emulate manually if needed

        split_files = sorted(self.sdp_env.p4tmp.glob(f"{tfile_path.name}_*"))
        for split_file in split_files:
            self._wait_for_threads(max_threads=self.cfg.threads, sleep_secs=5)
            # BUGFIX: with_suffix(".verify.log") collapsed every chunk name
            # (depot.RAND_aa, depot.RAND_ab, ...) to the same "depot.verify.log",
            # so the concurrent background jobs below clobbered and deleted each
            # other's logs. Append to the full chunk name instead.
            split_log = split_file.parent / f"{split_file.name}.verify.log"
            with open(cmd_log, "a", encoding="utf-8", errors="replace") as f:
                fcntl.flock(f, fcntl.LOCK_EX)
                f.write(f"Verifying files in {split_file} from {depot_dir}\n")
                fcntl.flock(f, fcntl.LOCK_UN)
            # run in background, cat into depot log, then clean up
            cmd = (
                "("
                f"{self.p4.base_cmd} -s -x {shlex.quote(str(split_file))} {verify_cmd} > {shlex.quote(str(split_log))} 2>&1; "
                f"echo \"=== Completed on $(date)\" >> {shlex.quote(str(split_log))}; "
                f"cat {shlex.quote(str(split_log))} >> {shlex.quote(str(cmd_log))}; "
                f"rm {shlex.quote(str(split_file))} {shlex.quote(str(split_log))}"
                ") &"
            )
            run_shell(cmd, cwd=str(self.sdp_env.p4tmp))

    # -----------------------------------------------------------------
    # Thread throttling based on p4 monitor
    # -----------------------------------------------------------------
    def _wait_for_threads(self, *, max_threads: int, sleep_secs: int) -> None:
        """Block until number of 'verify' monitor entries <= max_threads."""
        assert self.p4 is not None
        while True:
            count = self.p4.monitor_verify_count()
            if count <= max_threads:
                return
            time.sleep(sleep_secs)

    # -----------------------------------------------------------------
    # Finalization / scanning / mail
    # -----------------------------------------------------------------
    def _finalize_logs(self, shelves_log: Path) -> None:
        """Append the shelves log to the main log and remove it."""
        assert self.main_log_path is not None
        if shelves_log.exists():
            with open(shelves_log, "r", encoding="utf-8", errors="replace") as src, open(
                self.main_log_path, "a", encoding="utf-8", errors="replace"
            ) as dst:
                dst.write(src.read())
            shelves_log.unlink()

    def _scan_for_errors(self) -> None:
        """Set exit_code/status_message from verify failures or BAD/MISSING hits."""
        assert self.main_log_path is not None
        assert self.summary_log is not None
        if self.verify_failed:
            self.status_message = f"Error: Verify attempt failed. Review the log [{self.summary_log_path}]."
            self.exit_code = 1
        if self.exit_code != 0:
            return
        # scan for BAD/MISSING/max errors; egrep exits 0 only on a match
        proc = run_shell(
            f"egrep '(BAD!|MISSING!|p4 help max)' {shlex.quote(str(self.main_log_path))}",
            capture=True,
        )
        if proc.returncode == 0 and proc.stdout:
            self.summary_log._append(proc.stdout)
            self.status_message = (
                f"Warning: Verify errors detected. Review the log [{self.summary_log_path}]."
            )
            self.exit_code = 2

    def _maybe_mail_log(self) -> None:
        """On failure, mail the summary log via SDP's mail_log_file (best-effort)."""
        if self.exit_code == 0:
            return
        assert self.sdp_env is not None
        assert self.summary_log_path is not None
        os.environ["LOGFILE"] = str(self.summary_log_path)
        subject = f"{self.sdp_env.hostname} {self.sdp_env.p4server} P4Verify Log ({self.status_message})"
        backup_functions = self.sdp_env.p4cbin / "backup_functions.sh"
        # 'source' requires bash.
        cmd = (
            f"source {shlex.quote(str(backup_functions))} && "
            f"mail_log_file '{subject}'"
        )
        try:
            run_shell(cmd, executable="/bin/bash")
        except Exception:
            # mailing the log is best-effort
            pass

    # -----------------------------------------------------------------
    # Error handling
    # -----------------------------------------------------------------
    def _bail(self, message: str, code: int = 1) -> None:
        """Print an error to stderr and exit the process with *code*."""
        sys.stderr.write(f"Error: {message}\n")
        sys.exit(code)


# =====================================================================
# CLI
# =====================================================================
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for this script."""
    parser = argparse.ArgumentParser(
        description=f"p4verify.py v{VERSION} - Perforce verification script",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
DESCRIPTION:
This script performs a 'p4 verify' of all submitted and shelved versioned
files in depots of all types except 'remote' and 'archive' type depots.
If run on a replica, it schedules archive failures for transfer to the
replica.

EXIT CODES:
0 - No errors were encountered attempting to perform verifications, AND
    all verifications attempted reported no problems.
1 - Verifications could not be attempted for some reason.
2 - Verifications were successfully performed, but problems such as BAD
    or MISSING files were detected, or else system limits prevented
    verification.
""",
    )
    parser.add_argument(
        "instance",
        nargs="?",
        default=os.environ.get("SDP_INSTANCE"),
        help="Specify the SDP instance",
    )
    parser.add_argument(
        "-P",
        "--threads",
        type=int,
        default=1,
        help="Specify max number of parallel verify threads (based on 'p4 monitor show')",
    )
    parser.add_argument(
        "-D",
        "--debug",
        action="store_true",
        help="Set extreme debugging verbosity (not heavily used in this refactor)",
    )
    parser.add_argument(
        "-s",
        "--shelves-only",
        action="store_true",
        help="Only verify shelved changes, skip depot verification",
    )
    parser.add_argument(
        "-c",
        "--min-changelist",
        type=int,
        metavar="NUM",
        help="Only verify files changed after the specified changelist number",
    )
    return parser


def main() -> None:
    """Parse arguments, validate them, and run the verifier."""
    parser = build_arg_parser()
    args = parser.parse_args()

    if not args.instance or args.instance == "Unset":
        print("Error: Instance parameter not supplied.", file=sys.stderr)
        print(
            "You must supply the Perforce instance as a parameter to this script.",
            file=sys.stderr,
        )
        sys.exit(1)
    if args.threads < 1:
        parser.error("threads must be >= 1")
    if args.min_changelist is not None and args.min_changelist <= 0:
        parser.error("min-changelist must be > 0")

    cfg = VerifyConfig(
        instance=args.instance,
        threads=args.threads,
        debug=args.debug,
        shelves_only=args.shelves_only,
        min_changelist=args.min_changelist,
    )
    verifier = P4Verifier(cfg)
    verifier.run()


if __name__ == "__main__":
    main()