#!/usr/bin/env python3
"""
p4_rollback.py -- Roll back a submitted Perforce changelist.

Opens the reverse of every file change from a submitted CL into a new pending
changelist.  Does NOT auto-submit unless --submit is passed, so you can review
the staged rollback first.  If any p4 command fails while staging, the rollback
CL is left pending even when --submit was given.

Action handling:
    add / branch / move/add / import  -> p4 delete
    delete / move/delete              -> p4 undelete
    edit / integrate                  -> restore content from previous rev
                                         via `p4 print -o` (binary-safe)
    purge                             -> skipped (no content available)

Requires p4 on PATH, an authenticated session, and a workspace that maps the
depot paths affected by the target CL.

Usage:
    p4_rollback.py CHANGELIST [-n] [--submit] [-d DESCRIPTION]

Examples:
    # Preview without touching anything
    p4_rollback.py 12345 -n

    # Stage rollback in a pending CL (default)
    p4_rollback.py 12345 -d "Revert bad deploy (CL 12345)"

    # Stage + auto-submit
    p4_rollback.py 12345 --submit
"""

import argparse
import re
import subprocess
import sys

# Matches only indexed -ztag fields such as "... depotFile0 //path"; header
# fields like "... change 123" have no digit suffix on the key and are ignored.
_ZTAG_INDEXED_FIELD = re.compile(r"\.\.\. ([a-zA-Z]+)(\d+) (.*)")

_STATUS_PREFIX = "... Status "
_WHERE_PATH_PREFIX = "... path "
_WHERE_UNMAP_PREFIX = "... unmap"


# ---------- p4 wrappers -------------------------------------------------------

def _exec_p4(args, input_data=None):
    """Spawn `p4 ARGS` and return the CompletedProcess (text mode).

    Exits the script if the p4 executable cannot be found.
    """
    try:
        return subprocess.run(
            ["p4"] + args,
            capture_output=True,
            text=True,
            input=input_data,
        )
    except FileNotFoundError:
        sys.exit("error: 'p4' not found on PATH")


def run_p4(args, input_data=None, check=True):
    """Run `p4 ARGS`; return stdout. Exit on error when check=True.

    args       -- list of p4 arguments (without the leading "p4").
    input_data -- optional text fed to the command's stdin.
    check      -- when True, a non-zero exit status prints the command's
                  stderr and terminates the script with that status.
    """
    r = _exec_p4(args, input_data)
    if check and r.returncode != 0:
        sys.stderr.write(f"p4 {' '.join(args)} failed (exit {r.returncode}):\n")
        sys.stderr.write(r.stderr)
        sys.exit(r.returncode)
    return r.stdout


def parse_ztag(output):
    """Parse indexed -ztag output into a list of dicts (one per record index).

    Fields sharing a numeric suffix (depotFile0, rev0, action0, ...) are
    grouped into one dict keyed by the bare field name.  The result is ordered
    by numeric index.  Non-indexed lines are ignored.
    """
    records = {}
    for line in output.splitlines():
        m = _ZTAG_INDEXED_FIELD.match(line)
        if not m:
            continue
        key, idx, value = m.groups()
        records.setdefault(idx, {})[key] = value
    return [records[k] for k in sorted(records, key=int)]


# ---------- changelist helpers ------------------------------------------------

def check_submitted(cl):
    """Bail out if the CL is not submitted (e.g. pending, shelved, new)."""
    out = run_p4(["-ztag", "change", "-o", str(cl)])
    for line in out.splitlines():
        if not line.startswith(_STATUS_PREFIX):
            continue
        status = line[len(_STATUS_PREFIX):].strip()
        if status != "submitted":
            sys.exit(f"error: CL {cl} has status '{status}', not submitted")
        return
    sys.exit(f"error: could not determine status of CL {cl}")


def describe_changelist(cl):
    """Return list of {depotFile, rev, action} dicts for a submitted CL.

    Records missing any of the three fields are dropped; `rev` is converted
    to int.
    """
    out = run_p4(["-ztag", "describe", "-s", str(cl)])
    required = {"depotFile", "rev", "action"}
    return [
        {
            "depotFile": r["depotFile"],
            "rev": int(r["rev"]),
            "action": r["action"],
        }
        for r in parse_ztag(out)
        if required <= set(r)
    ]


def build_change_spec(template, description):
    """Return a changelist spec derived from a `p4 change -o` template.

    The template's Description block is replaced with `description` (one
    tab-indented line per input line).  The Files block is removed entirely:
    the template for a new changelist lists whatever is currently open in the
    default changelist, and passing that back to `p4 change -i` would pull
    those unrelated files into the new CL.
    """
    out = []
    dropping = False  # True while inside a tab-indented field body to discard
    for line in template.splitlines():
        if dropping:
            if line.startswith("\t"):
                continue
            dropping = False
        if line.startswith("Description:"):
            out.append("Description:")
            out.extend(f"\t{dline}" for dline in description.splitlines())
            dropping = True
        elif line.startswith("Files:"):
            dropping = True
        else:
            out.append(line)
    return "\n".join(out) + "\n"


def create_pending_cl(description):
    """Create an empty pending changelist with the given description.

    Returns the new changelist number.  Exits the script if the description
    is blank or if the new CL number cannot be parsed from p4's reply.
    """
    if not description.strip():
        sys.exit("error: changelist description must not be empty")
    template = run_p4(["change", "-o"])
    res = run_p4(["change", "-i"],
                 input_data=build_change_spec(template, description))
    m = re.search(r"Change (\d+) created", res)
    if not m:
        sys.exit(f"error: could not parse new CL number from:\n{res}")
    return int(m.group(1))


def parse_where_path(output):
    """Extract the local path from `p4 -ztag where` output, or None.

    Tagged output holds one blank-line-separated record per matching view
    line.  Records carrying an `unmap` tag describe excluded/overridden
    mappings, so their path is not where the file actually lives; the first
    record without that tag wins.
    """
    path = None
    unmapped = False
    # The trailing "" guarantees the final record is flushed.
    for line in output.splitlines() + [""]:
        if not line.strip():
            if path is not None and not unmapped:
                return path
            path, unmapped = None, False
        elif line.startswith(_WHERE_UNMAP_PREFIX):
            unmapped = True
        elif line.startswith(_WHERE_PATH_PREFIX):
            path = line[len(_WHERE_PATH_PREFIX):].strip()
    return None


def get_client_file(depot_path):
    """Resolve a depot path to its local workspace path, or None if unmapped."""
    out = run_p4(["-ztag", "where", depot_path], check=False)
    return parse_where_path(out)


# ---------- rollback logic ----------------------------------------------------

def rollback_file(f, new_cl, dry_run):
    """Stage the reverse of one file action into pending changelist `new_cl`.

    f       -- dict with "depotFile", "rev" (int) and "action" keys.
    new_cl  -- number of the pending changelist receiving the rollback.
    dry_run -- when True, only print the p4 commands that would be run.

    Returns True when the file was staged (or intentionally skipped, or this
    is a dry run) and False when any p4 command failed, so the caller can
    avoid submitting a partial rollback.
    """
    depot = f["depotFile"]
    rev = f["rev"]
    action = f["action"]
    prev = rev - 1
    label = f"[{action:>14s}]"

    def p4(args):
        """Run one p4 command (or echo it in dry-run); return success."""
        if dry_run:
            print(f"  DRY-RUN: p4 {' '.join(args)}")
            return True
        r = _exec_p4(args)
        if r.returncode != 0:
            sys.stderr.write(
                f"  warning: p4 {' '.join(args)} failed (exit {r.returncode})\n"
            )
            sys.stderr.write(r.stderr)
            return False
        return True

    if action in ("add", "branch", "move/add", "import"):
        # File came into existence in this CL -> delete it.
        print(f"{label} {depot}#{rev} -> delete")
        synced = p4(["sync", f"{depot}#{rev}"])
        deleted = p4(["delete", "-c", str(new_cl), depot])
        return synced and deleted

    if action in ("delete", "move/delete"):
        # File was deleted in this CL -> undelete from previous rev.
        if prev < 1:
            print(f"{label} {depot}#{rev} -> SKIP (no previous revision)")
            return True
        print(f"{label} {depot}#{rev} -> undelete from #{prev}")
        return p4(["undelete", "-c", str(new_cl), f"{depot}#{prev}"])

    if action in ("edit", "integrate"):
        # File was modified -> restore previous content.
        if prev < 1:
            print(f"{label} {depot}#{rev} -> SKIP (no previous revision)")
            return True
        client_file = get_client_file(depot)
        if not client_file:
            print(f"{label} {depot}#{rev} -> SKIP (not mapped in workspace)")
            return True
        print(f"{label} {depot}#{rev} -> restore content from #{prev}")
        synced = p4(["sync", f"{depot}#{rev}"])
        if not p4(["edit", "-c", str(new_cl), depot]):
            # Never overwrite a workspace file that was not opened for edit.
            return False
        # Overwrite the workspace file with the previous revision's content.
        # p4 print -o handles binary files correctly.
        restored = p4(["print", "-q", "-o", client_file, f"{depot}#{prev}"])
        return synced and restored

    if action == "purge":
        print(f"{label} {depot}#{rev} -> SKIP (purged, content unavailable)")
    else:
        print(f"{label} {depot}#{rev} -> SKIP (unhandled action '{action}')")
    return True


# ---------- main --------------------------------------------------------------

def main():
    """Parse arguments, stage the rollback, and optionally submit it.

    Exits non-zero if any per-file p4 command failed; in that case the
    rollback CL is never auto-submitted.
    """
    ap = argparse.ArgumentParser(
        description="Roll back a submitted Perforce changelist.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    ap.add_argument("changelist", type=int, help="Submitted CL to roll back")
    ap.add_argument("-d", "--description",
                    help="Description for the rollback CL (default: auto-generated)")
    ap.add_argument("-n", "--dry-run", action="store_true",
                    help="Show what would happen without executing any commands")
    ap.add_argument("--submit", action="store_true",
                    help="Auto-submit the rollback CL (default: leave pending)")
    args = ap.parse_args()

    check_submitted(args.changelist)
    files = describe_changelist(args.changelist)
    if not files:
        sys.exit(f"error: no files found in CL {args.changelist}")

    desc = args.description or f"Rollback of change {args.changelist}"
    print(f"CL {args.changelist}: {len(files)} file(s) affected\n")

    if args.dry_run:
        print(f"DRY-RUN: would create pending CL with description: {desc!r}\n")
        new_cl = 0
    else:
        new_cl = create_pending_cl(desc)
        print(f"Created pending CL {new_cl}\n")

    failures = 0
    for f in files:
        if not rollback_file(f, new_cl, args.dry_run):
            failures += 1

    if args.dry_run:
        print("\nDRY-RUN complete; no changes made.")
        return

    print()
    if failures:
        sys.stderr.write(
            f"warning: {failures} file(s) could not be rolled back cleanly\n"
        )

    if args.submit and not failures:
        print(f"Submitting CL {new_cl}...")
        print(run_p4(["submit", "-c", str(new_cl)]))
        return

    if args.submit:
        # A partial rollback must be reviewed by a human before it lands.
        print(f"NOT submitting: rollback left in pending CL {new_cl}.")
    else:
        print(f"Rollback staged in pending CL {new_cl}.")
    print(f"  Review:  p4 opened -c {new_cl}")
    print(f"  Diff:    p4 diff -c {new_cl}")
    print(f"  Submit:  p4 submit -c {new_cl}")
    print(f"  Abort:   p4 revert -c {new_cl} //... && p4 change -d {new_cl}")
    if failures:
        sys.exit(1)


if __name__ == "__main__":
    main()