import os from dataclasses import dataclass from enum import Enum from typing import Literal, Sequence from app.utils.config import server_config from app.utils.logger import logger from app.utils.plex_client import plex_client from merge3 import Merge3 TEST_PLAYLIST_DIR = os.path.abspath( os.path.join(os.path.dirname(__file__), "..", "test_playlists") ) class ConflictResolutionStrategy(str, Enum): LOCAL_PRIORITY = "local_priority" REMOTE_PRIORITY = "remote_priority" DELETE_PRIORITY = "delete_priority" KEEP_PRIORITY = "keep_priority" class SyncMode(str, Enum): LOCAL_FORCE = "local_force" REMOTE_FORCE = "remote_force" MERGE_LOCAL_PRIMARY = "merge_local_primary" MERGE_REMOTE_PRIMARY = "merge_remote_primary" @dataclass class PlaylistSyncResult: name: str merged_paths: list[str] conflicts: list[dict] action: Literal["synced", "deleted"] output_dir: str def load_paths(text: str) -> list[str]: """Normalize playlist text into a list of absolute paths. * Remove blank lines. * Ignore comment or metadata lines starting with ``#``. * Preserve order and allow duplicates. """ output: list[str] = [] for line in text.splitlines(): line = line.strip() if not line: continue if line.startswith("#"): continue output.append(line) return output def playlist_to_text(paths: Sequence[str]) -> str: """Convert list of paths into diff3-friendly text.""" return "\n".join(paths) + "\n" def save_paths(paths: Sequence[str]) -> str: """Format playlist paths into m3u/m3u8 text.""" return "#EXTM3U\n" + "\n".join(paths) + "\n" @dataclass class MergeChunk: type: Literal["normal", "conflict"] text: str | None = None base_text: str | None = None local_text: str | None = None remote_text: str | None = None @dataclass class MergeResult: merged_paths: list[str] conflicts: list[dict] def _ensure_test_dir(folder: str = TEST_PLAYLIST_DIR) -> str: os.makedirs(folder, exist_ok=True) return folder def _read_text_if_exists(path: str) -> tuple[str, bool]: try: with open(path, "r", encoding="utf-8") as file: return file.read(), True except FileNotFoundError: return "", False except OSError: return "", False def _save_playlist_to_folder(filename: str, paths: Sequence[str], folder: str) -> str: _ensure_test_dir(folder) file_path = os.path.join(folder, filename) with open(file_path, "w", encoding="utf-8") as file: file.write(save_paths(paths)) return file_path def _normalize_inputs( base_text: str, local_text: str, remote_text: str, folder: str ) -> tuple[list[str], list[str], list[str]]: """Normalize playlist inputs and persist snapshots for inspection.""" base_paths = load_paths(base_text) local_paths = load_paths(local_text) remote_paths = load_paths(remote_text) _save_playlist_to_folder("base_playlist.m3u8", base_paths, folder) _save_playlist_to_folder("local_input.m3u8", local_paths, folder) _save_playlist_to_folder("remote_input.m3u8", remote_paths, folder) return base_paths, local_paths, remote_paths def _merge_chunks( base_paths: Sequence[str], local_paths: Sequence[str], remote_paths: Sequence[str] ) -> list[MergeChunk]: """Run a diff3 merge on normalized playlists and return merge chunks.""" merger = Merge3(base_paths, local_paths, remote_paths) chunks: list[MergeChunk] = [] for group in merger.merge_groups(): kind = group[0] if kind == "conflict": base_lines, local_lines, remote_lines = group[1], group[2], group[3] chunks.append( MergeChunk( type="conflict", base_text=playlist_to_text(base_lines), local_text=playlist_to_text(local_lines), remote_text=playlist_to_text(remote_lines), ) ) else: if kind in ("unchanged", "same", "a", "b"): lines = group[1] chunks.append( MergeChunk( type="normal", text=playlist_to_text(lines), ) ) else: raise ValueError(f"Unsupported merge chunk type: {kind}") return chunks def _write_results(merged_lines: Sequence[str], folder: str) -> None: _save_playlist_to_folder("local_result.m3u8", merged_lines, folder) _save_playlist_to_folder("remote_result.m3u8", merged_lines, folder) _save_playlist_to_folder("base_next.m3u8", merged_lines, folder) def _write_delete_marker(playlist: str, folder: str) -> str: _ensure_test_dir(folder) marker_path = os.path.join(folder, "delete.txt") with open(marker_path, "w", encoding="utf-8") as file: file.write(f"delete playlist {playlist}") return marker_path def _resolve_conflict(conflict: dict, strategy: ConflictResolutionStrategy) -> list[str]: local_lines = conflict["local"] remote_lines = conflict["remote"] if strategy == ConflictResolutionStrategy.LOCAL_PRIORITY: return local_lines if strategy == ConflictResolutionStrategy.REMOTE_PRIORITY: return remote_lines if strategy == ConflictResolutionStrategy.DELETE_PRIORITY: if not local_lines or not remote_lines: return [] return local_lines if strategy == ConflictResolutionStrategy.KEEP_PRIORITY: if local_lines: return local_lines if remote_lines: return remote_lines return [] raise ValueError(f"Unsupported conflict resolution strategy: {strategy}") def merge_playlists( base_text: str, local_text: str, remote_text: str, strategy: ConflictResolutionStrategy = ConflictResolutionStrategy.LOCAL_PRIORITY, test_folder: str = TEST_PLAYLIST_DIR, ) -> MergeResult: """Merge playlists using diff3 and resolve conflicts per strategy. The base, local, and remote normalized playlists are saved into ``test_folder`` for inspection. The merged playlist is also stored twice to simulate the versions intended for local save and cloud upload. """ base_paths, local_paths, remote_paths = _normalize_inputs( base_text, local_text, remote_text, test_folder ) chunks = _merge_chunks(base_paths, local_paths, remote_paths) merged_lines: list[str] = [] conflicts: list[dict] = [] for chunk in chunks: if chunk.type == "normal": merged_lines.extend(chunk.text.splitlines()) else: conflict_info = { "base": chunk.base_text.splitlines(), "local": chunk.local_text.splitlines(), "remote": chunk.remote_text.splitlines(), } conflicts.append(conflict_info) resolved = _resolve_conflict(conflict_info, strategy) merged_lines.extend(resolved) _write_results(merged_lines, test_folder) return MergeResult(merged_paths=merged_lines, conflicts=conflicts) def _load_local_playlists(local_dir: str) -> dict[str, str]: """Read all local playlists under ``local_dir`` keyed by stem name.""" playlists: dict[str, str] = {} if not local_dir: return playlists absolute = os.path.abspath(local_dir) if not os.path.isdir(absolute): return playlists for entry in os.scandir(absolute): if entry.is_file() and entry.name.lower().endswith((".m3u", ".m3u8")): stem, _ = os.path.splitext(entry.name) try: with open(entry.path, "r", encoding="utf-8") as file: playlists[stem] = file.read() except OSError: playlists[stem] = "" return playlists def _load_playlist_snapshots(playlist: str, folder: str) -> tuple[str, str, str, bool, bool]: """Load base/local/remote texts for a playlist from its test folder.""" playlist_folder = os.path.join(folder, playlist) base_text, base_exists = _read_text_if_exists( os.path.join(playlist_folder, "base_next.m3u8") ) if not base_text: alt_text, _ = _read_text_if_exists( os.path.join(playlist_folder, "base_playlist.m3u8") ) base_text = base_text or alt_text remote_text, remote_exists = _read_text_if_exists( os.path.join(playlist_folder, "remote_input.m3u8") ) return base_text, remote_text, playlist_folder, remote_exists, base_exists def _fetch_remote_playlists() -> dict[str, str]: """Retrieve remote playlists from Plex when connected. Returns a mapping of playlist name to serialized playlist text. Failures to connect or fetch playlists are logged and result in an empty mapping to avoid blocking local testing. """ server_config.load() if not server_config.url: return {} try: plex_client.connect( token=server_config.token, scheme=server_config.scheme, url=server_config.url, port=server_config.port, ) except Exception as exc: # pragma: no cover - network access logger.warning(f"Failed to connect to Plex for remote playlists: {exc}") return {} playlists: dict[str, str] = {} try: for playlist in plex_client.server.playlists(): paths: list[str] = [] try: for track in playlist.items(): locations = getattr(track, "locations", None) or [] if locations: paths.append(locations[0]) except Exception as exc: # pragma: no cover - plex runtime logger.warning(f"Failed to read playlist '{playlist.title}': {exc}") playlists[playlist.title] = save_paths(paths) except Exception as exc: # pragma: no cover - plex runtime logger.warning(f"Failed to enumerate remote playlists: {exc}") return {} return playlists def _sync_single_playlist( playlist: str, mode: SyncMode, local_text: str | None, base_text: str, remote_text: str, playlist_folder: str, remote_present: bool, ) -> PlaylistSyncResult: local_present = local_text is not None local_text = local_text or "" if not base_text: base_text = "#EXTM3U\n" if not remote_text: remote_text = "#EXTM3U\n" if mode == SyncMode.LOCAL_FORCE: if not local_present: _write_delete_marker(playlist, playlist_folder) return PlaylistSyncResult(playlist, [], [], "deleted", playlist_folder) base_paths, local_paths, _ = _normalize_inputs( base_text, local_text, remote_text, playlist_folder ) merged_lines = list(local_paths) _write_results(merged_lines, playlist_folder) return PlaylistSyncResult(playlist, merged_lines, [], "synced", playlist_folder) if mode == SyncMode.REMOTE_FORCE: if not remote_present: _write_delete_marker(playlist, playlist_folder) return PlaylistSyncResult(playlist, [], [], "deleted", playlist_folder) base_paths, _, remote_paths = _normalize_inputs( base_text, local_text, remote_text, playlist_folder ) merged_lines = list(remote_paths) _write_results(merged_lines, playlist_folder) return PlaylistSyncResult(playlist, merged_lines, [], "synced", playlist_folder) if mode not in (SyncMode.MERGE_LOCAL_PRIMARY, SyncMode.MERGE_REMOTE_PRIMARY): raise ValueError(f"Unsupported sync mode: {mode}") merge_strategy = ( ConflictResolutionStrategy.LOCAL_PRIORITY if mode == SyncMode.MERGE_LOCAL_PRIMARY else ConflictResolutionStrategy.REMOTE_PRIORITY ) merge_result = merge_playlists( base_text=base_text, local_text=local_text, remote_text=remote_text, strategy=merge_strategy, test_folder=playlist_folder, ) if not merge_result.merged_paths and (not local_present or not remote_present): _write_delete_marker(playlist, playlist_folder) return PlaylistSyncResult( playlist, merge_result.merged_paths, merge_result.conflicts, "deleted", playlist_folder ) return PlaylistSyncResult( playlist, merge_result.merged_paths, merge_result.conflicts, "synced", playlist_folder ) def sync_all_playlists( local_dir: str, mode: SyncMode, test_folder: str = TEST_PLAYLIST_DIR ) -> list[PlaylistSyncResult]: """Synchronize all playlists that can be matched by name.""" _ensure_test_dir(test_folder) local_playlists = _load_local_playlists(local_dir) remote_playlists = _fetch_remote_playlists() playlist_names: set[str] = set(local_playlists.keys()) playlist_names.update(remote_playlists.keys()) for entry in os.scandir(test_folder): if entry.is_dir(): playlist_names.add(entry.name) results: list[PlaylistSyncResult] = [] for playlist in sorted(playlist_names): base_text, snapshot_remote_text, playlist_folder, remote_exists, _ = _load_playlist_snapshots( playlist, test_folder ) local_text = local_playlists.get(playlist) remote_text = remote_playlists.get(playlist) remote_present = False if remote_text: remote_present = True else: remote_text = snapshot_remote_text remote_present = bool(remote_text.strip()) or remote_exists # Treat missing remote text as absent playlist. result = _sync_single_playlist( playlist=playlist, mode=mode, local_text=local_text, base_text=base_text, remote_text=remote_text, playlist_folder=playlist_folder, remote_present=remote_present, ) results.append(result) return results