# p4verify.py #4
#
#   • //
#   • guest/
#   • russell_jackson/
#   • sdp/
#   • Server/
#   • Unix/
#   • p4/
#   • common/
#   • bin/
#   • p4verify.py
#   • View
#   • Commits
#   • Open Download .zip Download (25 KB)
#!/usr/bin/env python3
"""
p4verify.py - Perforce verification script (refactored)

Performs 'p4 verify' of submitted and shelved versioned files in depots
of all types except 'remote', 'archive', and 'unload' (for depot pass),
plus shelved changelists.
"""

from __future__ import annotations

import argparse
import fcntl
import os
import shlex
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Iterable, List, Optional

VERSION = "7.0.0"


# =====================================================================
# Utilities
# =====================================================================

def run_shell(
    cmd: str,
    *,
    cwd: Optional[str] = None,
    capture: bool = False,
    check: bool = False,
    env: Optional[dict] = None,
    executable: Optional[str] = None,
) -> subprocess.CompletedProcess:
    """Run *cmd* through the shell and return the CompletedProcess.

    Args:
        cmd: Shell command line (executed with shell=True).
        cwd: Working directory for the command, or None for the current one.
        capture: Capture stdout/stderr as text instead of inheriting them.
        check: Raise CalledProcessError on a non-zero exit status.
        env: Replacement environment mapping, or None to inherit os.environ.
        executable: Shell binary to use (e.g. "/bin/bash"). Callers that
            rely on bash-isms such as ``source`` pass this explicitly;
            previously this kwarg was not accepted and those calls raised
            TypeError.
    """
    return subprocess.run(
        cmd,
        shell=True,
        cwd=cwd,
        capture_output=capture,
        text=True,
        check=check,
        env=env,
        executable=executable,
    )


def timestamp() -> str:
    """Return the current local time, e.g. 'Mon 2024-01-01 12:00:00 UTC'.

    datetime.now() is timezone-naive, so '%Z' would format as an empty
    string; astimezone() attaches the local timezone so the zone name
    actually appears in the log header as intended.
    """
    return datetime.now().astimezone().strftime("%a %Y-%m-%d %H:%M:%S %Z")


# =====================================================================
# Config / state
# =====================================================================

@dataclass
class VerifyConfig:
    """Runtime options for one verification run, as parsed from the CLI."""

    instance: str                         # SDP instance identifier
    threads: int = 1                      # max concurrent verify threads
    debug: bool = False                   # extra verbosity flag
    shelves_only: bool = False            # verify shelves only, skip depots
    min_changelist: Optional[int] = None  # only files changed after this CL

    @property
    def revision_spec(self) -> str:
        """Revision filter suffix ('@>NNN'), or '' when no filter is set."""
        if not self.min_changelist:
            return ""
        return f"@>{self.min_changelist}"


@dataclass
class SDPEnv:
    """Snapshot of the SDP shell environment values needed for verification."""

    p4bin: str
    p4port: str
    p4user: str
    p4cbin: Path
    logs_dir: Path
    server_type: str = ""
    p4replica: str = "FALSE"
    shareddata: str = "FALSE"
    p4server: str = "unknown"
    hostname: str = "unknown"
    p4tmp: Path = Path("/tmp")

    @classmethod
    def from_environment(cls) -> "SDPEnv":
        """Build an SDPEnv from os.environ.

        Raises RuntimeError when any of the required variables
        (P4PORT, P4USER, P4CBIN, LOGS) is missing or empty.
        """
        env = os.environ
        required = [
            (env.get("P4PORT") and env.get("P4USER"),
             "P4PORT or P4USER environment variables not set"),
            (env.get("P4CBIN"), "P4CBIN environment variable not set"),
            (env.get("LOGS"), "LOGS environment variable not set"),
        ]
        for present, message in required:
            if not present:
                raise RuntimeError(message)

        return cls(
            p4bin=env.get("P4BIN", "p4"),
            p4port=env["P4PORT"],
            p4user=env["P4USER"],
            p4cbin=Path(env["P4CBIN"]),
            logs_dir=Path(env["LOGS"]),
            server_type=env.get("SERVER_TYPE", ""),
            p4replica=env.get("P4REPLICA", "FALSE"),
            shareddata=env.get("SHAREDDATA", "FALSE"),
            p4server=env.get("P4SERVER", "unknown"),
            hostname=env.get("HOSTNAME", "unknown"),
            p4tmp=Path(env.get("P4TMP", "/tmp")),
        )


# =====================================================================
# Logging
# =====================================================================

class FileLogger:
    """Append-only file logger guarded by flock, optionally echoing to stdout."""

    def __init__(self, path: Path, echo_to_stdout: bool = True) -> None:
        self.path = path
        self.echo_to_stdout = echo_to_stdout

    def log(self, msg: str) -> None:
        """Record *msg* as a single newline-terminated line."""
        line = "%s\n" % msg.rstrip("\n")
        if self.echo_to_stdout:
            sys.stdout.write(line)
        self._append(line)

    def _append(self, text: str) -> None:
        """Append *text* to the log file under an exclusive flock."""
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.path, "a", encoding="utf-8", errors="replace") as handle:
            fcntl.flock(handle, fcntl.LOCK_EX)
            try:
                handle.write(text)
            finally:
                fcntl.flock(handle, fcntl.LOCK_UN)


# =====================================================================
# Perforce wrapper
# =====================================================================

class P4Session:
    """Thin wrapper around 'p4' with convenient helpers.

    All commands run via the shell (run_shell); the base command embeds
    pre-quoted -p/-u connection options from the SDP environment.
    """

    def __init__(self, env: SDPEnv) -> None:
        self.env = env
        # Pre-quoted base command shared by every invocation.
        self.base_cmd = f"{shlex.quote(env.p4bin)} -p {shlex.quote(env.p4port)} -u {shlex.quote(env.p4user)}"

    # --- core runner -------------------------------------------------

    def run(self, args: Iterable[str], *, capture: bool = True) -> str:
        """Run a p4 command and return stdout (stripped).

        *args* may be a raw string (appended verbatim; caller quotes) or
        an iterable of arguments. Iterable arguments are shell-quoted
        here so filespecs containing '*', spaces, etc. survive
        shell=True without globbing or word-splitting (previously they
        were joined unquoted).

        NOTE(review): with capture=False, proc.stdout is None and
        .strip() would raise; all current callers use capture=True.
        """
        if isinstance(args, str):
            cmd_str = f"{self.base_cmd} {args}"
        else:
            quoted_args = " ".join(shlex.quote(str(a)) for a in args)
            cmd_str = f"{self.base_cmd} {quoted_args}"

        proc = run_shell(cmd_str, capture=capture)
        return proc.stdout.strip()

    # --- higher-level helpers ----------------------------------------

    def _depot_entries(self) -> List[tuple]:
        """Return (name, type) pairs parsed from 'p4 depots' output."""
        out = self.run(["-ztag", "-F", "%name%,%type%", "depots"])
        entries: List[tuple] = []
        for line in out.splitlines():
            if not line:
                continue
            name, d_type = line.split(",", 1)
            entries.append((name, d_type))
        return entries

    def list_depots(self) -> List[str]:
        """Return depot names excluding remote/archive/unload types."""
        return [
            name
            for name, d_type in self._depot_entries()
            if d_type not in {"remote", "archive", "unload"}
        ]

    def unload_depot_name(self) -> Optional[str]:
        """Return the name of the 'unload' depot, or None if there is none."""
        for name, d_type in self._depot_entries():
            if d_type == "unload":
                return name
        return None

    def list_dirs(self, depot: str) -> List[str]:
        """Return top-level directory paths under //<depot>/."""
        out = self.run(["-ztag", "-F", "%dir%", "dirs", f"//{depot}/*"])
        return [l for l in out.splitlines() if l]

    def shelved_changes(self) -> List[str]:
        """Return changelist numbers (as strings) of all shelved changes."""
        out = self.run(["-ztag", "-F", "%change%", "changes", "-s", "shelved"])
        return [c for c in out.splitlines() if c]

    def monitor_verify_count(self) -> int:
        """Count running 'verify' commands reported by 'p4 monitor show -a'."""
        out = self.run(["monitor", "show", "-a"], capture=True)
        # simple heuristic: count "verify" commands
        return sum(1 for line in out.splitlines() if " verify" in line or line.strip().startswith("verify"))

    def files_in_range(self, pathspec: str) -> int:
        """Return number of files in a filespec by counting output lines."""
        out = self.run(["-ztag", "-F", "%depotFile%", "files", pathspec])
        return len(out.splitlines()) if out else 0


# =====================================================================
# Main verifier
# =====================================================================

class P4Verifier:
    """Orchestrates a full 'p4 verify' pass for one SDP instance.

    run() is the entry point; everything else is a private step. The
    actual 'p4 verify' invocations are shell commands whose output is
    appended to per-depot / per-shelf log files; those are merged into
    one main log and scanned for BAD!/MISSING! markers at the end.
    Parallelism is throttled by polling 'p4 monitor show'.
    """

    def __init__(self, cfg: VerifyConfig) -> None:
        self.cfg = cfg
        self.exit_code = 0
        self.status_message = "OK: All scanned depots verified OK."

        # populated in run()
        self.sdp_env: Optional[SDPEnv] = None
        self.p4: Optional[P4Session] = None
        self.log: Optional[FileLogger] = None
        self.summary_log: Optional[FileLogger] = None
        self.main_log_path: Optional[Path] = None
        self.summary_log_path: Optional[Path] = None

        self.verify_failed = False

    # -----------------------------------------------------------------
    # High-level orchestration
    # -----------------------------------------------------------------

    def run(self) -> None:
        """Execute the whole verification sequence.

        Never returns normally: always terminates via sys.exit with the
        documented exit codes (0 OK, 1 could not verify, 2 problems found).
        """
        os.environ["SDP_INSTANCE"] = self.cfg.instance

        self._load_sdp_environment()
        self._load_backup_functions()

        self.sdp_env = SDPEnv.from_environment()
        self.p4 = P4Session(self.sdp_env)

        # set up logs + rotation
        self._prepare_logs()

        self.log.log(f"p4verify.py v{VERSION} Starting verify at {timestamp()}.")
        if self.cfg.min_changelist:
            self.log.log(
                f"Running with changelist filter: only verifying files changed after changelist {self.cfg.min_changelist}."
            )

        # p4login
        self._ensure_p4login()

        if self.sdp_env.server_type in ("p4d_edge", "p4d_edgerep"):
            self.log.log(
                f"Exiting because server type is {self.sdp_env.server_type}, "
                "and we do not verify those since they should be running in cache mode."
            )
            sys.exit(0)

        self.log.log("If there are errors in this log, contact your local support team.")

        verify_cmd = self._build_verify_cmd()

        # unload depot first (unless shelves-only)
        if not self.cfg.shelves_only:
            self._verify_unload_depot()

        # shelves
        shelves_log_path = self.sdp_env.logs_dir / "p4verify-shelves.txt"
        self._verify_shelves(verify_cmd, shelves_log_path)

        # depot content
        if not self.cfg.shelves_only:
            self._verify_depots(verify_cmd)

        # merge logs and scan for BAD/MISSING
        self._finalize_logs(shelves_log_path)
        self._scan_for_errors()

        self.log.log(f"Completed verifications at {datetime.now()}.")
        self._maybe_mail_log()
        sys.exit(self.exit_code)

    # -----------------------------------------------------------------
    # SDP / env helpers
    # -----------------------------------------------------------------

    def _load_sdp_environment(self) -> None:
        """Source /p4/common/bin/p4_vars for this instance and import the
        resulting shell environment into os.environ."""
        sdp_env = "/p4/common/bin/p4_vars"
        if not Path(sdp_env).exists():
            self._bail(f"SDP environment file not found: {sdp_env}")

        # 'source' is a bash-ism, hence executable=/bin/bash.
        cmd = f"source {shlex.quote(sdp_env)} {shlex.quote(self.cfg.instance)} && env"
        proc = run_shell(cmd, capture=True, executable="/bin/bash")
        if proc.returncode != 0:
            self._bail(f"Failed to load SDP environment for instance {self.cfg.instance}.")

        for line in proc.stdout.splitlines():
            if "=" not in line:
                continue
            key, value = line.split("=", 1)
            # Skip bash functions and weird keys
            if value.startswith("() {") or " " in key:
                continue
            os.environ[key] = value

    def _load_backup_functions(self) -> None:
        """Verify that backup_functions.sh exists (needed later for mailing)."""
        p4cbin = os.environ.get("P4CBIN")
        if not p4cbin:
            self._bail("P4CBIN environment variable not set")

        backup_functions = Path(p4cbin) / "backup_functions.sh"
        if not backup_functions.exists():
            self._bail(f"backup_functions.sh not found: {backup_functions}")

    # -----------------------------------------------------------------
    # Logging / rotation
    # -----------------------------------------------------------------

    def _prepare_logs(self) -> None:
        """Create/rotate log files and instantiate the two FileLoggers."""
        assert self.sdp_env is not None
        logs = self.sdp_env.logs_dir

        # main Perforce verify output
        main_log = logs / "verify_output.log"
        summary_log = logs / "p4verify.log"

        # clean old per-depot logs (also matches p4verify-shelves.txt)
        for old in logs.glob("p4verify-*"):
            old.unlink()

        # rotate old summary log: gzip a timestamped copy, then remove it
        if summary_log.exists():
            backup = summary_log.with_suffix(f".log.{int(time.time())}.gz")
            run_shell(f"gzip -c {shlex.quote(str(summary_log))} > {shlex.quote(str(backup))}")
            summary_log.unlink()

        # delete existing main log
        if main_log.exists():
            main_log.unlink()

        self.main_log_path = main_log
        self.summary_log_path = summary_log
        self.log = FileLogger(main_log, echo_to_stdout=True)
        self.summary_log = FileLogger(summary_log, echo_to_stdout=False)

    # -----------------------------------------------------------------
    # p4login, verify command selection
    # -----------------------------------------------------------------

    def _ensure_p4login(self) -> None:
        """Run the SDP p4login helper; bail out if it fails."""
        assert self.sdp_env is not None
        p4login_path = self.sdp_env.p4cbin / "p4login"
        proc = run_shell(str(p4login_path))
        if proc.returncode != 0:
            self._bail(f"p4login failed with exit code {proc.returncode}")

    def _build_verify_cmd(self) -> str:
        """Return the verify subcommand string.

        On a replica with its own archives (-t flag), failed archives are
        scheduled for transfer; on the master or a shared-storage replica
        there is nothing to transfer.
        """
        assert self.sdp_env is not None
        p4replica = self.sdp_env.p4replica
        shareddata = self.sdp_env.shareddata

        # Replica vs shareddata controls whether to schedule transfer
        if p4replica == "FALSE" or shareddata == "TRUE":
            return "verify --only MISSING -q"
        return "verify --only MISSING -qt"

    # -----------------------------------------------------------------
    # Unload depot
    # -----------------------------------------------------------------

    def _verify_unload_depot(self) -> None:
        """Verify the unload depot (if one exists) with 'p4 verify -U'."""
        assert self.p4 is not None
        assert self.sdp_env is not None

        unload_name = self.p4.unload_depot_name()
        if not unload_name:
            self.log.log("No unload depot found; skipping unload verification.")
            return

        self.log.log("Starting verify of unload depot.")
        flags = "-qU" if (self.sdp_env.p4replica == "FALSE" or self.sdp_env.shareddata == "TRUE") else "-qUt"
        spec = f"//{unload_name}/...{self.cfg.revision_spec}"
        cmd = f"{self.p4.base_cmd} -s verify {flags} --only MISSING {shlex.quote(spec)} >> {shlex.quote(str(self.main_log_path))} 2>&1"  # type: ignore[arg-type]
        run_shell(cmd)  # NOTE: blocks until the verify completes (no '&')

    # -----------------------------------------------------------------
    # Shelves
    # -----------------------------------------------------------------

    def _verify_shelves(self, verify_cmd: str, shelves_log: Path) -> None:
        """Verify every shelved changelist, one 'verify -S @=CL' each.

        Each shelf writes to its own temp log; temp logs are merged into
        *shelves_log* afterwards so output is not interleaved.
        """
        assert self.p4 is not None
        assert self.sdp_env is not None

        # initialize shelves log
        with open(shelves_log, "w", encoding="utf-8", errors="replace") as f:
            f.write("Starting verify of shelves.\n")
            f.write(f"{datetime.now()}\n")

        self.log.log("Starting verify of shelves.")
        changes = self.p4.shelved_changes()
        shelf_temp_logs: List[Path] = []

        for change in changes:
            temp_log = self.sdp_env.p4tmp / f"shelf_{change}.log"
            shelf_temp_logs.append(temp_log)

            # log start
            with open(shelves_log, "a", encoding="utf-8", errors="replace") as f:
                fcntl.flock(f, fcntl.LOCK_EX)
                f.write(f"Verifying shelf {change}\n")
                fcntl.flock(f, fcntl.LOCK_UN)

            # '{verify_cmd}S' appends S to the -q/-qt flags, i.e. verify -qS/-qtS
            cmd = (
                f"{self.p4.base_cmd} {verify_cmd}S @={change} > {shlex.quote(str(temp_log))} 2>&1"
            )
            # NOTE(review): this call blocks (no '&'), so the monitor-based
            # throttle below only guards against verifies started elsewhere.
            run_shell(cmd)

            # throttle by monitor count
            self._wait_for_threads(max_threads=self.cfg.threads, sleep_secs=5)

        # wait for any remaining verify threads
        self._wait_for_threads(max_threads=0, sleep_secs=5)

        # merge shelf temp logs
        for temp_log in shelf_temp_logs:
            if not temp_log.exists():
                continue
            with open(temp_log, "r", encoding="utf-8", errors="replace") as src, open(
                shelves_log, "a", encoding="utf-8", errors="replace"
            ) as dst:
                fcntl.flock(dst, fcntl.LOCK_EX)
                dst.write(src.read())
                fcntl.flock(dst, fcntl.LOCK_UN)
            temp_log.unlink()

        with open(shelves_log, "a", encoding="utf-8", errors="replace") as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            f.write("End verify of shelves.\n")
            f.write(f"{datetime.now()}\n")
            fcntl.flock(f, fcntl.LOCK_UN)

    # -----------------------------------------------------------------
    # Depots / depot directories
    # -----------------------------------------------------------------

    def _verify_depots(self, verify_cmd: str) -> None:
        """Verify every top-level directory of every eligible depot, then
        merge the per-depot logs into the main log."""
        assert self.p4 is not None
        assert self.sdp_env is not None
        assert self.main_log_path is not None

        depots = self.p4.list_depots()
        depot_dirs: List[str] = []

        for depot in depots:
            depot_dirs.extend(self.p4.list_dirs(depot))

        log_base = self.sdp_env.logs_dir / "p4verify"
        for depot_dir in depot_dirs:
            self._verify_single_depot_dir(verify_cmd, depot_dir, log_base)

        self.log.log("Waiting for all verification threads to complete...")
        self._wait_for_threads(max_threads=0, sleep_secs=60)

        # merge per-depot logs
        for depot in depots:
            depot_log_path = log_base.with_name(f"{log_base.name}-{depot}.log")
            if not depot_log_path.exists():
                continue
            with open(depot_log_path, "r", encoding="utf-8", errors="replace") as src, open(
                self.main_log_path, "a", encoding="utf-8", errors="replace"
            ) as dst:
                dst.write(src.read())
            depot_log_path.unlink()

    def _verify_single_depot_dir(self, verify_cmd: str, depot_dir: str, log_base: Path) -> None:
        """Verify one depot directory: a quick pass over its direct files,
        then chunked background verifies over a split file list."""
        assert self.p4 is not None
        assert self.sdp_env is not None

        parts = depot_dir.strip("/").split("/")
        depot_name = parts[0] if parts else "unknown"
        cmd_log = log_base.with_name(f"{log_base.name}-{depot_name}.log")

        # header in per-depot log
        with open(cmd_log, "a", encoding="utf-8", errors="replace") as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            f.write(f"Verifying {depot_dir}\n")
            fcntl.flock(f, fcntl.LOCK_UN)

        rev_spec = self.cfg.revision_spec
        spec_files = f"{depot_dir}/*{rev_spec}"
        spec_list = f"{depot_dir}/...{rev_spec}"

        # initial verify of path/* (for speed)
        run_shell(
            f"{self.p4.base_cmd} {verify_cmd} {shlex.quote(spec_files)} >> {shlex.quote(str(cmd_log))} 2>&1",
            cwd=str(self.sdp_env.p4tmp),
        )

        # build temporary file list for chunked verifies
        with tempfile.NamedTemporaryFile(
            mode="w",
            prefix=f"{depot_name}.",
            delete=False,
            dir=self.sdp_env.p4tmp,
        ) as tmp:
            tfile_path = Path(tmp.name)

        run_shell(
            f"{self.p4.base_cmd} -ztag -F %depotFile% files {shlex.quote(spec_list)} > {shlex.quote(str(tfile_path))}",
            cwd=str(self.sdp_env.p4tmp),
        )

        # line count and split size
        try:
            with open(tfile_path, "r", encoding="utf-8", errors="replace") as f:
                line_count = sum(1 for _ in f)
        except OSError:
            line_count = 0

        # bigger depots get bigger chunks so the number of jobs stays sane
        if 0 <= line_count <= 1_000_000:
            split_lines = 500
        elif 1_000_000 < line_count <= 10_000_000:
            split_lines = 1000
        else:
            split_lines = 3000

        run_shell(
            f"split -l{split_lines} {shlex.quote(str(tfile_path))} {shlex.quote(str(tfile_path))}_",
            cwd=str(self.sdp_env.p4tmp),
        )
        tfile_path.unlink(missing_ok=True)  # py3.8+: emulate manually if needed

        split_files = sorted(self.sdp_env.p4tmp.glob(f"{tfile_path.name}_*"))

        for split_file in split_files:
            self._wait_for_threads(max_threads=self.cfg.threads, sleep_secs=5)

            # BUGFIX: with_suffix(".verify.log") replaced everything after the
            # last dot, and temp names look like "<depot>.<rand>_aa", so every
            # chunk of a depot collapsed onto the same "<depot>.verify.log"
            # and the concurrent background jobs clobbered/removed each
            # other's logs. Appending keeps each chunk's log path unique.
            split_log = split_file.with_name(split_file.name + ".verify.log")

            with open(cmd_log, "a", encoding="utf-8", errors="replace") as f:
                fcntl.flock(f, fcntl.LOCK_EX)
                f.write(f"Verifying files in {split_file} from {depot_dir}\n")
                fcntl.flock(f, fcntl.LOCK_UN)

            # run in background, cat into depot log, then clean up
            cmd = (
                "("
                f"{self.p4.base_cmd} -s -x {shlex.quote(str(split_file))} {verify_cmd} > {shlex.quote(str(split_log))} 2>&1; "
                f"echo \"=== Completed on $(date)\" >> {shlex.quote(str(split_log))}; "
                f"cat {shlex.quote(str(split_log))} >> {shlex.quote(str(cmd_log))}; "
                f"rm {shlex.quote(str(split_file))} {shlex.quote(str(split_log))}"
                ") &"
            )
            run_shell(cmd, cwd=str(self.sdp_env.p4tmp))

    # -----------------------------------------------------------------
    # Thread throttling based on p4 monitor
    # -----------------------------------------------------------------

    def _wait_for_threads(self, *, max_threads: int, sleep_secs: int) -> None:
        """Block until number of 'verify' monitor entries <= max_threads."""
        assert self.p4 is not None
        while True:
            count = self.p4.monitor_verify_count()
            if count <= max_threads:
                return
            time.sleep(sleep_secs)

    # -----------------------------------------------------------------
    # Finalization / scanning / mail
    # -----------------------------------------------------------------

    def _finalize_logs(self, shelves_log: Path) -> None:
        """Append the shelves log to the main log and remove it."""
        assert self.main_log_path is not None

        if shelves_log.exists():
            with open(shelves_log, "r", encoding="utf-8", errors="replace") as src, open(
                self.main_log_path, "a", encoding="utf-8", errors="replace"
            ) as dst:
                dst.write(src.read())
            shelves_log.unlink()

    def _scan_for_errors(self) -> None:
        """Grep the main log for verify failures and set exit_code/status."""
        assert self.main_log_path is not None
        assert self.summary_log is not None

        if self.verify_failed:
            self.status_message = f"Error: Verify attempt failed. Review the log [{self.summary_log_path}]."
            self.exit_code = 1

        if self.exit_code != 0:
            return

        # scan for BAD/MISSING/max errors
        proc = run_shell(
            f"egrep '(BAD!|MISSING!|p4 help max)' {shlex.quote(str(self.main_log_path))}",
            capture=True,
        )

        # egrep exits 0 only when it matched something
        if proc.returncode == 0 and proc.stdout:
            self.summary_log._append(proc.stdout)
            self.status_message = (
                f"Warning: Verify errors detected. Review the log [{self.summary_log_path}]."
            )
            self.exit_code = 2

    def _maybe_mail_log(self) -> None:
        """On failure, mail the summary log via SDP's mail_log_file helper."""
        if self.exit_code == 0:
            return

        assert self.sdp_env is not None
        assert self.summary_log_path is not None

        os.environ["LOGFILE"] = str(self.summary_log_path)
        subject = f"{self.sdp_env.hostname} {self.sdp_env.p4server} P4Verify Log ({self.status_message})"
        backup_functions = self.sdp_env.p4cbin / "backup_functions.sh"

        cmd = (
            f"source {shlex.quote(str(backup_functions))} && "
            f"mail_log_file '{subject}'"
        )
        try:
            run_shell(cmd, executable="/bin/bash")
        except Exception:
            # mailing the log is best-effort
            pass

    # -----------------------------------------------------------------
    # Error handling
    # -----------------------------------------------------------------

    def _bail(self, message: str, code: int = 1) -> None:
        """Print *message* to stderr and terminate with *code*."""
        sys.stderr.write(f"Error: {message}\n")
        sys.exit(code)


# =====================================================================
# CLI
# =====================================================================

def build_arg_parser() -> argparse.ArgumentParser:
    """Construct and return the CLI argument parser for p4verify.py."""
    epilog_text = """\
DESCRIPTION:
  This script performs a 'p4 verify' of all submitted and shelved versioned
  files in depots of all types except 'remote' and 'archive' type depots.

  If run on a replica, it schedules archive failures for transfer to the
  replica.

EXIT CODES:
  0 - No errors were encountered attempting to perform verifications,
      AND all verifications attempted reported no problems.

  1 - Verifications could not be attempted for some reason.

  2 - Verifications were successfully performed, but problems such as
      BAD or MISSING files were detected, or else system limits
      prevented verification.
"""
    parser = argparse.ArgumentParser(
        description=f"p4verify.py v{VERSION} - Perforce verification script",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog_text,
    )

    add = parser.add_argument
    add(
        "instance",
        nargs="?",
        default=os.environ.get("SDP_INSTANCE"),
        help="Specify the SDP instance",
    )
    add(
        "-P",
        "--threads",
        type=int,
        default=1,
        help="Specify max number of parallel verify threads (based on 'p4 monitor show')",
    )
    add(
        "-D",
        "--debug",
        action="store_true",
        help="Set extreme debugging verbosity (not heavily used in this refactor)",
    )
    add(
        "-s",
        "--shelves-only",
        action="store_true",
        help="Only verify shelved changes, skip depot verification",
    )
    add(
        "-c",
        "--min-changelist",
        type=int,
        metavar="NUM",
        help="Only verify files changed after the specified changelist number",
    )
    return parser


def main() -> None:
    """CLI entry point: validate arguments, build the config, run the verifier."""
    parser = build_arg_parser()
    args = parser.parse_args()

    # Guard clause: an instance is mandatory (either positional or SDP_INSTANCE).
    if not args.instance or args.instance == "Unset":
        print("Error: Instance parameter not supplied.", file=sys.stderr)
        print(
            "You must supply the Perforce instance as a parameter to this script.",
            file=sys.stderr,
        )
        sys.exit(1)

    if args.threads < 1:
        parser.error("threads must be >= 1")
    if args.min_changelist is not None and args.min_changelist <= 0:
        parser.error("min-changelist must be > 0")

    P4Verifier(
        VerifyConfig(
            instance=args.instance,
            threads=args.threads,
            debug=args.debug,
            shelves_only=args.shelves_only,
            min_changelist=args.min_changelist,
        )
    ).run()


if __name__ == "__main__":
    main()

# Change User Description Committed
#4 32258 Russell C. Jackson (Rusty) Refactored p4verify.py
#3 32251 Russell C. Jackson (Rusty) Added environment variable for LOGFILE.
#2 32249 Russell C. Jackson (Rusty) Fix utf8 errors.
#1 32238 Russell C. Jackson (Rusty) New p4review.py and a script to create a list of all the files in a large server.