""" Generate a view that unambiguously combines multiple Perforce depot paths into a single client/stream path. For example, given the depot files: //depot/lib_A/src/A1/A11.c //depot/lib_B/src/B1/B12.c //depot/lib_B/src/B2/B21.c //depot/lib_C/src/C1/C11.c //depot/lib_C/src/C1/C12.c //depot/lib_C/src/C3/C31.c and the script invocation: combine_views.py //depot/lib_A/... //depot/lib_B/... //depot/lib_C/... the combined view is: //depot/lib_A/src/A1/... src/A1/... //depot/lib_B/src/B1/... src/B1/... //depot/lib_B/src/B2/... src/B2/... //depot/lib_C/src/C1/... src/C1/... //depot/lib_C/src/C3/... src/C3/... The --client option formats the output to make it suitable to paste directly into a Perforce client view. The --stream option formats it as "import" paths within a stream. This script is provided as-is; all warranties are hereby disclaimed. Note that the only command executed by this script is "p4 files". """ from argparse import ArgumentParser from dataclasses import dataclass from subprocess import run, PIPE from typing import cast, Literal # To avoid a dependency on P4Python we'll just build our own # limited version of MapApi that's adequate to this use case. Wildcard = Literal["*", "...", ""] # no wildcard is okay too WILDCARDS: set[Wildcard] = {"*", "..."} def check_wild(path: str) -> None: if any(w in path for w in WILDCARDS): raise ValueError("No embedded wildcards; try P4.MapApi") @dataclass(frozen=True) class MapHalf: prefix: str wild: Wildcard def __post_init__(self): check_wild(self.prefix) def match_wild(self, path: str) -> str | None: """Return the part of the path that matches the wildcard.""" check_wild(path) if not path.startswith(self.prefix): return None m = path[len(self.prefix):] if ( self.wild == "..." 
or self.wild == "" and m == "" or self.wild == "*" and "/" not in m ): return m else: return None def match_wild_strict(self, path: str) -> str: match = self.match_wild(path) if match is None: raise ValueError("Expected a match") return match @staticmethod def from_str(path: str) -> 'MapHalf': wild = next((w for w in WILDCARDS if path.endswith(w)), "") return MapHalf( path[:-len(wild)] if wild else path, cast(Wildcard, wild) ) def __str__(self) -> str: s = self.prefix + self.wild return f'"{s}"' if any(c.isspace() for c in s) else s @dataclass(frozen=True) class MapItem: lhs: MapHalf rhs: MapHalf def __post_init__(self): if self.lhs.wild != self.rhs.wild: raise ValueError("Incompatible wildcards in LHS/RHS") def translate(self, lhs_file: str) -> str | None: """Translate from LHS to RHS.""" match = self.lhs.match_wild(lhs_file) if match is None: return None return self.rhs.prefix + match def descend(self, child_map: MapHalf) -> 'MapItem': """Narrow a map to apply to a relative subdir.""" if self.lhs.wild == "*" and "/" in child_map.prefix: raise ValueError("Can't join / with *") if not self.lhs.wild: raise ValueError("Can't join with a non-wild path") return MapItem( MapHalf(self.lhs.prefix + child_map.prefix, child_map.wild), MapHalf(self.rhs.prefix + child_map.prefix, child_map.wild) ) def __str__(self) -> str: return f"{self.lhs}\t{self.rhs}" @dataclass(frozen=True) class MapBucket: lhs_paths: tuple[str, ...] map: MapItem def __post_init__(self): rhs_paths = [self.map.translate(p) for p in self.lhs_paths] if any(p is None for p in rhs_paths): raise ValueError("Not all bucket paths match the map") def __str__(self): return f"{self.map} ({len(self.lhs_paths)} files)" def disambiguate(buckets: list[MapBucket]) -> list[MapBucket]: """Turn a set of overlapping buckets into non-overlapping buckets. We assume that all of the buckets have the same RHS mapping, and that all of the LHS files translate to unique RHS files, otherwise we'll raise a ValueError. 
""" if not buckets: raise ValueError("Can't disambiguate an empty set...") if len(buckets) == 1: return buckets # no ambiguity! if len(set(b.map.rhs for b in buckets)) != 1: raise ValueError("Not all buckets have the same RHS") wild = buckets[0].map.rhs.wild if wild == "": raise ValueError("Can't disambiguate file-level collisions") # Combine all the buckets on the RHS side, get the matches rhs_matches = [ b.map.lhs.match_wild_strict(p) for b in buckets for p in b.lhs_paths ] # Now build relative MapHalves representing child dirs children: set[MapHalf] = set() for m in rhs_matches: if wild == "*": # split bucket into single files children.add(MapHalf(m, "")) else: # thisdir/... prefix, slash, match = m.partition("/") if match: # thisdir/child/... children.add(MapHalf(prefix + slash, "...")) else: # thisdir/* children.add(MapHalf("", "*")) disambiguated: list[MapBucket] = [] for child in children: child_buckets: list[MapBucket] = [] for b in buckets: child_map = b.map.descend(child) lhs_paths = [ p for p in b.lhs_paths if child_map.translate(p) is not None ] if lhs_paths: child_buckets.append(MapBucket(tuple(lhs_paths), child_map)) disambiguated.extend(disambiguate(child_buckets)) disambiguated.sort(key=str) return disambiguated def check_collisions(buckets: list[MapBucket]) -> list[list[str]]: """Check for file-level collisions in a list of buckets, grouping the collisions for easy reporting.""" rhs_to_lhs: dict[str, list[str]] = {} for b in buckets: for lhs in b.lhs_paths: rhs = b.map.translate(lhs) assert rhs, "Bucket's paths don't match its map" rhs_to_lhs.setdefault(rhs, []).append(lhs) return [ lhs_paths for lhs_paths in rhs_to_lhs.values() if len(lhs_paths) > 1 ] def map_path(path: str) -> MapItem: """Build a MapItem for a path, simplifying away embedded wildcards. This is a convenience to let us build a filtered file list with a complex wildcard expression while still doing a tree-based map. 
""" if (sum(path.count(wild) for wild in WILDCARDS)) > 1: for wild in WILDCARDS: if wild in path: path = path[:path.index(wild)] path += "..." lhs = MapHalf.from_str(path) return MapItem(lhs, MapHalf("", lhs.wild)) def main(): parser = ArgumentParser(description= """Generate a view that unambiguously combines multiple depot paths into a single client path.""" ) parser.add_argument('paths', nargs='+', help="A list of depot paths to combine into one view.") format_opts = parser.add_mutually_exclusive_group() format_opts.add_argument('-c', '--client', help="Format the view as for a client with this name.") format_opts.add_argument('-S', '--stream', action='store_true') args = parser.parse_args() client = cast(str|None, args.client) paths = cast(list[str], args.paths) stream = cast(bool, args.stream) buckets = [ MapBucket( run( # type: ignore ["p4", r"-F%depotFile%", "files", "-e", path], stdout=PIPE, text=True ).stdout.splitlines(), map_path(path) ) for path in paths ] # Sanity check the input buckets. empty_paths = [str(b.map.lhs) for b in buckets if not b.lhs_paths] if empty_paths: print("No files in path:", *empty_paths) return collisions = check_collisions(buckets) if collisions: print("File collisions detected:") for c in collisions: print("=====") print(*c, sep='\n') print("=====") print("Delete all but one file in each set and try again.") return # Do the disambiguation and pretty-print a view. buckets = disambiguate(buckets) client_map = f"//{client}/" if client else "" col = max(len(str(b.map.rhs if stream else b.map.lhs)) for b in buckets) for b in buckets: if stream: print(f"\timport {str(b.map.rhs).ljust(col)}\t{b.map.lhs}") else: print(f"\t{str(b.map.lhs).ljust(col)}\t{client_map}{b.map.rhs}") if __name__ == '__main__': main() # Canned test cases suitable for running with pytest. 
def clean_test_blob(test_blob: str) -> list[str]:
    """Turn a messy multiline string into a nice list.

    Empty and whitespace-only lines are dropped; the remaining lines
    are stripped of leading and trailing whitespace.
    """
    return [
        line.strip()
        for line in test_blob.splitlines()
        if line and not line.isspace()
    ]


def make_test_bucket(test_case: str) -> MapBucket:
    """Make a bucket from a blob o' test data.

    The first non-blank line is the path to map; every following
    non-blank line is a file in that bucket.
    """
    path, *files = clean_test_blob(test_case)
    return MapBucket(tuple(files), map_path(path))


def compare_test_view(buckets: list[MapBucket], test_data: str) -> None:
    """Compare view represented by bucket list to test blob.

    Each non-blank line of `test_data` is a whitespace-separated
    "LHS RHS" pair, compared in order against the buckets' maps.
    """
    expected = clean_test_blob(test_data)
    print(*buckets, sep="\n")
    print("...")
    print(*expected, sep="\n")
    # zip() stops at the shorter input, so check the counts explicitly;
    # otherwise missing or surplus mappings would go unnoticed.
    assert len(buckets) == len(expected), (
        f"Got {len(buckets)} mappings, expected {len(expected)}"
    )
    for bucket, line in zip(buckets, expected):
        test_map = MapItem(
            *map(MapHalf.from_str, map(str.strip, line.split()))
        )
        assert bucket.map == test_map, f"{bucket.map} != {test_map}"


def do_disambiguate_test(test_case: list[str], test_data: str) -> None:
    """Run a test case.

    Builds one bucket per blob in `test_case`, disambiguates them, and
    compares the result with the view in `test_data`.
    """
    compare_test_view(
        disambiguate([make_test_bucket(b) for b in test_case]),
        test_data
    )


def test_slide_1() -> None:
    # Three libraries with the same layout but distinct file names.
    do_disambiguate_test([
        """
        //depot/lib_A/...
        //depot/lib_A/src/A1/A11.c
        //depot/lib_A/src/A1/A12.c
        //depot/lib_A/src/A1/A13.c
        //depot/lib_A/src/A2/A21.c
        //depot/lib_A/src/A3/A31.c
        //depot/lib_A/include/A.h
        """,
        """
        //depot/lib_B/...
        //depot/lib_B/src/B1/B11.c
        //depot/lib_B/src/B1/B12.c
        //depot/lib_B/src/B1/B13.c
        //depot/lib_B/src/B2/B21.c
        //depot/lib_B/src/B3/B31.c
        //depot/lib_B/include/B.h
        """,
        """
        //depot/lib_C/...
        //depot/lib_C/src/C1/C11.c
        //depot/lib_C/src/C1/C12.c
        //depot/lib_C/src/C1/C13.c
        //depot/lib_C/src/C2/C21.c
        //depot/lib_C/src/C3/C31.c
        //depot/lib_C/include/C.h
        """,
    ],
        """
        //depot/lib_A/include/A.h   include/A.h
        //depot/lib_A/src/A1/...    src/A1/...
        //depot/lib_A/src/A2/...    src/A2/...
        //depot/lib_A/src/A3/...    src/A3/...
        //depot/lib_B/include/B.h   include/B.h
        //depot/lib_B/src/B1/...    src/B1/...
        //depot/lib_B/src/B2/...    src/B2/...
        //depot/lib_B/src/B3/...    src/B3/...
        //depot/lib_C/include/C.h   include/C.h
        //depot/lib_C/src/C1/...    src/C1/...
        //depot/lib_C/src/C2/...    src/C2/...
        //depot/lib_C/src/C3/...    src/C3/...
        """
    )


def test_slide_2() -> None:
    # Paths with embedded wildcards, overlaid on one library.
    do_disambiguate_test([
        """
        //depot/Flat/Apple/dev/.../_projects...
        //depot/Flat/Apple/dev/src/A1/_projects/Make.apple
        """,
        """
        //depot/Flat/Linux/dev/.../_projects...
        //depot/Flat/Linux/dev/src/A1/_projects/Make.linux
        """,
        """
        //depot/Flat/Win/dev/.../_projects...
        //depot/Flat/Win/dev/src/A1/_projects/Make.win
        """,
        """
        //depot/lib_A/...
        //depot/lib_A/include/A.c
        //depot/lib_A/src/A1/A11.c
        //depot/lib_A/src/A1/A12.c
        //depot/lib_A/src/A1/A13.c
        //depot/lib_A/src/A2/A2.c
        //depot/lib_A/src/A3/A3.c
        """,
    ],
        """
        //depot/Flat/Apple/dev/src/A1/_projects/Make.apple  src/A1/_projects/Make.apple
        //depot/Flat/Linux/dev/src/A1/_projects/Make.linux  src/A1/_projects/Make.linux
        //depot/Flat/Win/dev/src/A1/_projects/Make.win      src/A1/_projects/Make.win
        //depot/lib_A/include/...                           include/...
        //depot/lib_A/src/A1/*                              src/A1/*
        //depot/lib_A/src/A2/...                            src/A2/...
        //depot/lib_A/src/A3/...                            src/A3/...
        """
    )