"""Three-way playlist merge and sync helpers.

Playlists are handled as lists of path strings.  A ``base`` snapshot, the
``local`` playlist and the ``remote`` (Plex) playlist are merged with
``merge3``; the inputs and results are written as ``.m3u8`` files into a
per-playlist folder so they can be inspected.
"""

import os
import re
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from typing import Literal, Sequence

from app.utils.config import server_config
from app.utils.logger import logger
from app.utils.plex_client import plex_client
from merge3 import Merge3

TEST_PLAYLIST_DIR = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", "test_playlists")
)

# Standard UUID format, with or without hyphens.  Used with ``fullmatch`` so a
# trailing newline cannot slip through the ``$`` anchor.
_UUID_PATTERN = re.compile(
    r'^[a-fA-F0-9]{8}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{12}$'
)


class ConflictResolutionStrategy(str, Enum):
    """How ``_resolve_conflict`` picks a side for a conflicting region."""

    # Always take the local side.
    LOCAL_PRIORITY = "local_priority"
    # Always take the remote side.
    REMOTE_PRIORITY = "remote_priority"
    # If either side is empty the region is dropped, otherwise local wins.
    DELETE_PRIORITY = "delete_priority"
    # Take local if non-empty, else remote if non-empty, else nothing.
    KEEP_PRIORITY = "keep_priority"


class SyncMode(str, Enum):
    """Top-level sync behaviour selected by the caller of ``sync_all_playlists``."""

    # Result is exactly the local playlist.
    LOCAL_FORCE = "local_force"
    # Result is exactly the remote playlist.
    REMOTE_FORCE = "remote_force"
    # Three-way merge, conflicts resolved with LOCAL_PRIORITY.
    MERGE_LOCAL_PRIMARY = "merge_local_primary"
    # Three-way merge, conflicts resolved with REMOTE_PRIORITY.
    MERGE_REMOTE_PRIMARY = "merge_remote_primary"


@dataclass
class PlaylistSyncResult:
    """Outcome of syncing a single playlist."""

    name: str
    merged_paths: list[str]
    conflicts: list[dict]
    action: Literal["synced", "deleted"]
    output_dir: str


@dataclass
class CompiledRegexRules:
    """Holds compiled regex rules for all four processing stages."""

    local_pre: list[tuple[re.Pattern[str], str]]
    local_post: list[tuple[re.Pattern[str], str]]
    remote_pre: list[tuple[re.Pattern[str], str]]
    remote_post: list[tuple[re.Pattern[str], str]]


def load_paths(text: str) -> list[str]:
    """Normalize playlist text into a list of path entries.

    * Strip surrounding whitespace from every line.
    * Remove blank lines.
    * Ignore comment or metadata lines starting with ``#``.
    * Preserve order and allow duplicates.

    Paths are returned as written; they are not resolved or made absolute.
    """
    stripped = (line.strip() for line in text.splitlines())
    return [line for line in stripped if line and not line.startswith("#")]


def playlist_to_text(paths: Sequence[str]) -> str:
    """Convert list of paths into diff3-friendly text (one path per line).

    An empty sequence yields ``""`` so that ``playlist_to_text(x).splitlines()``
    round-trips to ``[]``.  (Returning ``"\\n"`` would round-trip to ``[""]``,
    a truthy list holding a bogus empty path.)
    """
    return "".join(f"{path}\n" for path in paths)


def save_paths(paths: Sequence[str]) -> str:
    """Format playlist paths into m3u/m3u8 text."""
    return "#EXTM3U\n" + "\n".join(paths) + "\n"


def _literal_replacement(text: str) -> str:
    """Escape ``text`` so ``re.sub`` inserts it verbatim as a replacement.

    Backslash is the only character with special meaning in a string
    replacement template, so doubling it is sufficient.
    """
    return text.replace("\\", "\\\\")


def _compile_regex_rules(rules: Sequence[dict[str, str]]) -> list[tuple[re.Pattern[str], str]]:
    """Compile regex rules into pattern/replacement pairs.

    Supports both legacy format (pattern/replacement) and new format
    (search/replace).  Rules with a missing/empty pattern are skipped, and
    rules whose pattern fails to compile are skipped with a warning.
    """
    compiled: list[tuple[re.Pattern[str], str]] = []
    for rule in rules:
        # Support both legacy (pattern/replacement) and new (search/replace)
        # field names; fall back only when the legacy key is absent/None.
        pattern = rule.get("pattern")
        if pattern is None:
            pattern = rule.get("search")
        if not pattern:
            continue

        # For replacement, empty string is a valid value (for deletion).
        replacement = rule.get("replacement")
        if replacement is None:
            replacement = rule.get("replace")
        if replacement is None:
            replacement = ""

        try:
            compiled.append((re.compile(pattern), replacement))
        except re.error as exc:
            logger.warning(f"Skipping invalid regex '{pattern}': {exc}")
    return compiled


def _apply_compiled_rules_to_paths(
    paths: Sequence[str], compiled_rules: Sequence[tuple[re.Pattern[str], str]]
) -> list[str]:
    """Run every compiled rule, in order, over every path and return a new list."""
    if not compiled_rules:
        return list(paths)

    updated: list[str] = []
    for path in paths:
        new_path = path
        for pattern, replacement in compiled_rules:
            new_path = pattern.sub(replacement, new_path)
        updated.append(new_path)
    return updated


def apply_regex_rules_to_paths(
    paths: Sequence[str], rules: Sequence[dict[str, str]]
) -> list[str]:
    """Apply regex replacement rules to each path in order."""
    compiled_rules = _compile_regex_rules(rules)
    return _apply_compiled_rules_to_paths(paths, compiled_rules)


def preprocess_playlist_text(
    text: str,
    rules: Sequence[dict[str, str]],
    compiled_rules: Sequence[tuple[re.Pattern[str], str]] | None = None,
) -> str:
    """Normalize playlist text and apply regex replacements.

    ``compiled_rules`` takes precedence when given (even if empty); otherwise
    ``rules`` is compiled on the fly.
    """
    if not text:
        text = "#EXTM3U\n"
    paths = load_paths(text)
    compiled = compiled_rules if compiled_rules is not None else _compile_regex_rules(rules)
    replaced_paths = _apply_compiled_rules_to_paths(paths, compiled)
    return save_paths(replaced_paths)


@dataclass
class MergeChunk:
    """One region of a three-way merge.

    ``normal`` chunks carry ``text`` and ``origin`` (the merge3 group kind);
    ``conflict`` chunks carry ``base_text``/``local_text``/``remote_text``.
    """

    type: Literal["normal", "conflict"]
    text: str | None = None
    base_text: str | None = None
    local_text: str | None = None
    remote_text: str | None = None
    origin: str | None = None


@dataclass
class MergeResult:
    """Merged path list plus the conflicts encountered while producing it."""

    merged_paths: list[str]
    conflicts: list[dict]


def _ensure_test_dir(folder: str = TEST_PLAYLIST_DIR) -> str:
    """Create ``folder`` (and parents) if needed and return it."""
    os.makedirs(folder, exist_ok=True)
    return folder


def _read_text_if_exists(path: str) -> tuple[str, bool]:
    """Return ``(content, True)`` for a readable file, else ``("", False)``."""
    try:
        with open(path, "r", encoding="utf-8") as file:
            return file.read(), True
    except OSError:
        # Covers FileNotFoundError as well as permission/IO problems.
        return "", False


def _save_playlist_to_folder(filename: str, paths: Sequence[str], folder: str) -> str:
    """Write ``paths`` as m3u text to ``folder/filename`` and return the path.

    The file is left untouched when its content is already up to date.
    """
    _ensure_test_dir(folder)
    file_path = os.path.join(folder, filename)
    logger.info(f"Saving playlist to: {file_path}")

    new_content = save_paths(paths)

    # Check if content has changed before writing to avoid triggering
    # unnecessary file events.
    current_content, readable = _read_text_if_exists(file_path)
    if readable and current_content == new_content:
        return file_path

    with open(file_path, "w", encoding="utf-8") as file:
        file.write(new_content)
    return file_path


def _normalize_inputs(
    base_text: str, local_text: str, remote_text: str, folder: str
) -> tuple[list[str], list[str], list[str]]:
    """Normalize playlist inputs and persist snapshots for inspection."""
    base_paths = load_paths(base_text)
    local_paths = load_paths(local_text)
    remote_paths = load_paths(remote_text)

    _save_playlist_to_folder("base_playlist.m3u8", base_paths, folder)
    _save_playlist_to_folder("local_input.m3u8", local_paths, folder)
    _save_playlist_to_folder("remote_input.m3u8", remote_paths, folder)
    return base_paths, local_paths, remote_paths


def _merge_chunks(
    base_paths: Sequence[str], local_paths: Sequence[str], remote_paths: Sequence[str]
) -> list[MergeChunk]:
    """Run merge3 over the three path lists and convert groups into chunks."""
    merger = Merge3(base_paths, local_paths, remote_paths)

    chunks: list[MergeChunk] = []
    for kind, *payload in merger.merge_groups():
        if kind == "conflict":
            base_lines, local_lines, remote_lines = payload  # type: ignore[misc]

            # Absorb trailing side-only chunks that actually belong to this
            # conflicting region (common in reorder scenarios).
            while chunks and chunks[-1].origin in ("a", "b"):
                previous = chunks.pop()
                previous_lines = previous.text.splitlines() if previous.text else []
                if previous.origin == "a":
                    local_lines = previous_lines + local_lines
                else:
                    remote_lines = previous_lines + remote_lines

            chunks.append(
                MergeChunk(
                    type="conflict",
                    base_text=playlist_to_text(base_lines),
                    local_text=playlist_to_text(local_lines),
                    remote_text=playlist_to_text(remote_lines),
                )
            )
        else:
            lines = payload[0]
            if not lines:
                continue
            chunks.append(
                MergeChunk(type="normal", text=playlist_to_text(lines), origin=kind)
            )
    return chunks


def _write_results(
    merged_lines: Sequence[str], folder: str, compiled_rules: CompiledRegexRules | None = None
) -> None:
    """Write sync results to the test folder.

    If compiled_rules is provided with post-processing rules:
    - local_result.m3u8: merged_lines processed with local_post rules
    - remote_result.m3u8: merged_lines processed with remote_post rules
    - base_next.m3u8: unprocessed merged_lines (normalized sync result)
    """
    # Apply post-processing regex rules if provided.
    if compiled_rules and compiled_rules.local_post:
        local_lines = _apply_compiled_rules_to_paths(merged_lines, compiled_rules.local_post)
    else:
        local_lines = list(merged_lines)

    if compiled_rules and compiled_rules.remote_post:
        remote_lines = _apply_compiled_rules_to_paths(merged_lines, compiled_rules.remote_post)
    else:
        remote_lines = list(merged_lines)

    _save_playlist_to_folder("local_result.m3u8", local_lines, folder)
    _save_playlist_to_folder("remote_result.m3u8", remote_lines, folder)
    _save_playlist_to_folder("base_next.m3u8", merged_lines, folder)


def _write_delete_marker(playlist: str, folder: str) -> str:
    """Write ``delete.txt`` into ``folder`` and return the marker's path."""
    _ensure_test_dir(folder)
    marker_path = os.path.join(folder, "delete.txt")
    with open(marker_path, "w", encoding="utf-8") as file:
        file.write(f"delete playlist {playlist}")
    return marker_path


def _resolve_conflict(conflict: dict, strategy: ConflictResolutionStrategy) -> list[str]:
    """Pick the lines to keep for one conflict according to ``strategy``.

    ``conflict`` must provide ``"local"`` and ``"remote"`` line lists.

    Raises:
        ValueError: if ``strategy`` is not a supported strategy.
    """
    local_lines = conflict["local"]
    remote_lines = conflict["remote"]

    if strategy == ConflictResolutionStrategy.LOCAL_PRIORITY:
        return local_lines
    if strategy == ConflictResolutionStrategy.REMOTE_PRIORITY:
        return remote_lines
    if strategy == ConflictResolutionStrategy.DELETE_PRIORITY:
        # A deletion on either side wins over a modification on the other.
        if not local_lines or not remote_lines:
            return []
        return local_lines
    if strategy == ConflictResolutionStrategy.KEEP_PRIORITY:
        if local_lines:
            return local_lines
        if remote_lines:
            return remote_lines
        return []
    raise ValueError(f"Unsupported conflict resolution strategy: {strategy}")


def _postprocess_local_priority_merge(
    merged_lines: list[str],
    base_paths: Sequence[str],
    local_paths: Sequence[str],
    remote_paths: Sequence[str],
) -> list[str]:
    """Fix local-priority edge cases without disturbing existing order."""
    base_counter = Counter(base_paths)
    local_counter = Counter(local_paths)
    remote_counter = Counter(remote_paths)
    merged_counter = Counter(merged_lines)

    merged = list(merged_lines)

    # 1) Preserve tracks that only Remote added (not in Base or Local).
    for track, remote_count in remote_counter.items():
        if base_counter[track] == 0 and local_counter[track] == 0 and remote_count > 0:
            current = merged_counter.get(track, 0)
            while current < remote_count:
                merged.append(track)
                merged_counter[track] += 1
                current += 1

    # 2) For tracks only new in Local+Remote (not in Base), keep Local's count.
    # Iterate in order of first appearance in Local (rather than over a set)
    # so any appended tracks land in a deterministic order.
    for track in dict.fromkeys(local_paths):
        if base_counter[track] != 0 or remote_counter[track] <= 0:
            continue

        target = local_counter[track]
        current = merged_counter.get(track, 0)

        if current > target:
            # Drop the surplus occurrences, starting from the end.
            remove_needed = current - target
            trimmed: list[str] = []
            for path in reversed(merged):
                if remove_needed and path == track:
                    remove_needed -= 1
                    continue
                trimmed.append(path)
            trimmed.reverse()
            merged = trimmed
            merged_counter = Counter(merged)
            current = merged_counter.get(track, 0)

        while current < target:
            merged.append(track)
            current += 1
        merged_counter[track] = current

    return merged


def _postprocess_remote_priority_merge(
    merged_lines: list[str],
    base_paths: Sequence[str],
    local_paths: Sequence[str],
    remote_paths: Sequence[str],
) -> list[str]:
    """Rebuild the remote-priority result from the three inputs.

    The result is: every Remote track that also exists in Base (in Remote
    order), followed by tracks new relative to Base - Remote's new tracks
    first, then tracks only Local added.  ``merged_lines`` is accepted for
    signature symmetry with the local-priority variant but is not consulted.
    """
    base_counter = Counter(base_paths)
    local_counter = Counter(local_paths)
    remote_counter = Counter(remote_paths)

    remote_base_tracks: list[str] = [
        track for track in remote_paths if base_counter[track] > 0
    ]

    # How many copies of each new (not-in-Base) track to emit: Remote's count
    # when Remote has it, otherwise Local's count.
    target_new_counter: dict[str, int] = {}
    all_tracks = set(local_counter.keys()) | set(remote_counter.keys())
    for track in all_tracks:
        if base_counter[track] > 0:
            continue
        r_cnt = remote_counter.get(track, 0)
        l_cnt = local_counter.get(track, 0)
        if r_cnt > 0:
            target_new_counter[track] = r_cnt
        elif l_cnt > 0:
            target_new_counter[track] = l_cnt

    new_tracks: list[str] = []
    used_counter: Counter[str] = Counter()
    # Remote first so its ordering of new tracks wins, then Local's leftovers.
    for source_paths in (remote_paths, local_paths):
        for track in source_paths:
            if base_counter[track] > 0:
                continue
            if track not in target_new_counter:
                continue
            if used_counter[track] >= target_new_counter[track]:
                continue
            new_tracks.append(track)
            used_counter[track] += 1

    return remote_base_tracks + new_tracks


def merge_playlists(
    base_text: str,
    local_text: str,
    remote_text: str,
    strategy: ConflictResolutionStrategy = ConflictResolutionStrategy.LOCAL_PRIORITY,
    test_folder: str = TEST_PLAYLIST_DIR,
    compiled_rules: CompiledRegexRules | None = None,
) -> MergeResult:
    """Merge playlists using diff3 and resolve conflicts per strategy.

    The base, local, and remote normalized playlists are saved into
    ``test_folder`` for inspection. The merged playlist is also stored twice
    to simulate the versions intended for local save and cloud upload.

    If compiled_rules is provided, post-processing regex rules will be applied
    to the results before writing.

    Raises:
        ValueError: if a conflict occurs and ``strategy`` is unsupported.
    """
    base_paths, local_paths, remote_paths = _normalize_inputs(
        base_text, local_text, remote_text, test_folder
    )

    chunks = _merge_chunks(base_paths, local_paths, remote_paths)

    merged_lines: list[str] = []
    conflicts: list[dict] = []

    for chunk in chunks:
        if chunk.type == "normal":
            merged_lines.extend(chunk.text.splitlines())
        else:
            conflict_info = {
                "base": chunk.base_text.splitlines(),
                "local": chunk.local_text.splitlines(),
                "remote": chunk.remote_text.splitlines(),
            }
            conflicts.append(conflict_info)
            merged_lines.extend(_resolve_conflict(conflict_info, strategy))

    if strategy == ConflictResolutionStrategy.LOCAL_PRIORITY:
        merged_lines = _postprocess_local_priority_merge(
            merged_lines, base_paths, local_paths, remote_paths
        )
    if strategy == ConflictResolutionStrategy.REMOTE_PRIORITY:
        merged_lines = _postprocess_remote_priority_merge(
            merged_lines, base_paths, local_paths, remote_paths
        )

    _write_results(merged_lines, test_folder, compiled_rules)
    return MergeResult(merged_paths=merged_lines, conflicts=conflicts)


def _load_local_playlists(local_dir: str) -> dict[str, str]:
    """Read all local playlists under ``local_dir`` keyed by stem name.

    Only direct children ending in ``.m3u``/``.m3u8`` (case-insensitive) are
    read.  A playlist that cannot be read is mapped to ``""``.
    """
    playlists: dict[str, str] = {}
    if not local_dir:
        return playlists

    absolute = os.path.abspath(local_dir)
    if not os.path.isdir(absolute):
        return playlists

    with os.scandir(absolute) as entries:
        for entry in entries:
            if not (entry.is_file() and entry.name.lower().endswith((".m3u", ".m3u8"))):
                continue
            stem, _ = os.path.splitext(entry.name)
            try:
                with open(entry.path, "r", encoding="utf-8") as file:
                    playlists[stem] = file.read()
            except OSError as exc:
                logger.warning(f"Failed to read local playlist '{entry.path}': {exc}")
                playlists[stem] = ""
    return playlists


def _load_playlist_snapshots(playlist: str, folder: str) -> tuple[str, str, str, bool, bool]:
    """Load the base and remote snapshot texts for a playlist.

    Returns ``(base_text, remote_text, playlist_folder, remote_exists,
    base_exists)``.  ``base_text`` comes from ``base_next.m3u8``, falling back
    to ``base_playlist.m3u8`` when the former is missing or empty;
    ``base_exists`` reflects ``base_next.m3u8`` only.
    """
    playlist_folder = os.path.join(folder, playlist)
    base_text, base_exists = _read_text_if_exists(
        os.path.join(playlist_folder, "base_next.m3u8")
    )
    if not base_text:
        base_text, _ = _read_text_if_exists(
            os.path.join(playlist_folder, "base_playlist.m3u8")
        )
    remote_text, remote_exists = _read_text_if_exists(
        os.path.join(playlist_folder, "remote_input.m3u8")
    )
    return base_text, remote_text, playlist_folder, remote_exists, base_exists


def _fetch_remote_playlists() -> dict[str, str]:
    """Retrieve remote playlists from Plex when connected.

    Returns a mapping of playlist name to serialized playlist text. Failures to
    connect or fetch playlists are logged and result in an empty mapping to
    avoid blocking local testing.
    """
    server_config.load()
    if not server_config.url:
        return {}

    try:
        plex_client.connect(
            token=server_config.token,
            scheme=server_config.scheme,
            url=server_config.url,
            port=server_config.port,
        )
    except Exception as exc:  # pragma: no cover - network access
        logger.warning(f"Failed to connect to Plex for remote playlists: {exc}")
        return {}

    playlists: dict[str, str] = {}
    try:
        for playlist in plex_client.server.playlists():
            paths: list[str] = []
            try:
                for track in playlist.items():
                    locations = getattr(track, "locations", None) or []
                    if locations:
                        # Only the first location of each track is used.
                        paths.append(locations[0])
            except Exception as exc:  # pragma: no cover - plex runtime
                logger.warning(f"Failed to read playlist '{playlist.title}': {exc}")
            # NOTE(review): a playlist whose items failed to load is still
            # stored with whatever was read so far (possibly nothing), so
            # callers see it as present-but-short - confirm this is intended.
            playlists[playlist.title] = save_paths(paths)
    except Exception as exc:  # pragma: no cover - plex runtime
        logger.warning(f"Failed to enumerate remote playlists: {exc}")
        return {}

    return playlists


def _sync_single_playlist(
    playlist: str,
    mode: SyncMode,
    local_text: str | None,
    base_text: str,
    remote_text: str,
    playlist_folder: str,
    remote_present: bool,
    compiled_rules: CompiledRegexRules | None = None,
) -> PlaylistSyncResult:
    """Sync one playlist according to ``mode`` and write results to its folder.

    ``local_text is None`` means the local playlist does not exist; whether the
    remote exists is passed separately as ``remote_present``.

    Raises:
        ValueError: if ``mode`` is not a supported ``SyncMode``.
    """
    local_present = local_text is not None
    local_text = local_text or ""

    if not base_text:
        base_text = "#EXTM3U\n"
    if not remote_text:
        remote_text = "#EXTM3U\n"

    if mode == SyncMode.LOCAL_FORCE:
        if not local_present:
            _write_delete_marker(playlist, playlist_folder)
            return PlaylistSyncResult(playlist, [], [], "deleted", playlist_folder)
        _, local_paths, _ = _normalize_inputs(
            base_text, local_text, remote_text, playlist_folder
        )
        merged_lines = list(local_paths)
        _write_results(merged_lines, playlist_folder, compiled_rules)
        return PlaylistSyncResult(playlist, merged_lines, [], "synced", playlist_folder)

    if mode == SyncMode.REMOTE_FORCE:
        if not remote_present:
            _write_delete_marker(playlist, playlist_folder)
            return PlaylistSyncResult(playlist, [], [], "deleted", playlist_folder)
        _, _, remote_paths = _normalize_inputs(
            base_text, local_text, remote_text, playlist_folder
        )
        merged_lines = list(remote_paths)
        _write_results(merged_lines, playlist_folder, compiled_rules)
        return PlaylistSyncResult(playlist, merged_lines, [], "synced", playlist_folder)

    if mode not in (SyncMode.MERGE_LOCAL_PRIMARY, SyncMode.MERGE_REMOTE_PRIMARY):
        raise ValueError(f"Unsupported sync mode: {mode}")

    merge_strategy = (
        ConflictResolutionStrategy.LOCAL_PRIORITY
        if mode == SyncMode.MERGE_LOCAL_PRIMARY
        else ConflictResolutionStrategy.REMOTE_PRIORITY
    )

    merge_result = merge_playlists(
        base_text=base_text,
        local_text=local_text,
        remote_text=remote_text,
        strategy=merge_strategy,
        test_folder=playlist_folder,
        compiled_rules=compiled_rules,
    )

    # An empty merge with one side missing means the playlist was deleted.
    if not merge_result.merged_paths and (not local_present or not remote_present):
        _write_delete_marker(playlist, playlist_folder)
        return PlaylistSyncResult(
            playlist, merge_result.merged_paths, merge_result.conflicts, "deleted", playlist_folder
        )

    return PlaylistSyncResult(
        playlist, merge_result.merged_paths, merge_result.conflicts, "synced", playlist_folder
    )


def _compile_path_mapping_rules(path_mapping: dict) -> CompiledRegexRules:
    """Compile regex rules from path_mapping config for all four processing stages."""
    regex_config = path_mapping.get("regex", {})

    return CompiledRegexRules(
        local_pre=_compile_regex_rules(regex_config.get("local_pre", [])),
        local_post=_compile_regex_rules(regex_config.get("local_post", [])),
        remote_pre=_compile_regex_rules(regex_config.get("remote_pre", [])),
        remote_post=_compile_regex_rules(regex_config.get("remote_post", [])),
    )


def _compile_simple_mapping_rules(simple_mappings: list[dict]) -> CompiledRegexRules:
    """Compile simple mapping pairs into four rule groups using UUID-based mapping_ids.

    Each simple mapping has:
    - id: UUID used as the mapping_id (unique identifier to prevent conflicts)
    - search: Local path prefix
    - replace: Cloud path prefix

    This generates four rule sets:
    - local_pre: Replace local path (search) with mapping_id
    - remote_pre: Replace cloud path (replace) with mapping_id
    - local_post: Replace mapping_id with local path (search)
    - remote_post: Replace mapping_id with cloud path (replace)

    The mapping_id is wrapped with special markers to prevent conflicts with
    actual paths.  Paths are treated as literal text on both the pattern side
    (``re.escape``) and the replacement side (backslashes escaped), so e.g.
    Windows paths containing ``\\`` survive the round trip unchanged.
    Mappings with an invalid id or a missing path are skipped with a warning.
    """
    local_pre_rules: list[dict[str, str]] = []
    local_post_rules: list[dict[str, str]] = []
    remote_pre_rules: list[dict[str, str]] = []
    remote_post_rules: list[dict[str, str]] = []

    for mapping in simple_mappings:
        mapping_id = mapping.get("id")
        local_path = mapping.get("search", "")  # Local path is stored in 'search' field
        cloud_path = mapping.get("replace", "")  # Cloud path is stored in 'replace' field

        # Validate mapping_id is a proper UUID to prevent injection attacks.
        if not mapping_id or not isinstance(mapping_id, str):
            logger.warning(f"Skipping mapping with missing or invalid id: {mapping}")
            continue
        if not _UUID_PATTERN.fullmatch(mapping_id):
            logger.warning(f"Skipping mapping with non-UUID id format: {mapping_id}")
            continue

        # Paths must be non-empty strings.
        if not local_path or not isinstance(local_path, str):
            logger.warning(f"Skipping mapping with missing local path: {mapping}")
            continue
        if not cloud_path or not isinstance(cloud_path, str):
            logger.warning(f"Skipping mapping with missing cloud path: {mapping}")
            continue

        # Create a unique placeholder using the validated UUID.
        # Using special markers to prevent conflicts with actual paths.
        placeholder = f"__MAPPING__{mapping_id}__"

        # Pre-processing rules (use re.escape to treat paths as literal strings).
        local_pre_rules.append({
            "pattern": re.escape(local_path),
            "replacement": _literal_replacement(placeholder),
        })
        remote_pre_rules.append({
            "pattern": re.escape(cloud_path),
            "replacement": _literal_replacement(placeholder),
        })

        # Post-processing rules.  The replacement is a regex template, so the
        # path must be escaped or sequences like "\M" / "\n" would raise or
        # be rewritten instead of being inserted verbatim.
        local_post_rules.append({
            "pattern": re.escape(placeholder),
            "replacement": _literal_replacement(local_path),
        })
        remote_post_rules.append({
            "pattern": re.escape(placeholder),
            "replacement": _literal_replacement(cloud_path),
        })

    logger.info(f"Compiled {len(local_pre_rules)} simple mapping pairs into rules")

    return CompiledRegexRules(
        local_pre=_compile_regex_rules(local_pre_rules),
        local_post=_compile_regex_rules(local_post_rules),
        remote_pre=_compile_regex_rules(remote_pre_rules),
        remote_post=_compile_regex_rules(remote_post_rules),
    )


def sync_all_playlists(
    local_dir: str, mode: SyncMode, test_folder: str = TEST_PLAYLIST_DIR
) -> list[PlaylistSyncResult]:
    """Synchronize all playlists that can be matched by name.

    Playlist names are the union of local playlist stems, remote playlist
    titles and existing sub-folders of ``test_folder``; they are processed in
    sorted order.

    Path mapping modes:
    - SIMPLE: Uses UUID-based mapping_ids to convert between local and cloud paths
      - local_pre: local_path -> mapping_id
      - remote_pre: cloud_path -> mapping_id
      - local_post: mapping_id -> local_path
      - remote_post: mapping_id -> cloud_path
    - REGEX: Uses custom regex rules for each processing stage
      - local_pre, local_post, remote_pre, remote_post rules are applied directly
    - any other value: falls back to the legacy ``server_config.path_rules``

    Processing flow:
    1. local_pre rules are applied to local playlists before sync
    2. remote_pre rules are applied to remote playlists before sync
    3. Sync/merge is performed
    4. local_post rules are applied to results before writing to local_result.m3u8
    5. remote_post rules are applied to results before writing to remote_result.m3u8
    """
    server_config.load()

    # Get path_mapping configuration.
    path_mapping = server_config.path_mapping
    mapping_mode = path_mapping.get("mode", "SIMPLE")

    # Compile rules based on the mode.
    compiled_rules: CompiledRegexRules | None = None
    legacy_compiled_rules: list[tuple[re.Pattern[str], str]] = []

    if mapping_mode == "REGEX":
        compiled_rules = _compile_path_mapping_rules(path_mapping)
        logger.info("Using REGEX mode for path mapping with 4 rule groups")
    elif mapping_mode == "SIMPLE":
        simple_mappings = path_mapping.get("simple", [])
        if simple_mappings:
            compiled_rules = _compile_simple_mapping_rules(simple_mappings)
            logger.info(f"Using SIMPLE mode for path mapping with {len(simple_mappings)} mapping pairs")
        else:
            logger.info("SIMPLE mode with no mappings - no path transformations will be applied")
    else:
        # Use legacy path_rules for backward compatibility.
        legacy_compiled_rules = _compile_regex_rules(server_config.path_rules)
        logger.info("Using legacy path_rules for preprocessing")

    _ensure_test_dir(test_folder)
    logger.info(f"Syncing playlists to test folder: {test_folder}")
    local_playlists = _load_local_playlists(local_dir)
    remote_playlists = _fetch_remote_playlists()

    playlist_names: set[str] = set(local_playlists.keys())
    playlist_names.update(remote_playlists.keys())

    with os.scandir(test_folder) as entries:
        for entry in entries:
            if entry.is_dir():
                playlist_names.add(entry.name)

    results: list[PlaylistSyncResult] = []
    for playlist in sorted(playlist_names):
        base_text, snapshot_remote_text, playlist_folder, remote_exists, _ = _load_playlist_snapshots(
            playlist, test_folder
        )

        local_text = local_playlists.get(playlist)
        remote_text = remote_playlists.get(playlist)

        if remote_text:
            remote_present = True
        else:
            # No live remote text: fall back to the stored snapshot, and count
            # the remote as present if that snapshot exists or has content.
            remote_text = snapshot_remote_text
            remote_present = bool(remote_text.strip()) or remote_exists

        if compiled_rules:
            # Apply pre-processing rules for REGEX or SIMPLE mode.
            # base_text doesn't need pre-processing as it's the normalized state.
            if local_text is not None and compiled_rules.local_pre:
                local_text = preprocess_playlist_text(
                    local_text, [], compiled_rules.local_pre
                )
            if remote_text and compiled_rules.remote_pre:
                remote_text = preprocess_playlist_text(
                    remote_text, [], compiled_rules.remote_pre
                )
        elif legacy_compiled_rules:
            # Use legacy preprocessing for all texts.
            base_text = preprocess_playlist_text(
                base_text, server_config.path_rules, legacy_compiled_rules
            )
            remote_text = preprocess_playlist_text(
                remote_text, server_config.path_rules, legacy_compiled_rules
            )
            if local_text is not None:
                local_text = preprocess_playlist_text(
                    local_text, server_config.path_rules, legacy_compiled_rules
                )

        result = _sync_single_playlist(
            playlist=playlist,
            mode=mode,
            local_text=local_text,
            base_text=base_text,
            remote_text=remote_text,
            playlist_folder=playlist_folder,
            remote_present=remote_present,
            compiled_rules=compiled_rules,
        )
        results.append(result)

    return results