#!/usr/bin/env python3
"""
p4_rollback.py -- Roll back a submitted Perforce changelist.
Opens the reverse of every file change from a submitted CL into a new pending
changelist. Does NOT auto-submit unless --submit is passed, so you can review
the staged rollback first.
Action handling:
add / branch / move/add / import -> p4 delete
delete / move/delete -> p4 undelete
edit / integrate -> restore content from previous rev
via `p4 print -o` (binary-safe)
purge -> skipped (no content available)
Requires p4 on PATH, an authenticated session, and a workspace that maps the
depot paths affected by the target CL.
Usage:
p4_rollback.py <CL> [-n] [--submit] [-d DESCRIPTION]
Examples:
# Preview without touching anything
p4_rollback.py 12345 -n
# Stage rollback in a pending CL (default)
p4_rollback.py 12345 -d "Revert bad deploy (CL 12345)"
# Stage + auto-submit
p4_rollback.py 12345 --submit
"""
import argparse
import re
import subprocess
import sys
# ---------- p4 wrappers -------------------------------------------------------
def run_p4(args, input_data=None, check=True):
    """Invoke the `p4` command-line client and hand back what it printed.

    Args:
        args: List of arguments placed after `p4` on the command line.
        input_data: Optional text fed to the command's standard input.
        check: When true, a non-zero exit status is reported on stderr and
            terminates the script with that same status.

    Returns:
        The command's captured standard output as text.
    """
    command = ["p4"] + args
    try:
        proc = subprocess.run(
            command,
            input=input_data,
            capture_output=True,
            text=True,
        )
    except FileNotFoundError:
        # The executable itself is missing; nothing else in the script can work.
        sys.exit("error: 'p4' not found on PATH")

    succeeded = proc.returncode == 0
    if succeeded or not check:
        return proc.stdout

    sys.stderr.write(f"p4 {' '.join(args)} failed (exit {proc.returncode}):\n")
    sys.stderr.write(proc.stderr)
    sys.exit(proc.returncode)
def parse_ztag(output):
    """Group indexed `-ztag` fields into one dict per record index.

    Only lines of the form `... <name><index> <value>` are considered, e.g.
    `... depotFile0 //path`; un-indexed header fields are ignored.

    Args:
        output: Raw text printed by a `p4 -ztag ...` command.

    Returns:
        A list of dicts mapping field name to its string value, ordered by
        ascending numeric index.
    """
    indexed_field = re.compile(r"\.\.\. ([a-zA-Z]+)(\d+) (.*)")
    hits = filter(None, map(indexed_field.match, output.splitlines()))

    by_index = {}
    for name, index, text in (hit.groups() for hit in hits):
        if index not in by_index:
            by_index[index] = {}
        by_index[index][name] = text

    # Indices are kept as strings while grouping; sort them numerically so
    # that "10" comes after "2".
    ordered = sorted(by_index, key=int)
    return [by_index[index] for index in ordered]
# ---------- changelist helpers ------------------------------------------------
def check_submitted(cl):
    """Terminate the script unless changelist `cl` has status 'submitted'.

    The status is read from the first `... Status ` line of
    `p4 -ztag change -o <cl>`. The script exits with an error message when
    that status is anything else (e.g. pending), or when no status line is
    present at all.
    """
    prefix = "... Status "
    spec = run_p4(["-ztag", "change", "-o", str(cl)])

    status_lines = (ln for ln in spec.splitlines() if ln.startswith(prefix))
    first = next(status_lines, None)
    if first is None:
        sys.exit(f"error: could not determine status of CL {cl}")

    status = first[len(prefix):].strip()
    if status != "submitted":
        sys.exit(f"error: CL {cl} has status '{status}', not submitted")
def describe_changelist(cl):
    """List the file revisions recorded in changelist `cl`.

    Runs `p4 -ztag describe -s <cl>` and keeps every indexed record that
    carries all of `depotFile`, `rev` and `action`.

    Returns:
        A list of dicts with keys `depotFile` (str), `rev` (int) and
        `action` (str), in the order produced by `parse_ztag`.
    """
    required = ("depotFile", "rev", "action")
    tagged = run_p4(["-ztag", "describe", "-s", str(cl)])
    return [
        {
            "depotFile": record["depotFile"],
            "rev": int(record["rev"]),
            "action": record["action"],
        }
        for record in parse_ztag(tagged)
        if all(field in record for field in required)
    ]
def _rollback_change_spec(spec, description):
    """Rewrite a `p4 change -o` form so it describes an empty changelist.

    The Description block is replaced with `description`, and the `Files:`
    and `Jobs:` sections are removed together with their tab-indented values.

    Args:
        spec: Text of the form printed by `p4 change -o`.
        description: New description; each line becomes one tab-indented line.

    Returns:
        The rewritten form, newline-terminated, ready for `p4 change -i`.
    """
    out = []
    skipping = False  # True while inside a tab-indented block being discarded
    for line in spec.splitlines():
        if skipping and line.startswith("\t"):
            continue
        skipping = False
        if line.startswith("Description:"):
            out.append("Description:")
            out.extend(f"\t{dline}" for dline in description.splitlines())
            skipping = True  # discard the placeholder description that follows
        elif line.startswith(("Files:", "Jobs:")):
            # The form for a new changelist pre-lists files opened in the
            # default changelist (and possibly jobs). Submitting the form
            # as-is would pull those unrelated files into the rollback CL.
            skipping = True
        else:
            out.append(line)
    return "\n".join(out) + "\n"


def create_pending_cl(description):
    """Create an empty pending changelist with the given description.

    The changelist form is fetched with `p4 change -o`, rewritten so that it
    carries `description` and no files or jobs, and submitted through
    `p4 change -i`.

    Args:
        description: Text to use as the changelist description.

    Returns:
        The number of the newly created changelist, as an int.

    The script exits with an error if the new changelist number cannot be
    parsed from the `p4 change -i` output.
    """
    spec = run_p4(["change", "-o"])
    res = run_p4(
        ["change", "-i"],
        input_data=_rollback_change_spec(spec, description),
    )
    m = re.search(r"Change (\d+) created", res)
    if not m:
        sys.exit(f"error: could not parse new CL number from:\n{res}")
    return int(m.group(1))
def get_client_file(depot_path):
    """Resolve a depot path to its local workspace path, or None if unmapped.

    `p4 -ztag where` can print several records for one depot path, one per
    matching client-view line. Records that come from an exclusionary view
    line carry an `unmap` field and name a location the file is NOT mapped
    to, so they are ignored. When more than one mapped record remains, the
    last one is used.

    Args:
        depot_path: Depot path (or file spec) to look up.

    Returns:
        The local filesystem path as a string, or None when the output
        contains no mapped record with a `path` field.
    """
    out = run_p4(["-ztag", "where", depot_path], check=False)

    mapped_path = None
    record_path = None
    record_unmapped = False
    # Records are assumed to be separated by blank lines; the extra "" makes
    # sure the final record is flushed even without a trailing blank line.
    for line in out.splitlines() + [""]:
        if line.strip():
            if line.startswith("... path "):
                record_path = line[len("... path "):].strip()
            elif line.rstrip() == "... unmap" or line.startswith("... unmap "):
                record_unmapped = True
            continue
        # Blank line: the current record is complete.
        if record_path is not None and not record_unmapped:
            mapped_path = record_path
        record_path = None
        record_unmapped = False
    return mapped_path
# ---------- rollback logic ----------------------------------------------------
def rollback_file(f, new_cl, dry_run):
    """Open the reverse of one file action in the pending changelist.

    Action handling:
        add / branch / move/add / import -> sync to the revision, then p4 delete
        delete / move/delete             -> p4 undelete from the previous rev
        edit / integrate                 -> sync, p4 edit, then overwrite the
                                            workspace file with the previous
                                            revision via `p4 print -o`
        purge / anything else            -> skipped with a message

    Every p4 command's exit status is checked. When a command fails, a
    warning and the command's stderr are written to stderr and the remaining
    steps for this file are not run, so a file whose `p4 edit` failed is
    never overwritten on disk.

    Args:
        f: Dict with keys `depotFile` (depot path), `rev` (int revision
            created by the changelist being rolled back) and `action`.
        new_cl: Number of the pending changelist that receives the file.
        dry_run: When true, print the p4 commands instead of running them.
    """
    depot = f["depotFile"]
    rev = f["rev"]
    action = f["action"]
    prev = rev - 1
    label = f"[{action:>14s}]"

    def p4(args):
        """Run (or, in dry-run mode, print) one p4 command; True on success."""
        if dry_run:
            print(f" DRY-RUN: p4 {' '.join(args)}")
            return True
        try:
            r = subprocess.run(["p4"] + args, capture_output=True, text=True)
        except FileNotFoundError:
            sys.exit("error: 'p4' not found on PATH")
        if r.returncode != 0:
            # Every command built below ends with its file spec, so args[-1]
            # identifies the file the failure applies to.
            sys.stderr.write(f" warning: p4 {args[0]} failed for {args[-1]}\n")
            sys.stderr.write(r.stderr)
            return False
        return True

    def run_steps(*steps):
        """Run the commands in order, stopping at the first failure."""
        for step in steps:
            if not p4(step):
                return

    if action in ("add", "branch", "move/add", "import"):
        # File came into existence in this CL -> delete it.
        print(f"{label} {depot}#{rev} -> delete")
        run_steps(
            ["sync", f"{depot}#{rev}"],
            ["delete", "-c", str(new_cl), depot],
        )
    elif action in ("delete", "move/delete"):
        # File was deleted in this CL -> undelete from previous rev.
        if prev < 1:
            print(f"{label} {depot}#{rev} -> SKIP (no previous revision)")
            return
        print(f"{label} {depot}#{rev} -> undelete from #{prev}")
        run_steps(["undelete", "-c", str(new_cl), f"{depot}#{prev}"])
    elif action in ("edit", "integrate"):
        # File was modified -> restore previous content.
        if prev < 1:
            print(f"{label} {depot}#{rev} -> SKIP (no previous revision)")
            return
        client_file = get_client_file(depot)
        if not client_file:
            print(f"{label} {depot}#{rev} -> SKIP (not mapped in workspace)")
            return
        print(f"{label} {depot}#{rev} -> restore content from #{prev}")
        run_steps(
            ["sync", f"{depot}#{rev}"],
            ["edit", "-c", str(new_cl), depot],
            # Overwrite the workspace file with the previous revision's
            # content; p4 writes the file itself, so no text decoding of the
            # content happens in this script.
            ["print", "-q", "-o", client_file, f"{depot}#{prev}"],
        )
    elif action == "purge":
        print(f"{label} {depot}#{rev} -> SKIP (purged, content unavailable)")
    else:
        print(f"{label} {depot}#{rev} -> SKIP (unhandled action '{action}')")
# ---------- main --------------------------------------------------------------
def main():
    """Command-line entry point: stage (and optionally submit) a rollback.

    Steps:
        1. Verify the target changelist is submitted and list its files.
        2. Create a pending changelist (skipped in dry-run mode).
        3. Open the reverse of every file action in that changelist.
        4. Count the files actually opened in the new changelist.
        5. With --submit, submit only if that count equals the number of
           files in the target changelist; otherwise leave the changelist
           pending for review and exit with status 1.

    In dry-run mode nothing is created or opened and the function returns
    after printing the planned commands.
    """
    ap = argparse.ArgumentParser(
        description="Roll back a submitted Perforce changelist.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    ap.add_argument("changelist", type=int, help="Submitted CL to roll back")
    ap.add_argument("-d", "--description",
                    help="Description for the rollback CL (default: auto-generated)")
    ap.add_argument("-n", "--dry-run", action="store_true",
                    help="Show what would happen without executing any commands")
    ap.add_argument("--submit", action="store_true",
                    help="Auto-submit the rollback CL (default: leave pending)")
    args = ap.parse_args()

    check_submitted(args.changelist)
    files = describe_changelist(args.changelist)
    if not files:
        sys.exit(f"error: no files found in CL {args.changelist}")

    desc = args.description or f"Rollback of change {args.changelist}"
    print(f"CL {args.changelist}: {len(files)} file(s) affected\n")

    if args.dry_run:
        print(f"DRY-RUN: would create pending CL with description: {desc!r}\n")
        new_cl = 0
    else:
        new_cl = create_pending_cl(desc)
        print(f"Created pending CL {new_cl}\n")

    for f in files:
        rollback_file(f, new_cl, args.dry_run)

    if args.dry_run:
        print("\nDRY-RUN complete; no changes made.")
        return
    print()

    # An exact reverse opens one file in the new CL per file in the target
    # CL. Any other count means something was skipped, failed to open, or an
    # unexpected file is in the changelist, so it must not be auto-submitted.
    opened = run_p4(["-ztag", "opened", "-c", str(new_cl)], check=False)
    staged = sum(
        1 for line in opened.splitlines() if line.startswith("... depotFile ")
    )
    complete = staged == len(files)
    if not complete:
        sys.stderr.write(
            f"warning: CL {new_cl} has {staged} opened file(s) but CL "
            f"{args.changelist} touched {len(files)}; "
            "the rollback is not an exact reverse\n"
        )

    if args.submit and complete:
        print(f"Submitting CL {new_cl}...")
        print(run_p4(["submit", "-c", str(new_cl)]))
        return

    if args.submit:
        sys.stderr.write(
            f"error: not auto-submitting CL {new_cl}; review it and submit manually\n"
        )
    print(f"Rollback staged in pending CL {new_cl}.")
    print(f" Review: p4 opened -c {new_cl}")
    print(f" Diff: p4 diff -c {new_cl}")
    print(f" Submit: p4 submit -c {new_cl}")
    print(f" Abort: p4 revert -c {new_cl} //... && p4 change -d {new_cl}")
    if args.submit:
        # The requested submit did not happen; signal that to calling scripts.
        sys.exit(1)


if __name__ == "__main__":
    main()