#!/usr/bin/env python3
"""
Remux Sony XDCAM-style separate-essence MXF folders: one *V##.MXF (MPEG-2 HD422)
plus *A##.MXF mono PCM tracks, into a single MXF with stream copy (no re-encode).

Ignores substream (*S##.MXF), proxy/metadata, XML/SMI, etc.

Run on a clip folder, or on a parent folder to process every subdirectory that
contains a full V+A essence set (unless --output is given, the output is written
next to the split essence files).
"""

from __future__ import annotations

import argparse
import os
import re
import shlex
import subprocess
import sys
from collections.abc import Iterator
from pathlib import Path

# Track index: one to three digits (V1 / V01 / V001)
VIDEO_RE = re.compile(r"^(.+?)V(\d{1,3})\.mxf$", re.IGNORECASE)
AUDIO_RE = re.compile(r"^(.+?)A(\d{1,3})\.mxf$", re.IGNORECASE)

# (clip prefix, video essence path, [(audio track index, audio essence path), ...])
ClipParts = tuple[str, Path, list[tuple[int, Path]]]

# Maximum number of directories listed in detail by scan_tree().
_SCAN_LIST_LIMIT = 200


def discover(folder: Path) -> tuple[list[tuple[str, int, Path]], list[tuple[str, int, Path]]]:
    """Classify the MXF files directly inside *folder* as video or audio essence.

    Returns ``(videos, audios)``; each entry is ``(prefix, track_index, path)``,
    in sorted path order. Files matching neither pattern are ignored.
    """
    videos: list[tuple[str, int, Path]] = []
    audios: list[tuple[str, int, Path]] = []
    for p in sorted(folder.glob("*.[Mm][Xx][Ff]")):
        # pathlib's "*" also matches dotfiles, so hidden sidecars such as the
        # macOS AppleDouble "._C0006V01.MXF" would otherwise look like a second
        # clip prefix. Directories named like essence files are not essence.
        if p.name.startswith(".") or not p.is_file():
            continue
        vm = VIDEO_RE.match(p.name)
        if vm:
            videos.append((vm.group(1), int(vm.group(2)), p))
            continue
        am = AUDIO_RE.match(p.name)
        if am:
            audios.append((am.group(1), int(am.group(2)), p))
    return videos, audios


def pick_clip_videos(videos: list[tuple[str, int, Path]]) -> tuple[str, Path]:
    """Return ``(prefix, video_path)`` for the single video essence in *videos*.

    Raises:
        ValueError: if *videos* is empty, spans more than one clip prefix, or
            holds more than one video file for the prefix.
    """
    if not videos:
        raise ValueError("No video essence file matching *V##.MXF (e.g. C0006V01.MXF).")
    by_prefix: dict[str, list[tuple[int, Path]]] = {}
    for prefix, idx, path in videos:
        by_prefix.setdefault(prefix, []).append((idx, path))
    if len(by_prefix) > 1:
        raise ValueError(
            "Multiple clip prefixes with video essence: "
            + ", ".join(sorted(by_prefix))
            + ". Put one clip per folder or process folders separately."
        )
    prefix = next(iter(by_prefix))
    items = sorted(by_prefix[prefix], key=lambda x: x[0])
    if len(items) > 1:
        raise ValueError(
            f"Multiple video essence files for prefix {prefix!r}: "
            + ", ".join(p.name for _, p in items)
            + ". Expected a single V track per remux."
        )
    return prefix, items[0][1]


def _audios_for_prefix(
    audios: list[tuple[str, int, Path]], prefix: str
) -> list[tuple[int, Path]]:
    """Return the ``(index, path)`` audio entries of *prefix*, ordered by track index."""
    return sorted(
        ((idx, p) for pfx, idx, p in audios if pfx == prefix),
        key=lambda item: item[0],
    )


def resolve_clip_required(folder: Path) -> ClipParts:
    """Resolve the clip in *folder*, exiting the program with a message on failure."""
    videos, audios = discover(folder)
    if not videos:
        sys.exit(f"No video essence file (*V##.MXF) in {folder}")
    try:
        prefix, video_path = pick_clip_videos(videos)
    except ValueError as e:
        sys.exit(f"{folder}: {e}")
    clip_audios = _audios_for_prefix(audios, prefix)
    if not clip_audios:
        sys.exit(f"No audio essence *A##.MXF for clip prefix {prefix!r} in {folder}.")
    return prefix, video_path, clip_audios


def try_resolve_clip(folder: Path) -> ClipParts | None:
    """Resolve the clip in *folder*, or return ``None`` if it is not a usable clip folder.

    A folder without any video essence is skipped silently; an ambiguous or
    audio-less folder is reported on stderr before being skipped.
    """
    videos, audios = discover(folder)
    if not videos:
        return None
    try:
        prefix, video_path = pick_clip_videos(videos)
    except ValueError as e:
        print(f"Skipping {folder}: {e}", file=sys.stderr)
        return None
    clip_audios = _audios_for_prefix(audios, prefix)
    if not clip_audios:
        print(
            f"Skipping {folder}: no audio essence *A##.MXF for prefix {prefix!r}.",
            file=sys.stderr,
        )
        return None
    return prefix, video_path, clip_audios


def _walk_dirs(root: Path, *, follow_symlinks: bool) -> Iterator[tuple[str, list[str]]]:
    """Yield ``(dirpath, filenames)`` under *root*, visiting each real directory once.

    os.walk does not remember visited directories, so with symlinks followed a
    link cycle (or two links to one folder) would yield the same directory many
    times. Already-seen real paths are pruned instead.
    """
    seen: set[str] = set()
    for dirpath, dirnames, filenames in os.walk(root, followlinks=follow_symlinks):
        if follow_symlinks:
            real = os.path.realpath(dirpath)
            if real in seen:
                dirnames[:] = []  # in-place edit stops os.walk from descending
                continue
            seen.add(real)
        yield dirpath, filenames


def iter_clip_folders(root: Path, *, follow_symlinks: bool) -> list[tuple[Path, ClipParts]]:
    """Return ``(folder, parts)`` for every usable clip folder under *root*, sorted by path."""
    found: list[tuple[Path, ClipParts]] = []
    for dirpath, _ in _walk_dirs(root, follow_symlinks=follow_symlinks):
        folder = Path(dirpath)
        parts = try_resolve_clip(folder)
        if parts is not None:
            found.append((folder, parts))
    return sorted(found, key=lambda x: str(x[0]))


def scan_tree(root: Path, *, follow_symlinks: bool) -> None:
    """Print what is on disk: helps debug 'no clip folders found' (naming, symlinks, layout)."""
    print(f"Scanning: {root}", file=sys.stderr)
    print(f"follow_symlinks={follow_symlinks}", file=sys.stderr)
    dirs_seen = 0
    dirs_with_mxf: list[tuple[str, list[str]]] = []
    for dirpath, filenames in _walk_dirs(root, follow_symlinks=follow_symlinks):
        dirs_seen += 1
        mxf_files = sorted(f for f in filenames if f.lower().endswith(".mxf"))
        if mxf_files:
            dirs_with_mxf.append((dirpath, mxf_files))
    print(f"Directories visited: {dirs_seen}", file=sys.stderr)
    print(f"Directories containing .mxf: {len(dirs_with_mxf)}", file=sys.stderr)
    for d, names in dirs_with_mxf[:_SCAN_LIST_LIMIT]:
        # Count only names discover() would accept (hidden sidecars are ignored there).
        usable = [n for n in names if not n.startswith(".")]
        v_ok = sum(1 for n in usable if VIDEO_RE.match(n))
        a_ok = sum(1 for n in usable if AUDIO_RE.match(n))
        print(f" {d}", file=sys.stderr)
        print(f" files={len(names)} names_like_*V*#={v_ok} *A*#={a_ok}", file=sys.stderr)
        if v_ok == 0 or a_ok == 0:
            print(f" sample: {names[:8]}{'…' if len(names) > 8 else ''}", file=sys.stderr)
    if len(dirs_with_mxf) > _SCAN_LIST_LIMIT:
        print(
            f" … {len(dirs_with_mxf) - _SCAN_LIST_LIMIT} more directories with .mxf not shown",
            file=sys.stderr,
        )


def build_cmd(parts: ClipParts, out: Path, *, overwrite: bool) -> list[str]:
    """Build the ffmpeg argument list that stream-copies the clip's essence into *out*.

    Input 0 is the video file; inputs 1..N are the audio files in track order.
    """
    _, video_path, clip_audios = parts
    inputs: list[Path] = [video_path, *(p for _, p in clip_audios)]
    cmd: list[str] = ["ffmpeg", "-hide_banner", "-y" if overwrite else "-n"]
    for p in inputs:
        cmd += ["-i", str(p)]
    cmd += ["-map", "0:v:0"]
    for i in range(1, len(inputs)):
        cmd += ["-map", f"{i}:a:0"]
    cmd += ["-c", "copy", "-shortest", "-f", "mxf", str(out)]
    return cmd


def _format_cmd(cmd: list[str]) -> str:
    """Render *cmd* as one copy-pasteable shell line for the current platform."""
    if os.name == "nt":
        return subprocess.list2cmdline(cmd)
    # POSIX shells need single-quote style escaping ($, backticks, quotes in paths).
    return shlex.join(cmd)


def run_remux(
    folder: Path,
    parts: ClipParts,
    output: Path | None,
    dry_run: bool,
    *,
    overwrite: bool,
) -> int:
    """Remux one clip and return a process-style exit code.

    Writes to *output*, or to ``<folder>/<prefix>_AV.mxf`` when *output* is None.
    Returns 0 on success, on dry run, and when an existing output is skipped;
    127 if ffmpeg cannot be found; otherwise ffmpeg's own return code.
    """
    prefix, video_path, clip_audios = parts
    out = output if output is not None else folder / f"{prefix}_AV.mxf"
    print("Clip prefix:", prefix)
    print("Folder:", folder)
    print("Video:", video_path.name)
    print("Audio tracks (channel order):", ", ".join(f"A{a:02d}" for a, _ in clip_audios))
    print("Output:", out)
    if not dry_run and out.exists() and not overwrite:
        print()
        print(f"Skipping — output already exists: {out}", file=sys.stderr)
        print("Use --overwrite to replace it.", file=sys.stderr)
        return 0
    cmd = build_cmd(parts, out, overwrite=overwrite)
    if dry_run:
        print()
        print(_format_cmd(cmd))
        return 0
    try:
        completed = subprocess.run(cmd)
    except FileNotFoundError:
        # A missing binary is a setup problem, not a crash: report it cleanly.
        print("ffmpeg not found on PATH; install ffmpeg and try again.", file=sys.stderr)
        return 127
    return completed.returncode


def main() -> None:
    """Parse the command line and remux one clip folder or every clip folder under a root."""
    ap = argparse.ArgumentParser(
        description=(
            "Remux XDCAM folder (*V## + *A## MXF) to one MXF per clip, codecs copied. "
            "Give a clip folder, or a parent folder to scan recursively for clip folders."
        ),
    )
    ap.add_argument(
        "folder",
        type=Path,
        help="Clip folder with essence MXF files, or a parent folder to scan",
    )
    ap.add_argument(
        "-o",
        "--output",
        type=Path,
        help=(
            "Output MXF path (only when FOLDER is a single clip folder; "
            "not used when scanning subfolders)"
        ),
    )
    ap.add_argument(
        "-n",
        "--dry-run",
        action="store_true",
        help="Print ffmpeg command(s) and exit",
    )
    ap.add_argument(
        "--overwrite",
        action="store_true",
        help="Replace existing output MXF (default: skip if *_AV.mxf already exists)",
    )
    ap.add_argument(
        "--no-follow-symlinks",
        action="store_true",
        help=(
            "Do not walk into symlinked directories. By default symlinks are followed "
            "(needed on some volumes where clip folders point at the real media)."
        ),
    )
    ap.add_argument(
        "--scan",
        action="store_true",
        help="List directories that contain .mxf and how many match *V* / *A* patterns; then exit",
    )
    args = ap.parse_args()
    root = args.folder.expanduser().resolve()
    if not root.is_dir():
        sys.exit(f"Not a directory: {root}")
    follow_symlinks = not args.no_follow_symlinks

    if args.scan:
        scan_tree(root, follow_symlinks=follow_symlinks)
        raise SystemExit(0)

    # An explicit output path only makes sense for exactly one clip folder.
    if args.output is not None:
        parts = resolve_clip_required(root)
        raise SystemExit(
            run_remux(
                root,
                parts,
                args.output.expanduser().resolve(),
                args.dry_run,
                overwrite=args.overwrite,
            )
        )

    items = iter_clip_folders(root, follow_symlinks=follow_symlinks)
    if not items:
        sys.exit(
            f"No clip folders found under {root} (need *V#.MXF + matching *A#.MXF in the same folder).\n"
            f"Re-run with --scan on that path to list .mxf locations and naming; "
            f"by default symlinked folders are followed (avoid --no-follow-symlinks unless needed)."
        )

    exit_code = 0
    for i, (clip_dir, parts) in enumerate(items):
        if len(items) > 1:
            print()
            print("=" * 72)
            print(f"[{i + 1}/{len(items)}] {clip_dir}")
            print("=" * 72)
        rc = run_remux(clip_dir, parts, None, args.dry_run, overwrite=args.overwrite)
        if rc != 0:
            exit_code = rc  # keep going; report the last failure at exit
    raise SystemExit(exit_code)


if __name__ == "__main__":
    main()