rollback.py #1

  • //
  • guest/
  • russell_jackson/
  • sdp/
  • Maintenance/
  • rollback.py
  • View
  • Commits
  • Open Download .zip Download (9 KB)
#!/usr/bin/env python3
"""
p4_rollback.py -- Roll back a submitted Perforce changelist.

Opens the reverse of every file change from a submitted CL into a new pending
changelist. Does NOT auto-submit unless --submit is passed, so you can review
the staged rollback first.

Action handling:
    add / branch / move/add / import    -> p4 delete
    delete / move/delete                -> p4 undelete
    edit / integrate                    -> restore content from previous rev
                                           via `p4 print -o` (binary-safe)
    purge                               -> skipped (no content available)

Requires p4 on PATH, an authenticated session, and a workspace that maps the
depot paths affected by the target CL.

Usage:
    p4_rollback.py <CL> [-n] [--submit] [-d DESCRIPTION]

Examples:
    # Preview without touching anything
    p4_rollback.py 12345 -n

    # Stage rollback in a pending CL (default)
    p4_rollback.py 12345 -d "Revert bad deploy (CL 12345)"

    # Stage + auto-submit
    p4_rollback.py 12345 --submit
"""

import argparse
import re
import subprocess
import sys


# ---------- p4 wrappers -------------------------------------------------------

def run_p4(args, input_data=None, check=True):
    """Invoke the `p4` command-line client and hand back its stdout.

    Args:
        args: List of arguments placed after `p4` on the command line.
        input_data: Optional text piped to the command's stdin.
        check: When true, a non-zero exit status is fatal.

    Returns:
        The command's captured standard output, as text.

    Raises:
        SystemExit: If `p4` cannot be found on PATH, or if ``check`` is true
            and the command exits non-zero (p4's stderr is echoed first and
            its exit status is reused as ours).
    """
    command = ["p4"] + args
    try:
        proc = subprocess.run(
            command,
            input=input_data,
            text=True,
            capture_output=True,
        )
    except FileNotFoundError:
        sys.exit("error: 'p4' not found on PATH")

    succeeded = proc.returncode == 0
    if succeeded or not check:
        return proc.stdout

    # Fatal path: surface p4's own diagnostics, then propagate its status.
    joined = " ".join(args)
    sys.stderr.write(f"p4 {joined} failed (exit {proc.returncode}):\n")
    sys.stderr.write(proc.stderr)
    sys.exit(proc.returncode)


def parse_ztag(output):
    """Group indexed `p4 -ztag` fields into one dict per record index.

    Lines of the form ``... <name><N> <value>`` are collected under index
    ``N``; every other line (including un-indexed header fields such as
    ``... change 123``) is ignored. All values are kept as strings.

    Returns:
        A list of ``{name: value}`` dicts ordered by numeric index.
    """
    # Only indexed fields such as "... depotFile0 //path" match this pattern.
    field_re = re.compile(r"\.\.\. ([a-zA-Z]+)(\d+) (.*)")
    by_index = {}
    for raw_line in output.splitlines():
        hit = field_re.match(raw_line)
        if hit is None:
            continue
        name, index, text = hit.groups()
        if index not in by_index:
            by_index[index] = {}
        by_index[index][name] = text
    # Indices are digit strings; order them numerically so "10" follows "9".
    ordered = sorted(by_index, key=int)
    return [by_index[index] for index in ordered]


# ---------- changelist helpers ------------------------------------------------

def check_submitted(cl):
    """Exit with an error unless changelist ``cl`` has status 'submitted'.

    The status is taken from the first ``... Status`` field in the output of
    `p4 -ztag change -o <cl>`. Returns None when the CL is submitted.

    Raises:
        SystemExit: If the status is anything other than 'submitted', or if
            no Status field is present in the output.
    """
    prefix = "... Status "
    spec = run_p4(["-ztag", "change", "-o", str(cl)])
    found = (
        text[len(prefix):].strip()
        for text in spec.splitlines()
        if text.startswith(prefix)
    )
    status = next(found, None)
    if status is None:
        sys.exit(f"error: could not determine status of CL {cl}")
    if status != "submitted":
        sys.exit(f"error: CL {cl} has status '{status}', not submitted")


def describe_changelist(cl):
    """List the files recorded in changelist ``cl``.

    Runs `p4 -ztag describe -s <cl>` and keeps every indexed record that
    carries all of depotFile, rev and action.

    Returns:
        A list of dicts with keys "depotFile" (str), "rev" (int) and
        "action" (str), in record-index order.
    """
    wanted = ("depotFile", "rev", "action")
    tagged = run_p4(["-ztag", "describe", "-s", str(cl)])
    return [
        {
            "depotFile": rec["depotFile"],
            "rev": int(rec["rev"]),
            "action": rec["action"],
        }
        for rec in parse_ztag(tagged)
        if all(field in rec for field in wanted)
    ]


def create_pending_cl(description):
    """Create an empty pending changelist with the given description.

    The spec is built from the `p4 change -o` template. Three fields of the
    template are rewritten before it is fed to `p4 change -i`:

    * ``Description:`` is replaced by ``description`` (one tab-indented line
      per line of text).
    * ``Files:`` is dropped. The template lists every file currently open in
      the default changelist; keeping the field would move those unrelated
      files into the rollback changelist (and submit them with --submit).
    * ``Jobs:`` is dropped, so jobs pre-selected by the user's JobView are
      not marked fixed by the rollback.

    Args:
        description: Changelist description; may span several lines.

    Returns:
        The number of the newly created pending changelist (int).

    Raises:
        SystemExit: If a p4 command fails, or if the new changelist number
            cannot be parsed from p4's reply.
    """
    template = run_p4(["change", "-o"])
    spec_lines = []
    dropping_body = False  # True while skipping a field's tab-indented body
    for line in template.splitlines():
        if dropping_body and line.startswith("\t"):
            continue
        dropping_body = False
        if line.startswith("Description:"):
            spec_lines.append("Description:")
            spec_lines.extend(f"\t{dline}" for dline in description.splitlines())
            # Skip the template's placeholder description block.
            dropping_body = True
        elif line.startswith(("Files:", "Jobs:")):
            # Omit the field header and its body so the new CL starts empty.
            dropping_body = True
        else:
            spec_lines.append(line)
    res = run_p4(["change", "-i"], input_data="\n".join(spec_lines) + "\n")
    m = re.search(r"Change (\d+) created", res)
    if not m:
        sys.exit(f"error: could not parse new CL number from:\n{res}")
    return int(m.group(1))


def get_client_file(depot_path):
    """Resolve a depot path to its local workspace path, or None if unmapped.

    Parses the tagged output of `p4 -ztag where <depot_path>` record by
    record. A record is ended by any line that is not a ``... key value``
    field (tagged records are expected to be blank-line separated) or by a
    field name repeating. Records carrying an ``unmap`` field describe a
    location the file is explicitly NOT mapped to (exclusion or remapping
    lines in the client view) and are skipped, so the returned path is the
    first one the file is really mapped to.

    Args:
        depot_path: Depot syntax path of a single file.

    Returns:
        The local filesystem path (stripped of surrounding whitespace), or
        None when no mapped record is found. p4 failures are treated as
        "unmapped" because the command is run with check=False.
    """
    out = run_p4(["-ztag", "where", depot_path], check=False)
    record = {}
    # The trailing "" sentinel flushes the final record.
    for line in out.splitlines() + [""]:
        carried = None
        if line.startswith("... "):
            key, _, value = line[len("... "):].partition(" ")
            if key not in record:
                record[key] = value
                continue
            # A repeated field name means a new record has started.
            carried = (key, value)
        if "path" in record and "unmap" not in record:
            return record["path"].strip()
        record = {} if carried is None else {carried[0]: carried[1]}
    return None


# ---------- rollback logic ----------------------------------------------------

def rollback_file(f, new_cl, dry_run):
    """Open the reverse of one file action in the pending changelist.

    Args:
        f: Dict with "depotFile" (depot path), "rev" (int, the revision the
            target CL created) and "action" (the p4 action of that revision).
        new_cl: Number of the pending changelist that receives the files.
        dry_run: When true, print the p4 commands instead of running them.

    One progress line is printed per file. Actions are reversed as follows:
    add/branch/move/add/import are deleted; delete/move/delete are undeleted
    from the previous revision; edit/integrate have the previous revision's
    content written over the workspace file after opening it for edit; purge
    and unknown actions are skipped.

    A p4 command that exits non-zero is reported on stderr together with
    p4's own error text; processing of other files is not interrupted. The
    workspace file is only overwritten when `p4 edit` succeeded.

    Raises:
        SystemExit: If the `p4` executable is not found on PATH.
    """
    depot = f["depotFile"]
    rev = f["rev"]
    action = f["action"]
    prev = rev - 1
    label = f"[{action:>14s}]"

    def p4(args):
        """Run (or in dry-run mode only print) one p4 command.

        The last element of ``args`` is the file spec; it is named in the
        warning emitted on failure. Returns True if the command exited 0 or
        was merely printed, False otherwise.
        """
        if dry_run:
            print(f"    DRY-RUN: p4 {' '.join(args)}")
            return True
        try:
            r = subprocess.run(["p4"] + args, capture_output=True, text=True)
        except FileNotFoundError:
            sys.exit("error: 'p4' not found on PATH")
        if r.returncode != 0:
            sys.stderr.write(f"    warning: p4 {args[0]} failed for {args[-1]}\n")
            sys.stderr.write(r.stderr)
            return False
        return True

    if action in ("add", "branch", "move/add", "import"):
        # File came into existence in this CL -> delete it.
        print(f"{label} {depot}#{rev}  -> delete")
        p4(["sync", f"{depot}#{rev}"])
        p4(["delete", "-c", str(new_cl), depot])

    elif action in ("delete", "move/delete"):
        # File was deleted in this CL -> undelete from previous rev.
        if prev < 1:
            print(f"{label} {depot}#{rev}  -> SKIP (no previous revision)")
            return
        print(f"{label} {depot}#{rev}  -> undelete from #{prev}")
        p4(["undelete", "-c", str(new_cl), f"{depot}#{prev}"])

    elif action in ("edit", "integrate"):
        # File was modified -> restore previous content.
        if prev < 1:
            print(f"{label} {depot}#{rev}  -> SKIP (no previous revision)")
            return
        client_file = get_client_file(depot)
        if not client_file:
            print(f"{label} {depot}#{rev}  -> SKIP (not mapped in workspace)")
            return
        print(f"{label} {depot}#{rev}  -> restore content from #{prev}")
        p4(["sync", f"{depot}#{rev}"])
        if not p4(["edit", "-c", str(new_cl), depot]):
            # Never overwrite a workspace file that was not opened for edit:
            # the change would not be part of the rollback changelist.
            sys.stderr.write(f"    warning: {depot} not restored (edit failed)\n")
            return
        # Overwrite the workspace file with the previous revision's content.
        # p4 print -o handles binary files correctly.
        p4(["print", "-q", "-o", client_file, f"{depot}#{prev}"])

    elif action == "purge":
        print(f"{label} {depot}#{rev}  -> SKIP (purged, content unavailable)")
    else:
        print(f"{label} {depot}#{rev}  -> SKIP (unhandled action '{action}')")


# ---------- main --------------------------------------------------------------

def main():
    """Command-line entry point.

    Parses the options, verifies that the target changelist is submitted,
    lists its files, and then either previews the rollback (--dry-run) or
    stages it in a new pending changelist, submitting that changelist when
    --submit is given.

    Raises:
        SystemExit: On invalid arguments, when the changelist has no files,
            or when a helper aborts because a p4 command failed.
    """
    parser = argparse.ArgumentParser(
        description="Roll back a submitted Perforce changelist.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("changelist", type=int, help="Submitted CL to roll back")
    parser.add_argument(
        "-d", "--description",
        help="Description for the rollback CL (default: auto-generated)",
    )
    parser.add_argument(
        "-n", "--dry-run", action="store_true",
        help="Show what would happen without executing any commands",
    )
    parser.add_argument(
        "--submit", action="store_true",
        help="Auto-submit the rollback CL (default: leave pending)",
    )
    opts = parser.parse_args()
    target = opts.changelist

    check_submitted(target)

    entries = describe_changelist(target)
    if not entries:
        sys.exit(f"error: no files found in CL {target}")

    desc = opts.description if opts.description else f"Rollback of change {target}"
    print(f"CL {target}: {len(entries)} file(s) affected\n")

    # Preview mode: nothing is created, so a placeholder CL number of 0 is
    # shown in the printed commands.
    if opts.dry_run:
        print(f"DRY-RUN: would create pending CL with description: {desc!r}\n")
        for entry in entries:
            rollback_file(entry, 0, True)
        print("\nDRY-RUN complete; no changes made.")
        return

    new_cl = create_pending_cl(desc)
    print(f"Created pending CL {new_cl}\n")
    for entry in entries:
        rollback_file(entry, new_cl, False)
    print()

    if opts.submit:
        print(f"Submitting CL {new_cl}...")
        print(run_p4(["submit", "-c", str(new_cl)]))
        return

    # Left pending: tell the user how to inspect, submit or discard it.
    next_steps = (
        f"Rollback staged in pending CL {new_cl}.",
        f"  Review:  p4 opened -c {new_cl}",
        f"  Diff:    p4 diff -c {new_cl}",
        f"  Submit:  p4 submit -c {new_cl}",
        f"  Abort:   p4 revert -c {new_cl} //... && p4 change -d {new_cl}",
    )
    for message in next_steps:
        print(message)


# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
# Change User Description Committed
#1 32585 Russell C. Jackson (Rusty) CLI script to do a rollback of a CL.