920 lines
33 KiB
Python
920 lines
33 KiB
Python
import os
|
|
import re
|
|
from collections import Counter
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from typing import Literal, Sequence
|
|
|
|
from app.utils.config import server_config
|
|
from app.utils.logger import logger
|
|
from app.utils.plex_client import plex_client
|
|
|
|
from merge3 import Merge3
|
|
|
|
|
|
SYNC_ARTIFACTS_DIR = os.path.abspath(
|
|
os.path.join(os.path.dirname(__file__), "..", "..", "data", "sync_artifacts")
|
|
)
|
|
|
|
# Backward-compat alias (older API / logs used this name).
|
|
TEST_PLAYLIST_DIR = SYNC_ARTIFACTS_DIR
|
|
|
|
|
|
class ConflictResolutionStrategy(str, Enum):
|
|
LOCAL_PRIORITY = "local_priority"
|
|
REMOTE_PRIORITY = "remote_priority"
|
|
DELETE_PRIORITY = "delete_priority"
|
|
KEEP_PRIORITY = "keep_priority"
|
|
|
|
|
|
class SyncMode(str, Enum):
|
|
LOCAL_FORCE = "local_force"
|
|
REMOTE_FORCE = "remote_force"
|
|
MERGE_LOCAL_PRIMARY = "merge_local_primary"
|
|
MERGE_REMOTE_PRIMARY = "merge_remote_primary"
|
|
|
|
|
|
@dataclass
|
|
class PlaylistSyncResult:
|
|
name: str
|
|
merged_paths: list[str]
|
|
conflicts: list[dict]
|
|
action: Literal["synced", "deleted"]
|
|
output_dir: str
|
|
|
|
|
|
@dataclass
|
|
class CompiledRegexRules:
|
|
"""Holds compiled regex rules for all four processing stages."""
|
|
local_pre: list[tuple[re.Pattern[str], str]]
|
|
local_post: list[tuple[re.Pattern[str], str]]
|
|
remote_pre: list[tuple[re.Pattern[str], str]]
|
|
remote_post: list[tuple[re.Pattern[str], str]]
|
|
|
|
|
|
def load_paths(text: str) -> list[str]:
|
|
"""Normalize playlist text into a list of absolute paths.
|
|
|
|
* Remove blank lines.
|
|
* Ignore comment or metadata lines starting with ``#``.
|
|
* Preserve order and allow duplicates.
|
|
"""
|
|
|
|
output: list[str] = []
|
|
for line in text.splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
if line.startswith("#"):
|
|
continue
|
|
output.append(line)
|
|
return output
|
|
|
|
|
|
def playlist_to_text(paths: Sequence[str]) -> str:
|
|
"""Convert list of paths into diff3-friendly text."""
|
|
|
|
return "\n".join(paths) + "\n"
|
|
|
|
|
|
def save_paths(paths: Sequence[str]) -> str:
|
|
"""Format playlist paths into m3u/m3u8 text."""
|
|
|
|
return "#EXTM3U\n" + "\n".join(paths) + "\n"
|
|
|
|
|
|
def _compile_regex_rules(rules: Sequence[dict[str, str]]) -> list[tuple[re.Pattern[str], str]]:
|
|
"""Compile regex rules into pattern/replacement pairs.
|
|
|
|
Supports both legacy format (pattern/replacement) and new format (search/replace).
|
|
"""
|
|
compiled: list[tuple[re.Pattern[str], str]] = []
|
|
for rule in rules:
|
|
# Support both legacy (pattern/replacement) and new (search/replace) field names
|
|
# Use explicit None checks to allow empty strings as valid values
|
|
pattern = rule.get("pattern") if rule.get("pattern") is not None else rule.get("search")
|
|
if not pattern:
|
|
continue
|
|
# For replacement, empty string is a valid value (for deletion)
|
|
replacement = rule.get("replacement") if rule.get("replacement") is not None else rule.get("replace")
|
|
if replacement is None:
|
|
replacement = ""
|
|
try:
|
|
compiled.append((re.compile(pattern), replacement))
|
|
except re.error as exc:
|
|
logger.warning(f"Skipping invalid regex '{pattern}': {exc}")
|
|
return compiled
|
|
|
|
|
|
def _apply_compiled_rules_to_paths(
|
|
paths: Sequence[str], compiled_rules: Sequence[tuple[re.Pattern[str], str]]
|
|
) -> list[str]:
|
|
if not compiled_rules:
|
|
return list(paths)
|
|
|
|
updated: list[str] = []
|
|
for path in paths:
|
|
new_path = path
|
|
for pattern, replacement in compiled_rules:
|
|
new_path = pattern.sub(replacement, new_path)
|
|
updated.append(new_path)
|
|
return updated
|
|
|
|
|
|
def apply_regex_rules_to_paths(
|
|
paths: Sequence[str], rules: Sequence[dict[str, str]]
|
|
) -> list[str]:
|
|
"""Apply regex replacement rules to each path in order."""
|
|
|
|
compiled_rules = _compile_regex_rules(rules)
|
|
return _apply_compiled_rules_to_paths(paths, compiled_rules)
|
|
|
|
|
|
def preprocess_playlist_text(
|
|
text: str,
|
|
rules: Sequence[dict[str, str]],
|
|
compiled_rules: Sequence[tuple[re.Pattern[str], str]] | None = None,
|
|
) -> str:
|
|
"""Normalize playlist text and apply regex replacements."""
|
|
|
|
if not text:
|
|
text = "#EXTM3U\n"
|
|
|
|
paths = load_paths(text)
|
|
compiled = compiled_rules if compiled_rules is not None else _compile_regex_rules(rules)
|
|
replaced_paths = _apply_compiled_rules_to_paths(paths, compiled)
|
|
return save_paths(replaced_paths)
|
|
|
|
|
|
@dataclass
|
|
class MergeChunk:
|
|
type: Literal["normal", "conflict"]
|
|
text: str | None = None
|
|
base_text: str | None = None
|
|
local_text: str | None = None
|
|
remote_text: str | None = None
|
|
origin: str | None = None
|
|
|
|
|
|
@dataclass
|
|
class MergeResult:
|
|
merged_paths: list[str]
|
|
conflicts: list[dict]
|
|
|
|
|
|
def _ensure_dir(path: str) -> str:
|
|
os.makedirs(path, exist_ok=True)
|
|
return path
|
|
|
|
|
|
def _safe_folder_name(name: str, max_len: int = 120) -> str:
|
|
"""Make a filesystem-safe folder name (especially for Windows hosts).
|
|
|
|
This is used for artifacts folders persisted to the host.
|
|
"""
|
|
|
|
if not name:
|
|
return "(unnamed)"
|
|
|
|
# Windows-disallowed characters plus path separators.
|
|
invalid = set('<>:"/\\|?*')
|
|
cleaned = "".join(("_" if ch in invalid else ch) for ch in name).strip()
|
|
if not cleaned:
|
|
cleaned = "(unnamed)"
|
|
if len(cleaned) > max_len:
|
|
cleaned = cleaned[:max_len].rstrip()
|
|
return cleaned
|
|
|
|
|
|
def _playlist_artifact_dir(artifacts_root: str, playlist_name: str) -> str:
|
|
return os.path.join(artifacts_root, _safe_folder_name(playlist_name))
|
|
|
|
|
|
def _artifact_file(playlist_folder: str, category: str, filename: str) -> str:
|
|
category_dir = _ensure_dir(os.path.join(playlist_folder, category))
|
|
return os.path.join(category_dir, filename)
|
|
|
|
|
|
def _read_text_if_exists(path: str) -> tuple[str, bool]:
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as file:
|
|
return file.read(), True
|
|
except FileNotFoundError:
|
|
return "", False
|
|
except OSError:
|
|
return "", False
|
|
|
|
|
|
def _save_playlist_text(path: str, text: str) -> str:
|
|
"""Write text if changed (avoid triggering unnecessary file events)."""
|
|
|
|
_ensure_dir(os.path.dirname(path))
|
|
|
|
if os.path.exists(path):
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as file:
|
|
if file.read() == text:
|
|
return path
|
|
except OSError:
|
|
pass
|
|
|
|
with open(path, "w", encoding="utf-8") as file:
|
|
file.write(text)
|
|
return path
|
|
|
|
|
|
def _save_playlist_paths(path: str, paths: Sequence[str]) -> str:
|
|
logger.info(f"Saving playlist to: {path}")
|
|
return _save_playlist_text(path, save_paths(paths))
|
|
|
|
|
|
def _normalize_inputs(
|
|
base_text: str, local_text: str, remote_text: str, folder: str
|
|
) -> tuple[list[str], list[str], list[str]]:
|
|
"""Normalize playlist inputs and persist snapshots for inspection."""
|
|
|
|
base_paths = load_paths(base_text)
|
|
local_paths = load_paths(local_text)
|
|
remote_paths = load_paths(remote_text)
|
|
|
|
_save_playlist_paths(_artifact_file(folder, "base", "base_prev.m3u8"), base_paths)
|
|
_save_playlist_paths(_artifact_file(folder, "inputs", "local_input.m3u8"), local_paths)
|
|
_save_playlist_paths(_artifact_file(folder, "inputs", "remote_input.m3u8"), remote_paths)
|
|
|
|
return base_paths, local_paths, remote_paths
|
|
|
|
|
|
def _merge_chunks(
|
|
base_paths: Sequence[str], local_paths: Sequence[str], remote_paths: Sequence[str]
|
|
) -> list[MergeChunk]:
|
|
merger = Merge3(base_paths, local_paths, remote_paths)
|
|
raw_chunks = list(merger.merge_groups())
|
|
chunks: list[MergeChunk] = []
|
|
|
|
for kind, *payload in raw_chunks:
|
|
if kind == "conflict":
|
|
base_lines, local_lines, remote_lines = payload # type: ignore[misc]
|
|
|
|
# Absorb trailing side-only chunks that actually belong to this
|
|
# conflicting region (common in reorder scenarios).
|
|
while chunks and chunks[-1].origin in ("a", "b"):
|
|
previous = chunks.pop()
|
|
previous_lines = previous.text.splitlines() if previous.text else []
|
|
|
|
if previous.origin == "a":
|
|
local_lines = previous_lines + local_lines
|
|
else:
|
|
remote_lines = previous_lines + remote_lines
|
|
|
|
chunks.append(
|
|
MergeChunk(
|
|
type="conflict",
|
|
base_text=playlist_to_text(base_lines),
|
|
local_text=playlist_to_text(local_lines),
|
|
remote_text=playlist_to_text(remote_lines),
|
|
)
|
|
)
|
|
else:
|
|
lines = payload[0]
|
|
if not lines:
|
|
continue
|
|
chunk = MergeChunk(type="normal", text=playlist_to_text(lines), origin=kind)
|
|
chunks.append(chunk)
|
|
|
|
return chunks
|
|
|
|
|
|
def _write_results(
|
|
merged_lines: Sequence[str],
|
|
folder: str,
|
|
compiled_rules: CompiledRegexRules | None = None
|
|
) -> None:
|
|
"""Write sync results to the test folder.
|
|
|
|
If compiled_rules is provided with post-processing rules:
|
|
- local_result.m3u8: merged_lines processed with local_post rules
|
|
- remote_result.m3u8: merged_lines processed with remote_post rules
|
|
- base_next.m3u8: unprocessed merged_lines (normalized sync result)
|
|
"""
|
|
# Apply post-processing regex rules if provided
|
|
if compiled_rules and compiled_rules.local_post:
|
|
local_lines = _apply_compiled_rules_to_paths(merged_lines, compiled_rules.local_post)
|
|
else:
|
|
local_lines = list(merged_lines)
|
|
|
|
if compiled_rules and compiled_rules.remote_post:
|
|
remote_lines = _apply_compiled_rules_to_paths(merged_lines, compiled_rules.remote_post)
|
|
else:
|
|
remote_lines = list(merged_lines)
|
|
|
|
_save_playlist_paths(_artifact_file(folder, "outputs", "local_result.m3u8"), local_lines)
|
|
_save_playlist_paths(_artifact_file(folder, "outputs", "remote_result.m3u8"), remote_lines)
|
|
_save_playlist_paths(_artifact_file(folder, "base", "base_next.m3u8"), merged_lines)
|
|
|
|
|
|
def _write_delete_marker(playlist: str, folder: str) -> str:
|
|
marker_path = _artifact_file(folder, "meta", "delete.txt")
|
|
with open(marker_path, "w", encoding="utf-8") as file:
|
|
file.write(f"delete playlist {playlist}")
|
|
return marker_path
|
|
|
|
|
|
def _resolve_conflict(conflict: dict, strategy: ConflictResolutionStrategy) -> list[str]:
|
|
local_lines = conflict["local"]
|
|
remote_lines = conflict["remote"]
|
|
|
|
if strategy == ConflictResolutionStrategy.LOCAL_PRIORITY:
|
|
return local_lines
|
|
if strategy == ConflictResolutionStrategy.REMOTE_PRIORITY:
|
|
return remote_lines
|
|
if strategy == ConflictResolutionStrategy.DELETE_PRIORITY:
|
|
if not local_lines or not remote_lines:
|
|
return []
|
|
return local_lines
|
|
if strategy == ConflictResolutionStrategy.KEEP_PRIORITY:
|
|
if local_lines:
|
|
return local_lines
|
|
if remote_lines:
|
|
return remote_lines
|
|
return []
|
|
|
|
raise ValueError(f"Unsupported conflict resolution strategy: {strategy}")
|
|
|
|
|
|
def _postprocess_local_priority_merge(
|
|
merged_lines: list[str],
|
|
base_paths: Sequence[str],
|
|
local_paths: Sequence[str],
|
|
remote_paths: Sequence[str],
|
|
) -> list[str]:
|
|
"""Fix local-priority edge cases without disturbing existing order."""
|
|
|
|
base_counter = Counter(base_paths)
|
|
local_counter = Counter(local_paths)
|
|
remote_counter = Counter(remote_paths)
|
|
merged_counter = Counter(merged_lines)
|
|
merged = list(merged_lines)
|
|
|
|
# 1) Preserve tracks that only Remote added (not in Base or Local).
|
|
for track, remote_count in remote_counter.items():
|
|
if base_counter[track] == 0 and local_counter[track] == 0 and remote_count > 0:
|
|
current = merged_counter.get(track, 0)
|
|
while current < remote_count:
|
|
merged.append(track)
|
|
merged_counter[track] += 1
|
|
current += 1
|
|
|
|
# 2) For tracks only new in Local+Remote (not in Base), keep Local's count.
|
|
candidate_tracks = set(local_counter.keys()) | set(remote_counter.keys())
|
|
for track in candidate_tracks:
|
|
if base_counter[track] == 0 and local_counter[track] > 0 and remote_counter[track] > 0:
|
|
target = local_counter[track]
|
|
current = merged_counter.get(track, 0)
|
|
|
|
if current > target:
|
|
remove_needed = current - target
|
|
trimmed: list[str] = []
|
|
for path in reversed(merged):
|
|
if remove_needed and path == track:
|
|
remove_needed -= 1
|
|
continue
|
|
trimmed.append(path)
|
|
trimmed.reverse()
|
|
merged = trimmed
|
|
merged_counter = Counter(merged)
|
|
current = merged_counter.get(track, 0)
|
|
|
|
while current < target:
|
|
merged.append(track)
|
|
current += 1
|
|
merged_counter[track] = current
|
|
|
|
return merged
|
|
|
|
def _postprocess_remote_priority_merge(
|
|
merged_lines: list[str],
|
|
base_paths: Sequence[str],
|
|
local_paths: Sequence[str],
|
|
remote_paths: Sequence[str],
|
|
) -> list[str]:
|
|
|
|
base_counter = Counter(base_paths)
|
|
local_counter = Counter(local_paths)
|
|
remote_counter = Counter(remote_paths)
|
|
|
|
remote_base_tracks: list[str] = [
|
|
track for track in remote_paths if base_counter[track] > 0
|
|
]
|
|
|
|
target_new_counter: dict[str, int] = {}
|
|
all_tracks = set(local_counter.keys()) | set(remote_counter.keys())
|
|
for track in all_tracks:
|
|
if base_counter[track] > 0:
|
|
continue
|
|
r_cnt = remote_counter.get(track, 0)
|
|
l_cnt = local_counter.get(track, 0)
|
|
if r_cnt > 0:
|
|
target_new_counter[track] = r_cnt
|
|
elif l_cnt > 0:
|
|
target_new_counter[track] = l_cnt
|
|
|
|
new_tracks: list[str] = []
|
|
used_counter: Counter[str] = Counter()
|
|
|
|
for track in remote_paths:
|
|
if base_counter[track] > 0:
|
|
continue
|
|
if track not in target_new_counter:
|
|
continue
|
|
if used_counter[track] >= target_new_counter[track]:
|
|
continue
|
|
new_tracks.append(track)
|
|
used_counter[track] += 1
|
|
|
|
for track in local_paths:
|
|
if base_counter[track] > 0:
|
|
continue
|
|
if track not in target_new_counter:
|
|
continue
|
|
if used_counter[track] >= target_new_counter[track]:
|
|
continue
|
|
new_tracks.append(track)
|
|
used_counter[track] += 1
|
|
|
|
return remote_base_tracks + new_tracks
|
|
|
|
|
|
def merge_playlists(
|
|
base_text: str,
|
|
local_text: str,
|
|
remote_text: str,
|
|
strategy: ConflictResolutionStrategy = ConflictResolutionStrategy.LOCAL_PRIORITY,
|
|
test_folder: str = SYNC_ARTIFACTS_DIR,
|
|
compiled_rules: CompiledRegexRules | None = None,
|
|
) -> MergeResult:
|
|
"""Merge playlists using diff3 and resolve conflicts per strategy.
|
|
|
|
The base, local, and remote normalized playlists are saved into ``test_folder``
|
|
for inspection. The merged playlist is also stored twice to simulate the
|
|
versions intended for local save and cloud upload.
|
|
|
|
If compiled_rules is provided, post-processing regex rules will be applied
|
|
to the results before writing.
|
|
"""
|
|
|
|
base_paths, local_paths, remote_paths = _normalize_inputs(
|
|
base_text, local_text, remote_text, test_folder
|
|
)
|
|
|
|
chunks = _merge_chunks(base_paths, local_paths, remote_paths)
|
|
has_conflict = any(chunk.type == "conflict" for chunk in chunks)
|
|
|
|
merged_lines: list[str] = []
|
|
conflicts: list[dict] = []
|
|
|
|
for chunk in chunks:
|
|
if chunk.type == "normal":
|
|
merged_lines.extend(chunk.text.splitlines())
|
|
else:
|
|
conflict_info = {
|
|
"base": chunk.base_text.splitlines(),
|
|
"local": chunk.local_text.splitlines(),
|
|
"remote": chunk.remote_text.splitlines(),
|
|
}
|
|
conflicts.append(conflict_info)
|
|
resolved = _resolve_conflict(conflict_info, strategy)
|
|
merged_lines.extend(resolved)
|
|
|
|
if strategy == ConflictResolutionStrategy.LOCAL_PRIORITY:
|
|
merged_lines = _postprocess_local_priority_merge(
|
|
merged_lines, base_paths, local_paths, remote_paths
|
|
)
|
|
|
|
if strategy == ConflictResolutionStrategy.REMOTE_PRIORITY:
|
|
merged_lines = _postprocess_remote_priority_merge(
|
|
merged_lines, base_paths, local_paths, remote_paths
|
|
)
|
|
|
|
_write_results(merged_lines, test_folder, compiled_rules)
|
|
|
|
return MergeResult(merged_paths=merged_lines, conflicts=conflicts)
|
|
|
|
|
|
def _load_local_playlists(local_dir: str) -> dict[str, str]:
|
|
"""Read all local playlists under ``local_dir`` keyed by stem name."""
|
|
|
|
playlists: dict[str, str] = {}
|
|
if not local_dir:
|
|
return playlists
|
|
|
|
absolute = os.path.abspath(local_dir)
|
|
if not os.path.isdir(absolute):
|
|
return playlists
|
|
|
|
for entry in os.scandir(absolute):
|
|
if entry.is_file() and entry.name.lower().endswith((".m3u", ".m3u8")):
|
|
stem, _ = os.path.splitext(entry.name)
|
|
try:
|
|
with open(entry.path, "r", encoding="utf-8") as file:
|
|
playlists[stem] = file.read()
|
|
except OSError:
|
|
playlists[stem] = ""
|
|
return playlists
|
|
|
|
|
|
def _load_playlist_snapshots(playlist: str, folder: str) -> tuple[str, str, str, bool, bool]:
|
|
"""Load base/local/remote texts for a playlist from its test folder."""
|
|
|
|
playlist_folder = _playlist_artifact_dir(folder, playlist)
|
|
|
|
# Prefer new organized layout.
|
|
base_text, base_exists = _read_text_if_exists(
|
|
os.path.join(playlist_folder, "base", "base_next.m3u8")
|
|
)
|
|
if not base_text:
|
|
alt_text, alt_exists = _read_text_if_exists(
|
|
os.path.join(playlist_folder, "base", "base_prev.m3u8")
|
|
)
|
|
base_text = base_text or alt_text
|
|
base_exists = base_exists or alt_exists
|
|
|
|
# Backward-compat: legacy flat file layout.
|
|
if not base_text:
|
|
legacy_text, legacy_exists = _read_text_if_exists(
|
|
os.path.join(playlist_folder, "base_next.m3u8")
|
|
)
|
|
base_text = base_text or legacy_text
|
|
base_exists = base_exists or legacy_exists
|
|
if not base_text:
|
|
legacy_text, legacy_exists = _read_text_if_exists(
|
|
os.path.join(playlist_folder, "base_playlist.m3u8")
|
|
)
|
|
base_text = base_text or legacy_text
|
|
base_exists = base_exists or legacy_exists
|
|
|
|
remote_text, remote_exists = _read_text_if_exists(
|
|
os.path.join(playlist_folder, "inputs", "remote_input.m3u8")
|
|
)
|
|
if not remote_text:
|
|
legacy_remote, legacy_exists = _read_text_if_exists(
|
|
os.path.join(playlist_folder, "remote_input.m3u8")
|
|
)
|
|
remote_text = remote_text or legacy_remote
|
|
remote_exists = remote_exists or legacy_exists
|
|
|
|
return base_text, remote_text, playlist_folder, remote_exists, base_exists
|
|
|
|
|
|
def _fetch_remote_playlists() -> dict[str, str]:
|
|
"""Retrieve remote playlists from Plex when connected.
|
|
|
|
Returns a mapping of playlist name to serialized playlist text. Failures to
|
|
connect or fetch playlists are logged and result in an empty mapping to
|
|
avoid blocking local testing.
|
|
"""
|
|
|
|
server_config.load()
|
|
if not server_config.url:
|
|
return {}
|
|
|
|
try:
|
|
plex_client.connect(
|
|
token=server_config.token,
|
|
scheme=server_config.scheme,
|
|
url=server_config.url,
|
|
port=server_config.port,
|
|
)
|
|
except Exception as exc: # pragma: no cover - network access
|
|
logger.warning(f"Failed to connect to Plex for remote playlists: {exc}")
|
|
return {}
|
|
|
|
playlists: dict[str, str] = {}
|
|
try:
|
|
for playlist in plex_client.server.playlists():
|
|
paths: list[str] = []
|
|
try:
|
|
for track in playlist.items():
|
|
locations = getattr(track, "locations", None) or []
|
|
if locations:
|
|
paths.append(locations[0])
|
|
except Exception as exc: # pragma: no cover - plex runtime
|
|
logger.warning(f"Failed to read playlist '{playlist.title}': {exc}")
|
|
playlists[playlist.title] = save_paths(paths)
|
|
except Exception as exc: # pragma: no cover - plex runtime
|
|
logger.warning(f"Failed to enumerate remote playlists: {exc}")
|
|
return {}
|
|
|
|
return playlists
|
|
|
|
|
|
def _sync_single_playlist(
|
|
playlist: str,
|
|
mode: SyncMode,
|
|
local_text: str | None,
|
|
base_text: str,
|
|
remote_text: str,
|
|
playlist_folder: str,
|
|
remote_present: bool,
|
|
compiled_rules: CompiledRegexRules | None = None,
|
|
) -> PlaylistSyncResult:
|
|
local_present = local_text is not None
|
|
local_text = local_text or ""
|
|
|
|
if not base_text:
|
|
base_text = "#EXTM3U\n"
|
|
if not remote_text:
|
|
remote_text = "#EXTM3U\n"
|
|
|
|
if mode == SyncMode.LOCAL_FORCE:
|
|
if not local_present:
|
|
_write_delete_marker(playlist, playlist_folder)
|
|
return PlaylistSyncResult(playlist, [], [], "deleted", playlist_folder)
|
|
|
|
base_paths, local_paths, _ = _normalize_inputs(
|
|
base_text, local_text, remote_text, playlist_folder
|
|
)
|
|
merged_lines = list(local_paths)
|
|
_write_results(merged_lines, playlist_folder, compiled_rules)
|
|
return PlaylistSyncResult(playlist, merged_lines, [], "synced", playlist_folder)
|
|
|
|
if mode == SyncMode.REMOTE_FORCE:
|
|
if not remote_present:
|
|
_write_delete_marker(playlist, playlist_folder)
|
|
return PlaylistSyncResult(playlist, [], [], "deleted", playlist_folder)
|
|
|
|
base_paths, _, remote_paths = _normalize_inputs(
|
|
base_text, local_text, remote_text, playlist_folder
|
|
)
|
|
merged_lines = list(remote_paths)
|
|
_write_results(merged_lines, playlist_folder, compiled_rules)
|
|
return PlaylistSyncResult(playlist, merged_lines, [], "synced", playlist_folder)
|
|
|
|
if mode not in (SyncMode.MERGE_LOCAL_PRIMARY, SyncMode.MERGE_REMOTE_PRIMARY):
|
|
raise ValueError(f"Unsupported sync mode: {mode}")
|
|
|
|
merge_strategy = (
|
|
ConflictResolutionStrategy.LOCAL_PRIORITY
|
|
if mode == SyncMode.MERGE_LOCAL_PRIMARY
|
|
else ConflictResolutionStrategy.REMOTE_PRIORITY
|
|
)
|
|
|
|
merge_result = merge_playlists(
|
|
base_text=base_text,
|
|
local_text=local_text,
|
|
remote_text=remote_text,
|
|
strategy=merge_strategy,
|
|
test_folder=playlist_folder,
|
|
compiled_rules=compiled_rules,
|
|
)
|
|
|
|
if not merge_result.merged_paths and (not local_present or not remote_present):
|
|
_write_delete_marker(playlist, playlist_folder)
|
|
return PlaylistSyncResult(
|
|
playlist, merge_result.merged_paths, merge_result.conflicts, "deleted", playlist_folder
|
|
)
|
|
|
|
return PlaylistSyncResult(
|
|
playlist, merge_result.merged_paths, merge_result.conflicts, "synced", playlist_folder
|
|
)
|
|
|
|
|
|
def _compile_path_mapping_rules(path_mapping: dict) -> CompiledRegexRules:
|
|
"""Compile regex rules from path_mapping config for all four processing stages."""
|
|
regex_config = path_mapping.get("regex", {})
|
|
return CompiledRegexRules(
|
|
local_pre=_compile_regex_rules(regex_config.get("local_pre", [])),
|
|
local_post=_compile_regex_rules(regex_config.get("local_post", [])),
|
|
remote_pre=_compile_regex_rules(regex_config.get("remote_pre", [])),
|
|
remote_post=_compile_regex_rules(regex_config.get("remote_post", [])),
|
|
)
|
|
|
|
|
|
def _compile_simple_mapping_rules(simple_mappings: list[dict]) -> CompiledRegexRules:
|
|
"""Compile simple mapping pairs into four rule groups using UUID-based mapping_ids.
|
|
|
|
Each simple mapping has:
|
|
- id: UUID used as the mapping_id (unique identifier to prevent conflicts)
|
|
- search: Local path prefix
|
|
- replace: Cloud path prefix
|
|
|
|
This generates four rule sets:
|
|
- local_pre: Replace local path (search) with mapping_id
|
|
- remote_pre: Replace cloud path (replace) with mapping_id
|
|
- local_post: Replace mapping_id with local path (search)
|
|
- remote_post: Replace mapping_id with cloud path (replace)
|
|
|
|
The mapping_id is wrapped with special markers to prevent conflicts with actual paths.
|
|
"""
|
|
local_pre_rules: list[dict[str, str]] = []
|
|
local_post_rules: list[dict[str, str]] = []
|
|
remote_pre_rules: list[dict[str, str]] = []
|
|
remote_post_rules: list[dict[str, str]] = []
|
|
|
|
# UUID pattern for validation (accepts standard UUID format with or without hyphens)
|
|
uuid_pattern = re.compile(r'^[a-fA-F0-9]{8}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{12}$')
|
|
|
|
for mapping in simple_mappings:
|
|
# Get the mapping values
|
|
mapping_id = mapping.get("id")
|
|
local_path = mapping.get("search", "") # Local path is stored in 'search' field
|
|
cloud_path = mapping.get("replace", "") # Cloud path is stored in 'replace' field
|
|
|
|
# Validate mapping_id is a proper UUID to prevent injection attacks
|
|
if not mapping_id or not isinstance(mapping_id, str):
|
|
logger.warning(f"Skipping mapping with missing or invalid id: {mapping}")
|
|
continue
|
|
|
|
if not uuid_pattern.match(mapping_id):
|
|
logger.warning(f"Skipping mapping with non-UUID id format: {mapping_id}")
|
|
continue
|
|
|
|
# Paths must be non-empty strings
|
|
if not local_path or not isinstance(local_path, str):
|
|
logger.warning(f"Skipping mapping with missing local path: {mapping}")
|
|
continue
|
|
|
|
if not cloud_path or not isinstance(cloud_path, str):
|
|
logger.warning(f"Skipping mapping with missing cloud path: {mapping}")
|
|
continue
|
|
|
|
# Normalize Windows paths: Replace double backslashes with single backslashes
|
|
# This handles cases where users enter escaped paths like \\Koha9-Main\\Music
|
|
# when the actual playlist content uses \Koha9-Main\Music
|
|
original_local = local_path
|
|
original_cloud = cloud_path
|
|
local_path = local_path.replace("\\\\", "\\")
|
|
cloud_path = cloud_path.replace("\\\\", "\\")
|
|
|
|
if local_path != original_local or cloud_path != original_cloud:
|
|
logger.info(f"Normalized Windows paths:")
|
|
logger.info(f" Local: {repr(original_local)} -> {repr(local_path)}")
|
|
logger.info(f" Cloud: {repr(original_cloud)} -> {repr(cloud_path)}")
|
|
|
|
# Create a unique placeholder using the validated UUID
|
|
# Using special markers to prevent conflicts with actual paths
|
|
placeholder = f"__MAPPING__{mapping_id}__"
|
|
|
|
# Debug logging for path mapping
|
|
logger.debug(f"Simple mapping pair:")
|
|
logger.debug(f" Local path (search): {repr(local_path)}")
|
|
logger.debug(f" Cloud path (replace): {repr(cloud_path)}")
|
|
logger.debug(f" Placeholder: {placeholder}")
|
|
|
|
# Pre-processing rules (use re.escape to treat paths as literal strings)
|
|
# local_pre: Replace local path with placeholder
|
|
local_pattern = re.escape(local_path)
|
|
logger.debug(f" Local pre pattern: {repr(local_pattern)}")
|
|
local_pre_rules.append({
|
|
"pattern": local_pattern,
|
|
"replacement": placeholder
|
|
})
|
|
|
|
# remote_pre: Replace cloud path with placeholder
|
|
remote_pattern = re.escape(cloud_path)
|
|
logger.debug(f" Remote pre pattern: {repr(remote_pattern)}")
|
|
remote_pre_rules.append({
|
|
"pattern": remote_pattern,
|
|
"replacement": placeholder
|
|
})
|
|
|
|
# Post-processing rules
|
|
# local_post: Replace placeholder with local path
|
|
# Note: In regex replacement, backslashes need to be escaped
|
|
local_post_rules.append({
|
|
"pattern": re.escape(placeholder),
|
|
"replacement": local_path.replace("\\", "\\\\")
|
|
})
|
|
|
|
# remote_post: Replace placeholder with cloud path
|
|
remote_post_rules.append({
|
|
"pattern": re.escape(placeholder),
|
|
"replacement": cloud_path.replace("\\", "\\\\")
|
|
})
|
|
|
|
logger.info(f"Compiled {len(local_pre_rules)} simple mapping pairs into rules")
|
|
|
|
return CompiledRegexRules(
|
|
local_pre=_compile_regex_rules(local_pre_rules),
|
|
local_post=_compile_regex_rules(local_post_rules),
|
|
remote_pre=_compile_regex_rules(remote_pre_rules),
|
|
remote_post=_compile_regex_rules(remote_post_rules),
|
|
)
|
|
|
|
|
|
def sync_all_playlists(
|
|
local_dir: str, mode: SyncMode, test_folder: str = SYNC_ARTIFACTS_DIR
|
|
) -> list[PlaylistSyncResult]:
|
|
"""Synchronize all playlists that can be matched by name.
|
|
|
|
Path mapping modes:
|
|
- SIMPLE: Uses UUID-based mapping_ids to convert between local and cloud paths
|
|
- local_pre: local_path -> mapping_id
|
|
- remote_pre: cloud_path -> mapping_id
|
|
- local_post: mapping_id -> local_path
|
|
- remote_post: mapping_id -> cloud_path
|
|
|
|
- REGEX: Uses custom regex rules for each processing stage
|
|
- local_pre, local_post, remote_pre, remote_post rules are applied directly
|
|
|
|
Processing flow:
|
|
1. local_pre rules are applied to local playlists before sync
|
|
2. remote_pre rules are applied to remote playlists before sync
|
|
3. Sync/merge is performed
|
|
4. local_post rules are applied to results before writing to local_result.m3u8
|
|
5. remote_post rules are applied to results before writing to remote_result.m3u8
|
|
"""
|
|
|
|
server_config.load()
|
|
|
|
# Get path_mapping configuration
|
|
path_mapping = server_config.path_mapping
|
|
mapping_mode = path_mapping.get("mode", "SIMPLE")
|
|
|
|
# Compile rules based on the mode
|
|
compiled_rules: CompiledRegexRules | None = None
|
|
legacy_compiled_rules: list[tuple[re.Pattern[str], str]] = []
|
|
|
|
if mapping_mode == "REGEX":
|
|
compiled_rules = _compile_path_mapping_rules(path_mapping)
|
|
logger.info("Using REGEX mode for path mapping with 4 rule groups")
|
|
elif mapping_mode == "SIMPLE":
|
|
simple_mappings = path_mapping.get("simple", [])
|
|
if simple_mappings:
|
|
compiled_rules = _compile_simple_mapping_rules(simple_mappings)
|
|
logger.info(f"Using SIMPLE mode for path mapping with {len(simple_mappings)} mapping pairs")
|
|
else:
|
|
logger.info("SIMPLE mode with no mappings - no path transformations will be applied")
|
|
else:
|
|
# Use legacy path_rules for backward compatibility
|
|
legacy_compiled_rules = _compile_regex_rules(server_config.path_rules)
|
|
logger.info("Using legacy path_rules for preprocessing")
|
|
|
|
_ensure_dir(test_folder)
|
|
logger.info(f"Sync artifacts folder: {test_folder}")
|
|
local_playlists = _load_local_playlists(local_dir)
|
|
remote_playlists = _fetch_remote_playlists()
|
|
playlist_names: set[str] = set(local_playlists.keys())
|
|
|
|
playlist_names.update(remote_playlists.keys())
|
|
|
|
results: list[PlaylistSyncResult] = []
|
|
|
|
for playlist in sorted(playlist_names):
|
|
base_text, snapshot_remote_text, playlist_folder, remote_exists, _ = _load_playlist_snapshots(
|
|
playlist, test_folder
|
|
)
|
|
local_text = local_playlists.get(playlist)
|
|
|
|
remote_text = remote_playlists.get(playlist)
|
|
remote_present = False
|
|
if remote_text:
|
|
remote_present = True
|
|
else:
|
|
remote_text = snapshot_remote_text
|
|
remote_present = bool(remote_text.strip()) or remote_exists
|
|
|
|
if compiled_rules:
|
|
# Apply pre-processing rules for REGEX or SIMPLE mode
|
|
# base_text doesn't need pre-processing as it's the normalized state
|
|
if local_text is not None and compiled_rules.local_pre:
|
|
logger.debug(f"Applying local_pre rules to playlist: {playlist}")
|
|
logger.debug(f" Before preprocessing (first 200 chars): {repr(local_text[:200])}")
|
|
local_text = preprocess_playlist_text(
|
|
local_text, [], compiled_rules.local_pre
|
|
)
|
|
logger.debug(f" After preprocessing (first 200 chars): {repr(local_text[:200])}")
|
|
if remote_text and compiled_rules.remote_pre:
|
|
logger.debug(f"Applying remote_pre rules to playlist: {playlist}")
|
|
logger.debug(f" Before preprocessing (first 200 chars): {repr(remote_text[:200])}")
|
|
remote_text = preprocess_playlist_text(
|
|
remote_text, [], compiled_rules.remote_pre
|
|
)
|
|
logger.debug(f" After preprocessing (first 200 chars): {repr(remote_text[:200])}")
|
|
elif legacy_compiled_rules:
|
|
# Use legacy preprocessing for all texts
|
|
base_text = preprocess_playlist_text(
|
|
base_text, server_config.path_rules, legacy_compiled_rules
|
|
)
|
|
remote_text = preprocess_playlist_text(
|
|
remote_text, server_config.path_rules, legacy_compiled_rules
|
|
)
|
|
if local_text is not None:
|
|
local_text = preprocess_playlist_text(
|
|
local_text, server_config.path_rules, legacy_compiled_rules
|
|
)
|
|
|
|
# Treat missing remote text as absent playlist.
|
|
result = _sync_single_playlist(
|
|
playlist=playlist,
|
|
mode=mode,
|
|
local_text=local_text,
|
|
base_text=base_text,
|
|
remote_text=remote_text,
|
|
playlist_folder=playlist_folder,
|
|
remote_present=remote_present,
|
|
compiled_rules=compiled_rules,
|
|
)
|
|
results.append(result)
|
|
|
|
return results
|