import os import re from collections import Counter from dataclasses import dataclass from enum import Enum from typing import Literal, Sequence from app.utils.config import server_config from app.utils.logger import logger from app.utils.plex_client import plex_client from merge3 import Merge3 TEST_PLAYLIST_DIR = os.path.abspath( os.path.join(os.path.dirname(__file__), "..", "test_playlists") ) class ConflictResolutionStrategy(str, Enum): LOCAL_PRIORITY = "local_priority" REMOTE_PRIORITY = "remote_priority" DELETE_PRIORITY = "delete_priority" KEEP_PRIORITY = "keep_priority" class SyncMode(str, Enum): LOCAL_FORCE = "local_force" REMOTE_FORCE = "remote_force" MERGE_LOCAL_PRIMARY = "merge_local_primary" MERGE_REMOTE_PRIMARY = "merge_remote_primary" @dataclass class PlaylistSyncResult: name: str merged_paths: list[str] conflicts: list[dict] action: Literal["synced", "deleted"] output_dir: str def load_paths(text: str) -> list[str]: """Normalize playlist text into a list of absolute paths. * Remove blank lines. * Ignore comment or metadata lines starting with ``#``. * Preserve order and allow duplicates. """ output: list[str] = [] for line in text.splitlines(): line = line.strip() if not line: continue if line.startswith("#"): continue output.append(line) return output def playlist_to_text(paths: Sequence[str]) -> str: """Convert list of paths into diff3-friendly text.""" return "\n".join(paths) + "\n" def save_paths(paths: Sequence[str]) -> str: """Format playlist paths into m3u/m3u8 text.""" return "#EXTM3U\n" + "\n".join(paths) + "\n" def _compile_regex_rules(rules: Sequence[dict[str, str]]) -> list[tuple[re.Pattern[str], str]]: compiled: list[tuple[re.Pattern[str], str]] = [] for rule in rules: pattern = rule.get("pattern") if not pattern: continue replacement = rule.get("replacement", "") try: compiled.append((re.compile(pattern), replacement)) except re.error as exc: logger.warning(f"Skipping invalid regex '{pattern}': {exc}") return compiled def _apply_compiled_rules_to_paths( paths: Sequence[str], compiled_rules: Sequence[tuple[re.Pattern[str], str]] ) -> list[str]: if not compiled_rules: return list(paths) updated: list[str] = [] for path in paths: new_path = path for pattern, replacement in compiled_rules: new_path = pattern.sub(replacement, new_path) updated.append(new_path) return updated def apply_regex_rules_to_paths( paths: Sequence[str], rules: Sequence[dict[str, str]] ) -> list[str]: """Apply regex replacement rules to each path in order.""" compiled_rules = _compile_regex_rules(rules) return _apply_compiled_rules_to_paths(paths, compiled_rules) def preprocess_playlist_text( text: str, rules: Sequence[dict[str, str]], compiled_rules: Sequence[tuple[re.Pattern[str], str]] | None = None, ) -> str: """Normalize playlist text and apply regex replacements.""" if not text: text = "#EXTM3U\n" paths = load_paths(text) compiled = compiled_rules if compiled_rules is not None else _compile_regex_rules(rules) replaced_paths = _apply_compiled_rules_to_paths(paths, compiled) return save_paths(replaced_paths) @dataclass class MergeChunk: type: Literal["normal", "conflict"] text: str | None = None base_text: str | None = None local_text: str | None = None remote_text: str | None = None origin: str | None = None @dataclass class MergeResult: merged_paths: list[str] conflicts: list[dict] def _ensure_test_dir(folder: str = TEST_PLAYLIST_DIR) -> str: os.makedirs(folder, exist_ok=True) return folder def _read_text_if_exists(path: str) -> tuple[str, bool]: try: with open(path, "r", encoding="utf-8") as file: return file.read(), True except FileNotFoundError: return "", False except OSError: return "", False def _save_playlist_to_folder(filename: str, paths: Sequence[str], folder: str) -> str: _ensure_test_dir(folder) file_path = os.path.join(folder, filename) with open(file_path, "w", encoding="utf-8") as file: file.write(save_paths(paths)) return file_path def _normalize_inputs( base_text: str, local_text: str, remote_text: str, folder: str ) -> tuple[list[str], list[str], list[str]]: """Normalize playlist inputs and persist snapshots for inspection.""" base_paths = load_paths(base_text) local_paths = load_paths(local_text) remote_paths = load_paths(remote_text) _save_playlist_to_folder("base_playlist.m3u8", base_paths, folder) _save_playlist_to_folder("local_input.m3u8", local_paths, folder) _save_playlist_to_folder("remote_input.m3u8", remote_paths, folder) return base_paths, local_paths, remote_paths def _merge_chunks( base_paths: Sequence[str], local_paths: Sequence[str], remote_paths: Sequence[str] ) -> list[MergeChunk]: merger = Merge3(base_paths, local_paths, remote_paths) raw_chunks = list(merger.merge_groups()) chunks: list[MergeChunk] = [] for kind, *payload in raw_chunks: if kind == "conflict": base_lines, local_lines, remote_lines = payload # type: ignore[misc] # Absorb trailing side-only chunks that actually belong to this # conflicting region (common in reorder scenarios). while chunks and chunks[-1].origin in ("a", "b"): previous = chunks.pop() previous_lines = previous.text.splitlines() if previous.text else [] if previous.origin == "a": local_lines = previous_lines + local_lines else: remote_lines = previous_lines + remote_lines chunks.append( MergeChunk( type="conflict", base_text=playlist_to_text(base_lines), local_text=playlist_to_text(local_lines), remote_text=playlist_to_text(remote_lines), ) ) else: lines = payload[0] if not lines: continue chunk = MergeChunk(type="normal", text=playlist_to_text(lines), origin=kind) chunks.append(chunk) return chunks def _write_results(merged_lines: Sequence[str], folder: str) -> None: _save_playlist_to_folder("local_result.m3u8", merged_lines, folder) _save_playlist_to_folder("remote_result.m3u8", merged_lines, folder) _save_playlist_to_folder("base_next.m3u8", merged_lines, folder) def _write_delete_marker(playlist: str, folder: str) -> str: _ensure_test_dir(folder) marker_path = os.path.join(folder, "delete.txt") with open(marker_path, "w", encoding="utf-8") as file: file.write(f"delete playlist {playlist}") return marker_path def _resolve_conflict(conflict: dict, strategy: ConflictResolutionStrategy) -> list[str]: local_lines = conflict["local"] remote_lines = conflict["remote"] if strategy == ConflictResolutionStrategy.LOCAL_PRIORITY: return local_lines if strategy == ConflictResolutionStrategy.REMOTE_PRIORITY: return remote_lines if strategy == ConflictResolutionStrategy.DELETE_PRIORITY: if not local_lines or not remote_lines: return [] return local_lines if strategy == ConflictResolutionStrategy.KEEP_PRIORITY: if local_lines: return local_lines if remote_lines: return remote_lines return [] raise ValueError(f"Unsupported conflict resolution strategy: {strategy}") def _postprocess_local_priority_merge( merged_lines: list[str], base_paths: Sequence[str], local_paths: Sequence[str], remote_paths: Sequence[str], ) -> list[str]: """Fix local-priority edge cases without disturbing existing order.""" base_counter = Counter(base_paths) local_counter = Counter(local_paths) remote_counter = Counter(remote_paths) merged_counter = Counter(merged_lines) merged = list(merged_lines) # 1) Preserve tracks that only Remote added (not in Base or Local). for track, remote_count in remote_counter.items(): if base_counter[track] == 0 and local_counter[track] == 0 and remote_count > 0: current = merged_counter.get(track, 0) while current < remote_count: merged.append(track) merged_counter[track] += 1 current += 1 # 2) For tracks only new in Local+Remote (not in Base), keep Local's count. candidate_tracks = set(local_counter.keys()) | set(remote_counter.keys()) for track in candidate_tracks: if base_counter[track] == 0 and local_counter[track] > 0 and remote_counter[track] > 0: target = local_counter[track] current = merged_counter.get(track, 0) if current > target: remove_needed = current - target trimmed: list[str] = [] for path in reversed(merged): if remove_needed and path == track: remove_needed -= 1 continue trimmed.append(path) trimmed.reverse() merged = trimmed merged_counter = Counter(merged) current = merged_counter.get(track, 0) while current < target: merged.append(track) current += 1 merged_counter[track] = current return merged def _postprocess_remote_priority_merge( merged_lines: list[str], base_paths: Sequence[str], local_paths: Sequence[str], remote_paths: Sequence[str], ) -> list[str]: base_counter = Counter(base_paths) local_counter = Counter(local_paths) remote_counter = Counter(remote_paths) remote_base_tracks: list[str] = [ track for track in remote_paths if base_counter[track] > 0 ] target_new_counter: dict[str, int] = {} all_tracks = set(local_counter.keys()) | set(remote_counter.keys()) for track in all_tracks: if base_counter[track] > 0: continue r_cnt = remote_counter.get(track, 0) l_cnt = local_counter.get(track, 0) if r_cnt > 0: target_new_counter[track] = r_cnt elif l_cnt > 0: target_new_counter[track] = l_cnt new_tracks: list[str] = [] used_counter: Counter[str] = Counter() for track in remote_paths: if base_counter[track] > 0: continue if track not in target_new_counter: continue if used_counter[track] >= target_new_counter[track]: continue new_tracks.append(track) used_counter[track] += 1 for track in local_paths: if base_counter[track] > 0: continue if track not in target_new_counter: continue if used_counter[track] >= target_new_counter[track]: continue new_tracks.append(track) used_counter[track] += 1 return remote_base_tracks + new_tracks def merge_playlists( base_text: str, local_text: str, remote_text: str, strategy: ConflictResolutionStrategy = ConflictResolutionStrategy.LOCAL_PRIORITY, test_folder: str = TEST_PLAYLIST_DIR, ) -> MergeResult: """Merge playlists using diff3 and resolve conflicts per strategy. The base, local, and remote normalized playlists are saved into ``test_folder`` for inspection. The merged playlist is also stored twice to simulate the versions intended for local save and cloud upload. """ base_paths, local_paths, remote_paths = _normalize_inputs( base_text, local_text, remote_text, test_folder ) chunks = _merge_chunks(base_paths, local_paths, remote_paths) has_conflict = any(chunk.type == "conflict" for chunk in chunks) merged_lines: list[str] = [] conflicts: list[dict] = [] for chunk in chunks: if chunk.type == "normal": merged_lines.extend(chunk.text.splitlines()) else: conflict_info = { "base": chunk.base_text.splitlines(), "local": chunk.local_text.splitlines(), "remote": chunk.remote_text.splitlines(), } conflicts.append(conflict_info) resolved = _resolve_conflict(conflict_info, strategy) merged_lines.extend(resolved) if strategy == ConflictResolutionStrategy.LOCAL_PRIORITY: merged_lines = _postprocess_local_priority_merge( merged_lines, base_paths, local_paths, remote_paths ) if strategy == ConflictResolutionStrategy.REMOTE_PRIORITY: merged_lines = _postprocess_remote_priority_merge( merged_lines, base_paths, local_paths, remote_paths ) _write_results(merged_lines, test_folder) return MergeResult(merged_paths=merged_lines, conflicts=conflicts) def _load_local_playlists(local_dir: str) -> dict[str, str]: """Read all local playlists under ``local_dir`` keyed by stem name.""" playlists: dict[str, str] = {} if not local_dir: return playlists absolute = os.path.abspath(local_dir) if not os.path.isdir(absolute): return playlists for entry in os.scandir(absolute): if entry.is_file() and entry.name.lower().endswith((".m3u", ".m3u8")): stem, _ = os.path.splitext(entry.name) try: with open(entry.path, "r", encoding="utf-8") as file: playlists[stem] = file.read() except OSError: playlists[stem] = "" return playlists def _load_playlist_snapshots(playlist: str, folder: str) -> tuple[str, str, str, bool, bool]: """Load base/local/remote texts for a playlist from its test folder.""" playlist_folder = os.path.join(folder, playlist) base_text, base_exists = _read_text_if_exists( os.path.join(playlist_folder, "base_next.m3u8") ) if not base_text: alt_text, _ = _read_text_if_exists( os.path.join(playlist_folder, "base_playlist.m3u8") ) base_text = base_text or alt_text remote_text, remote_exists = _read_text_if_exists( os.path.join(playlist_folder, "remote_input.m3u8") ) return base_text, remote_text, playlist_folder, remote_exists, base_exists def _fetch_remote_playlists() -> dict[str, str]: """Retrieve remote playlists from Plex when connected. Returns a mapping of playlist name to serialized playlist text. Failures to connect or fetch playlists are logged and result in an empty mapping to avoid blocking local testing. """ server_config.load() if not server_config.url: return {} try: plex_client.connect( token=server_config.token, scheme=server_config.scheme, url=server_config.url, port=server_config.port, ) except Exception as exc: # pragma: no cover - network access logger.warning(f"Failed to connect to Plex for remote playlists: {exc}") return {} playlists: dict[str, str] = {} try: for playlist in plex_client.server.playlists(): paths: list[str] = [] try: for track in playlist.items(): locations = getattr(track, "locations", None) or [] if locations: paths.append(locations[0]) except Exception as exc: # pragma: no cover - plex runtime logger.warning(f"Failed to read playlist '{playlist.title}': {exc}") playlists[playlist.title] = save_paths(paths) except Exception as exc: # pragma: no cover - plex runtime logger.warning(f"Failed to enumerate remote playlists: {exc}") return {} return playlists def _sync_single_playlist( playlist: str, mode: SyncMode, local_text: str | None, base_text: str, remote_text: str, playlist_folder: str, remote_present: bool, ) -> PlaylistSyncResult: local_present = local_text is not None local_text = local_text or "" if not base_text: base_text = "#EXTM3U\n" if not remote_text: remote_text = "#EXTM3U\n" if mode == SyncMode.LOCAL_FORCE: if not local_present: _write_delete_marker(playlist, playlist_folder) return PlaylistSyncResult(playlist, [], [], "deleted", playlist_folder) base_paths, local_paths, _ = _normalize_inputs( base_text, local_text, remote_text, playlist_folder ) merged_lines = list(local_paths) _write_results(merged_lines, playlist_folder) return PlaylistSyncResult(playlist, merged_lines, [], "synced", playlist_folder) if mode == SyncMode.REMOTE_FORCE: if not remote_present: _write_delete_marker(playlist, playlist_folder) return PlaylistSyncResult(playlist, [], [], "deleted", playlist_folder) base_paths, _, remote_paths = _normalize_inputs( base_text, local_text, remote_text, playlist_folder ) merged_lines = list(remote_paths) _write_results(merged_lines, playlist_folder) return PlaylistSyncResult(playlist, merged_lines, [], "synced", playlist_folder) if mode not in (SyncMode.MERGE_LOCAL_PRIMARY, SyncMode.MERGE_REMOTE_PRIMARY): raise ValueError(f"Unsupported sync mode: {mode}") merge_strategy = ( ConflictResolutionStrategy.LOCAL_PRIORITY if mode == SyncMode.MERGE_LOCAL_PRIMARY else ConflictResolutionStrategy.REMOTE_PRIORITY ) merge_result = merge_playlists( base_text=base_text, local_text=local_text, remote_text=remote_text, strategy=merge_strategy, test_folder=playlist_folder, ) if not merge_result.merged_paths and (not local_present or not remote_present): _write_delete_marker(playlist, playlist_folder) return PlaylistSyncResult( playlist, merge_result.merged_paths, merge_result.conflicts, "deleted", playlist_folder ) return PlaylistSyncResult( playlist, merge_result.merged_paths, merge_result.conflicts, "synced", playlist_folder ) def sync_all_playlists( local_dir: str, mode: SyncMode, test_folder: str = TEST_PLAYLIST_DIR ) -> list[PlaylistSyncResult]: """Synchronize all playlists that can be matched by name.""" server_config.load() compiled_rules = _compile_regex_rules(server_config.path_rules) _ensure_test_dir(test_folder) local_playlists = _load_local_playlists(local_dir) remote_playlists = _fetch_remote_playlists() playlist_names: set[str] = set(local_playlists.keys()) playlist_names.update(remote_playlists.keys()) for entry in os.scandir(test_folder): if entry.is_dir(): playlist_names.add(entry.name) results: list[PlaylistSyncResult] = [] for playlist in sorted(playlist_names): base_text, snapshot_remote_text, playlist_folder, remote_exists, _ = _load_playlist_snapshots( playlist, test_folder ) local_text = local_playlists.get(playlist) remote_text = remote_playlists.get(playlist) remote_present = False if remote_text: remote_present = True else: remote_text = snapshot_remote_text remote_present = bool(remote_text.strip()) or remote_exists base_text = preprocess_playlist_text( base_text, server_config.path_rules, compiled_rules ) remote_text = preprocess_playlist_text( remote_text, server_config.path_rules, compiled_rules ) if local_text is not None: local_text = preprocess_playlist_text( local_text, server_config.path_rules, compiled_rules ) # Treat missing remote text as absent playlist. result = _sync_single_playlist( playlist=playlist, mode=mode, local_text=local_text, base_text=base_text, remote_text=remote_text, playlist_folder=playlist_folder, remote_present=remote_present, ) results.append(result) return results