# PlexPlaylistSync/app/utils/playlist_merge.py (Python, 638 lines, 20 KiB)

import os
import re
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from typing import Literal, Sequence
from app.utils.config import server_config
from app.utils.logger import logger
from app.utils.plex_client import plex_client
from merge3 import Merge3
# Default directory for playlist snapshots and merge results: the
# "test_playlists" folder that sits next to this module's parent package.
TEST_PLAYLIST_DIR = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, "test_playlists")
)
class ConflictResolutionStrategy(str, Enum):
    """Policy for choosing a side when the three-way merge reports a conflict.

    The members are plain strings as well (``str`` mixin), so they compare
    equal to their literal values.
    """

    # Take the local side of each conflicting region.
    LOCAL_PRIORITY = "local_priority"
    # Take the remote side of each conflicting region.
    REMOTE_PRIORITY = "remote_priority"
    # Drop the region when either side is empty; otherwise take the local side.
    DELETE_PRIORITY = "delete_priority"
    # Take the local side when it is non-empty, else the remote side.
    KEEP_PRIORITY = "keep_priority"
class SyncMode(str, Enum):
    """How a single playlist is reconciled between the local and remote side."""

    # The local playlist wins outright; a missing local playlist means delete.
    LOCAL_FORCE = "local_force"
    # The remote playlist wins outright; a missing remote playlist means delete.
    REMOTE_FORCE = "remote_force"
    # Three-way merge, conflicts resolved with ``LOCAL_PRIORITY``.
    MERGE_LOCAL_PRIMARY = "merge_local_primary"
    # Three-way merge, conflicts resolved with ``REMOTE_PRIORITY``.
    MERGE_REMOTE_PRIMARY = "merge_remote_primary"
@dataclass
class PlaylistSyncResult:
    """Outcome of synchronizing one playlist."""

    # Playlist name (also the name of its snapshot sub-folder).
    name: str
    # Final ordered track paths; empty when the playlist was deleted.
    merged_paths: list[str]
    # One dict per merge conflict with "base", "local" and "remote" line lists.
    conflicts: list[dict]
    # "deleted" when a delete marker was written, otherwise "synced".
    action: Literal["synced", "deleted"]
    # Folder that holds this playlist's snapshots and results.
    output_dir: str
def load_paths(text: str) -> list[str]:
    """Extract the track entries from playlist text.

    * A leading UTF-8 byte-order mark is discarded.
    * Each line is stripped of surrounding whitespace.
    * Blank lines are removed.
    * Comment or metadata lines starting with ``#`` are ignored.
    * Order is preserved and duplicates are allowed.

    Entries are returned exactly as written; they are not resolved or
    converted to absolute paths.

    Args:
        text: Raw m3u/m3u8 playlist content.

    Returns:
        The playlist entries in file order.
    """
    # A BOM survives decoding with plain "utf-8" and is not whitespace, so
    # without this a "\ufeff#EXTM3U" header would be returned as a track path.
    text = text.removeprefix("\ufeff")
    stripped_lines = (line.strip() for line in text.splitlines())
    return [line for line in stripped_lines if line and not line.startswith("#")]
def playlist_to_text(paths: Sequence[str]) -> str:
    """Convert a list of paths into diff3-friendly text.

    Every path is terminated by a newline, so ``text.splitlines()`` returns
    the original list again. An empty list maps to the empty string; emitting
    a lone ``"\\n"`` instead would round-trip to ``[""]`` and turn an empty
    conflict side into one bogus empty path.

    Args:
        paths: Playlist entries in order.

    Returns:
        The entries joined one per line, or ``""`` when there are none.
    """
    return "".join(f"{path}\n" for path in paths)
def save_paths(paths: Sequence[str]) -> str:
    """Serialize playlist paths as m3u/m3u8 text.

    The output is the ``#EXTM3U`` header line followed by the paths, one per
    line, and a final newline.
    """
    body = "\n".join(paths)
    return f"#EXTM3U\n{body}\n"
def _compile_regex_rules(rules: Sequence[dict[str, str]]) -> list[tuple[re.Pattern[str], str]]:
compiled: list[tuple[re.Pattern[str], str]] = []
for rule in rules:
pattern = rule.get("pattern")
if not pattern:
continue
replacement = rule.get("replacement", "")
try:
compiled.append((re.compile(pattern), replacement))
except re.error as exc:
logger.warning(f"Skipping invalid regex '{pattern}': {exc}")
return compiled
def _apply_compiled_rules_to_paths(
paths: Sequence[str], compiled_rules: Sequence[tuple[re.Pattern[str], str]]
) -> list[str]:
if not compiled_rules:
return list(paths)
updated: list[str] = []
for path in paths:
new_path = path
for pattern, replacement in compiled_rules:
new_path = pattern.sub(replacement, new_path)
updated.append(new_path)
return updated
def apply_regex_rules_to_paths(
    paths: Sequence[str], rules: Sequence[dict[str, str]]
) -> list[str]:
    """Compile ``rules`` and apply them, in order, to every path.

    Each rule is a dict with a ``"pattern"`` and an optional
    ``"replacement"``.
    """
    return _apply_compiled_rules_to_paths(paths, _compile_regex_rules(rules))
def preprocess_playlist_text(
    text: str,
    rules: Sequence[dict[str, str]],
    compiled_rules: Sequence[tuple[re.Pattern[str], str]] | None = None,
) -> str:
    """Normalize playlist text and rewrite its paths with the regex rules.

    Args:
        text: Raw playlist content; empty text is treated as a header-only
            playlist.
        rules: Raw rule dicts, compiled only when ``compiled_rules`` is None.
        compiled_rules: Pre-compiled rules to reuse across calls.

    Returns:
        m3u text containing the rewritten paths.
    """
    active_rules = _compile_regex_rules(rules) if compiled_rules is None else compiled_rules
    entries = load_paths(text if text else "#EXTM3U\n")
    return save_paths(_apply_compiled_rules_to_paths(entries, active_rules))
@dataclass
class MergeChunk:
    """One region of a three-way merge, either agreed-upon or conflicting."""

    # "normal" for a resolved region, "conflict" when the sides disagree.
    type: Literal["normal", "conflict"]
    # Newline-joined lines of a normal chunk; unset for conflicts.
    text: str | None = None
    # Newline-joined lines of each side of a conflict; unset for normal chunks.
    base_text: str | None = None
    local_text: str | None = None
    remote_text: str | None = None
    # Group kind reported by the merger for a normal chunk (the code checks
    # for "a" and "b" as side-only chunks); unset for conflicts.
    origin: str | None = None
@dataclass
class MergeResult:
    """Result of merging one playlist."""

    # Final ordered track paths after conflict resolution and post-processing.
    merged_paths: list[str]
    # One dict per conflict with "base", "local" and "remote" line lists.
    conflicts: list[dict]
def _ensure_test_dir(folder: str = TEST_PLAYLIST_DIR) -> str:
    """Create ``folder`` (and any missing parents) and return it.

    An already existing folder is not an error.
    """
    os.makedirs(folder, exist_ok=True)
    return folder
def _read_text_if_exists(path: str) -> tuple[str, bool]:
try:
with open(path, "r", encoding="utf-8") as file:
return file.read(), True
except FileNotFoundError:
return "", False
except OSError:
return "", False
def _save_playlist_to_folder(filename: str, paths: Sequence[str], folder: str) -> str:
    """Write ``paths`` as an m3u file inside ``folder`` and return its path.

    The folder is created when missing. The file is rewritten only when its
    content would change, to avoid triggering unnecessary file events.
    """
    _ensure_test_dir(folder)
    target = os.path.join(folder, filename)
    desired = save_paths(paths)
    try:
        with open(target, "r", encoding="utf-8") as existing:
            up_to_date = existing.read() == desired
    except OSError:
        # Missing or unreadable file: fall through and (re)write it.
        up_to_date = False
    if not up_to_date:
        with open(target, "w", encoding="utf-8") as out:
            out.write(desired)
    return target
def _normalize_inputs(
    base_text: str, local_text: str, remote_text: str, folder: str
) -> tuple[list[str], list[str], list[str]]:
    """Parse the three playlist texts and snapshot them into ``folder``.

    Returns:
        ``(base_paths, local_paths, remote_paths)``.
    """
    # Parse all three first so nothing is written if parsing fails.
    base_paths, local_paths, remote_paths = (
        load_paths(text) for text in (base_text, local_text, remote_text)
    )
    snapshots = (
        ("base_playlist.m3u8", base_paths),
        ("local_input.m3u8", local_paths),
        ("remote_input.m3u8", remote_paths),
    )
    for filename, paths in snapshots:
        _save_playlist_to_folder(filename, paths, folder)
    return base_paths, local_paths, remote_paths
def _merge_chunks(
    base_paths: Sequence[str], local_paths: Sequence[str], remote_paths: Sequence[str]
) -> list[MergeChunk]:
    """Run the three-way merge and return its regions as ``MergeChunk`` items.

    Empty non-conflict groups are dropped. When a conflict directly follows
    one or more side-only chunks (origin ``"a"`` or ``"b"``), those chunks are
    folded into the matching side of the conflict.
    """
    result: list[MergeChunk] = []
    for group in Merge3(base_paths, local_paths, remote_paths).merge_groups():
        tag = group[0]
        if tag != "conflict":
            content = group[1]
            if content:
                result.append(
                    MergeChunk(type="normal", text=playlist_to_text(content), origin=tag)
                )
            continue

        base_side, local_side, remote_side = group[1:]
        # Absorb trailing side-only chunks that actually belong to this
        # conflicting region (common in reorder scenarios). Popping from the
        # end and prepending keeps the absorbed lines in their original order.
        while result and result[-1].origin in ("a", "b"):
            absorbed = result.pop()
            absorbed_lines = absorbed.text.splitlines() if absorbed.text else []
            if absorbed.origin == "a":
                local_side = absorbed_lines + local_side
            else:
                remote_side = absorbed_lines + remote_side
        result.append(
            MergeChunk(
                type="conflict",
                base_text=playlist_to_text(base_side),
                local_text=playlist_to_text(local_side),
                remote_text=playlist_to_text(remote_side),
            )
        )
    return result
def _write_results(merged_lines: Sequence[str], folder: str) -> None:
    """Store the merged playlist under its three result names in ``folder``.

    The same content is written as the local result, the remote result and
    the base for the next run.
    """
    for filename in ("local_result.m3u8", "remote_result.m3u8", "base_next.m3u8"):
        _save_playlist_to_folder(filename, merged_lines, folder)
def _write_delete_marker(playlist: str, folder: str) -> str:
    """Write a ``delete.txt`` marker for ``playlist`` and return its path.

    The folder is created when missing; an existing marker is overwritten.
    """
    marker_path = os.path.join(_ensure_test_dir(folder), "delete.txt")
    with open(marker_path, "w", encoding="utf-8") as marker:
        marker.write(f"delete playlist {playlist}")
    return marker_path
def _resolve_conflict(conflict: dict, strategy: ConflictResolutionStrategy) -> list[str]:
local_lines = conflict["local"]
remote_lines = conflict["remote"]
if strategy == ConflictResolutionStrategy.LOCAL_PRIORITY:
return local_lines
if strategy == ConflictResolutionStrategy.REMOTE_PRIORITY:
return remote_lines
if strategy == ConflictResolutionStrategy.DELETE_PRIORITY:
if not local_lines or not remote_lines:
return []
return local_lines
if strategy == ConflictResolutionStrategy.KEEP_PRIORITY:
if local_lines:
return local_lines
if remote_lines:
return remote_lines
return []
raise ValueError(f"Unsupported conflict resolution strategy: {strategy}")
def _postprocess_local_priority_merge(
merged_lines: list[str],
base_paths: Sequence[str],
local_paths: Sequence[str],
remote_paths: Sequence[str],
) -> list[str]:
"""Fix local-priority edge cases without disturbing existing order."""
base_counter = Counter(base_paths)
local_counter = Counter(local_paths)
remote_counter = Counter(remote_paths)
merged_counter = Counter(merged_lines)
merged = list(merged_lines)
# 1) Preserve tracks that only Remote added (not in Base or Local).
for track, remote_count in remote_counter.items():
if base_counter[track] == 0 and local_counter[track] == 0 and remote_count > 0:
current = merged_counter.get(track, 0)
while current < remote_count:
merged.append(track)
merged_counter[track] += 1
current += 1
# 2) For tracks only new in Local+Remote (not in Base), keep Local's count.
candidate_tracks = set(local_counter.keys()) | set(remote_counter.keys())
for track in candidate_tracks:
if base_counter[track] == 0 and local_counter[track] > 0 and remote_counter[track] > 0:
target = local_counter[track]
current = merged_counter.get(track, 0)
if current > target:
remove_needed = current - target
trimmed: list[str] = []
for path in reversed(merged):
if remove_needed and path == track:
remove_needed -= 1
continue
trimmed.append(path)
trimmed.reverse()
merged = trimmed
merged_counter = Counter(merged)
current = merged_counter.get(track, 0)
while current < target:
merged.append(track)
current += 1
merged_counter[track] = current
return merged
def _postprocess_remote_priority_merge(
merged_lines: list[str],
base_paths: Sequence[str],
local_paths: Sequence[str],
remote_paths: Sequence[str],
) -> list[str]:
base_counter = Counter(base_paths)
local_counter = Counter(local_paths)
remote_counter = Counter(remote_paths)
remote_base_tracks: list[str] = [
track for track in remote_paths if base_counter[track] > 0
]
target_new_counter: dict[str, int] = {}
all_tracks = set(local_counter.keys()) | set(remote_counter.keys())
for track in all_tracks:
if base_counter[track] > 0:
continue
r_cnt = remote_counter.get(track, 0)
l_cnt = local_counter.get(track, 0)
if r_cnt > 0:
target_new_counter[track] = r_cnt
elif l_cnt > 0:
target_new_counter[track] = l_cnt
new_tracks: list[str] = []
used_counter: Counter[str] = Counter()
for track in remote_paths:
if base_counter[track] > 0:
continue
if track not in target_new_counter:
continue
if used_counter[track] >= target_new_counter[track]:
continue
new_tracks.append(track)
used_counter[track] += 1
for track in local_paths:
if base_counter[track] > 0:
continue
if track not in target_new_counter:
continue
if used_counter[track] >= target_new_counter[track]:
continue
new_tracks.append(track)
used_counter[track] += 1
return remote_base_tracks + new_tracks
def merge_playlists(
    base_text: str,
    local_text: str,
    remote_text: str,
    strategy: ConflictResolutionStrategy = ConflictResolutionStrategy.LOCAL_PRIORITY,
    test_folder: str = TEST_PLAYLIST_DIR,
) -> MergeResult:
    """Merge playlists using diff3 and resolve conflicts per strategy.

    The normalized base, local and remote playlists are saved into
    ``test_folder`` for inspection. The merged playlist is stored there three
    times: as the local result, the remote result and the base for the next
    run.

    Args:
        base_text: Common-ancestor playlist text.
        local_text: Local playlist text.
        remote_text: Remote playlist text.
        strategy: How conflicting regions are resolved. ``LOCAL_PRIORITY`` and
            ``REMOTE_PRIORITY`` additionally run their post-processing pass.
        test_folder: Folder receiving the snapshots and results.

    Returns:
        The merged paths plus one ``{"base", "local", "remote"}`` dict of line
        lists per conflict.

    Raises:
        ValueError: If a conflict occurs and ``strategy`` is unsupported.
    """

    def entries(text: str | None) -> list[str]:
        # Playlist entries are never empty strings, so empty lines can only be
        # serialization artifacts (an empty side rendered as "\n"). Keeping
        # them would add a bogus "" path and make an empty conflict side look
        # non-empty to the resolver.
        return [line for line in (text or "").splitlines() if line]

    base_paths, local_paths, remote_paths = _normalize_inputs(
        base_text, local_text, remote_text, test_folder
    )

    merged_lines: list[str] = []
    conflicts: list[dict] = []
    for chunk in _merge_chunks(base_paths, local_paths, remote_paths):
        if chunk.type == "normal":
            merged_lines.extend(entries(chunk.text))
            continue
        conflict_info = {
            "base": entries(chunk.base_text),
            "local": entries(chunk.local_text),
            "remote": entries(chunk.remote_text),
        }
        conflicts.append(conflict_info)
        merged_lines.extend(_resolve_conflict(conflict_info, strategy))

    if strategy == ConflictResolutionStrategy.LOCAL_PRIORITY:
        merged_lines = _postprocess_local_priority_merge(
            merged_lines, base_paths, local_paths, remote_paths
        )
    elif strategy == ConflictResolutionStrategy.REMOTE_PRIORITY:
        merged_lines = _postprocess_remote_priority_merge(
            merged_lines, base_paths, local_paths, remote_paths
        )

    _write_results(merged_lines, test_folder)
    return MergeResult(merged_paths=merged_lines, conflicts=conflicts)
def _load_local_playlists(local_dir: str) -> dict[str, str]:
"""Read all local playlists under ``local_dir`` keyed by stem name."""
playlists: dict[str, str] = {}
if not local_dir:
return playlists
absolute = os.path.abspath(local_dir)
if not os.path.isdir(absolute):
return playlists
for entry in os.scandir(absolute):
if entry.is_file() and entry.name.lower().endswith((".m3u", ".m3u8")):
stem, _ = os.path.splitext(entry.name)
try:
with open(entry.path, "r", encoding="utf-8") as file:
playlists[stem] = file.read()
except OSError:
playlists[stem] = ""
return playlists
def _load_playlist_snapshots(playlist: str, folder: str) -> tuple[str, str, str, bool, bool]:
    """Load the stored base and remote snapshots of a playlist.

    The base text comes from ``base_next.m3u8``; when that is missing or
    empty, ``base_playlist.m3u8`` is used instead. The remote text comes from
    ``remote_input.m3u8``.

    Returns:
        ``(base_text, remote_text, playlist_folder, remote_exists,
        base_exists)``. ``base_exists`` reflects ``base_next.m3u8`` only, and
        ``remote_exists`` reflects ``remote_input.m3u8``.
    """
    playlist_folder = os.path.join(folder, playlist)

    def snapshot(filename: str) -> tuple[str, bool]:
        return _read_text_if_exists(os.path.join(playlist_folder, filename))

    base_text, base_exists = snapshot("base_next.m3u8")
    if not base_text:
        base_text, _ = snapshot("base_playlist.m3u8")
    remote_text, remote_exists = snapshot("remote_input.m3u8")
    return base_text, remote_text, playlist_folder, remote_exists, base_exists
def _fetch_remote_playlists() -> dict[str, str]:
    """Retrieve remote playlists from Plex when a server is configured.

    Returns:
        Mapping of playlist title to serialized playlist text. The mapping is
        empty when no server URL is configured, when connecting fails, or
        when enumerating the playlists fails; failures are logged so local
        testing is not blocked.
    """
    server_config.load()
    if not server_config.url:
        return {}

    try:
        plex_client.connect(
            token=server_config.token,
            scheme=server_config.scheme,
            url=server_config.url,
            port=server_config.port,
        )
    except Exception as exc:  # pragma: no cover - network access
        logger.warning(f"Failed to connect to Plex for remote playlists: {exc}")
        return {}

    remote: dict[str, str] = {}
    try:
        for playlist in plex_client.server.playlists():
            track_paths: list[str] = []
            try:
                for item in playlist.items():
                    # Only the first location of a track is used; tracks
                    # without any location are skipped.
                    locations = getattr(item, "locations", None)
                    if locations:
                        track_paths.append(locations[0])
            except Exception as exc:  # pragma: no cover - plex runtime
                logger.warning(f"Failed to read playlist '{playlist.title}': {exc}")
            # NOTE(review): after a read failure the playlist is still stored
            # with whatever tracks were collected so far - confirm intended.
            remote[playlist.title] = save_paths(track_paths)
    except Exception as exc:  # pragma: no cover - plex runtime
        logger.warning(f"Failed to enumerate remote playlists: {exc}")
        return {}
    return remote
def _sync_single_playlist(
    playlist: str,
    mode: SyncMode,
    local_text: str | None,
    base_text: str,
    remote_text: str,
    playlist_folder: str,
    remote_present: bool,
) -> PlaylistSyncResult:
    """Synchronize one playlist according to ``mode``.

    Args:
        playlist: Playlist name.
        mode: Force or merge mode.
        local_text: Local playlist text, or None when no local playlist exists.
        base_text: Common-ancestor text; empty means a header-only playlist.
        remote_text: Remote text; empty means a header-only playlist.
        playlist_folder: Folder receiving snapshots, results and delete markers.
        remote_present: Whether a remote playlist is considered to exist.

    Returns:
        The sync outcome. The action is ``"deleted"`` when the winning side is
        absent (force modes), or when a merge yields no tracks while either
        side is absent; otherwise it is ``"synced"``.

    Raises:
        ValueError: If ``mode`` is not a supported sync mode.
    """
    local_present = local_text is not None
    local_text = local_text or ""
    base_text = base_text or "#EXTM3U\n"
    remote_text = remote_text or "#EXTM3U\n"

    if mode in (SyncMode.LOCAL_FORCE, SyncMode.REMOTE_FORCE):
        local_wins = mode == SyncMode.LOCAL_FORCE
        winner_present = local_present if local_wins else remote_present
        if not winner_present:
            _write_delete_marker(playlist, playlist_folder)
            return PlaylistSyncResult(playlist, [], [], "deleted", playlist_folder)
        _, local_paths, remote_paths = _normalize_inputs(
            base_text, local_text, remote_text, playlist_folder
        )
        winner_paths = list(local_paths if local_wins else remote_paths)
        _write_results(winner_paths, playlist_folder)
        return PlaylistSyncResult(playlist, winner_paths, [], "synced", playlist_folder)

    if mode not in (SyncMode.MERGE_LOCAL_PRIMARY, SyncMode.MERGE_REMOTE_PRIMARY):
        raise ValueError(f"Unsupported sync mode: {mode}")

    if mode == SyncMode.MERGE_LOCAL_PRIMARY:
        merge_strategy = ConflictResolutionStrategy.LOCAL_PRIORITY
    else:
        merge_strategy = ConflictResolutionStrategy.REMOTE_PRIORITY
    merge_result = merge_playlists(
        base_text=base_text,
        local_text=local_text,
        remote_text=remote_text,
        strategy=merge_strategy,
        test_folder=playlist_folder,
    )

    # An empty merge only counts as a deletion when one side is gone.
    one_side_missing = not (local_present and remote_present)
    action: Literal["synced", "deleted"] = "synced"
    if not merge_result.merged_paths and one_side_missing:
        _write_delete_marker(playlist, playlist_folder)
        action = "deleted"
    return PlaylistSyncResult(
        playlist, merge_result.merged_paths, merge_result.conflicts, action, playlist_folder
    )
def sync_all_playlists(
    local_dir: str, mode: SyncMode, test_folder: str = TEST_PLAYLIST_DIR
) -> list[PlaylistSyncResult]:
    """Synchronize all playlists that can be matched by name.

    Playlist names are collected from the local directory, the remote server
    and the sub-folders of ``test_folder``. Each playlist is synchronized in
    sorted name order, with the configured path rules applied to its base,
    remote and local text first.

    Args:
        local_dir: Directory holding the local ``.m3u``/``.m3u8`` files.
        mode: Sync mode applied to every playlist.
        test_folder: Root folder for per-playlist snapshots and results.

    Returns:
        One result per playlist, in sorted name order.
    """
    server_config.load()
    compiled_rules = _compile_regex_rules(server_config.path_rules)
    _ensure_test_dir(test_folder)

    local_playlists = _load_local_playlists(local_dir)
    remote_playlists = _fetch_remote_playlists()
    snapshot_names = {entry.name for entry in os.scandir(test_folder) if entry.is_dir()}
    playlist_names: set[str] = set(local_playlists) | set(remote_playlists) | snapshot_names

    def apply_rules(text: str) -> str:
        return preprocess_playlist_text(text, server_config.path_rules, compiled_rules)

    results: list[PlaylistSyncResult] = []
    for playlist in sorted(playlist_names):
        (
            base_text,
            snapshot_remote_text,
            playlist_folder,
            remote_exists,
            _,
        ) = _load_playlist_snapshots(playlist, test_folder)

        local_text = local_playlists.get(playlist)
        remote_text = remote_playlists.get(playlist)
        if remote_text:
            remote_present = True
        else:
            # Nothing fetched from the server: fall back to the stored remote
            # snapshot, which counts as present when it has content or its
            # file exists.
            remote_text = snapshot_remote_text
            remote_present = bool(remote_text.strip()) or remote_exists

        base_text = apply_rules(base_text)
        remote_text = apply_rules(remote_text)
        if local_text is not None:
            local_text = apply_rules(local_text)

        results.append(
            _sync_single_playlist(
                playlist=playlist,
                mode=mode,
                local_text=local_text,
                base_text=base_text,
                remote_text=remote_text,
                playlist_folder=playlist_folder,
                remote_present=remote_present,
            )
        )
    return results