5 Commits

Author SHA1 Message Date
Koha9 5a52831ae8 Fix remote-priority merge postprocessing. 2025-11-25 10:50:37 +09:00
Koha9 0bb624f3c9 Adjust local-priority merge postprocessing 2025-11-25 10:08:49 +09:00
Koha9 97aa598775 Fix local-priority merge to avoid remote reorders in conflicts 2025-11-25 10:08:42 +09:00
Koha9 a912213e2e Load remote playlists from Plex during sync 2025-11-25 04:39:30 +09:00
Koha9 08eed569a9 Sync all playlists and track deletions 2025-11-24 22:21:01 +09:00
5 changed files with 722 additions and 17 deletions
+111 -17
View File
@@ -1,6 +1,7 @@
import os import os
from typing import Tuple from typing import Tuple
from app.utils.config import server_config from app.utils.config import server_config
from app.utils.playlist_merge import SyncMode, sync_all_playlists, TEST_PLAYLIST_DIR
from fastapi import FastAPI, Request, Form from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
@@ -23,6 +24,30 @@ app.mount(
) )
SYNC_MODE_OPTIONS = [
{
"value": SyncMode.LOCAL_FORCE.value,
"label": "完全本地优先(local_force",
"description": "单向同步,本地覆盖云端且顺序以本地为准。",
},
{
"value": SyncMode.REMOTE_FORCE.value,
"label": "完全云端优先(remote_force",
"description": "单向同步,云端覆盖本地且顺序以云端为准。",
},
{
"value": SyncMode.MERGE_LOCAL_PRIMARY.value,
"label": "双向合并(本地优先)",
"description": "三方合并,冲突时选择本地版本。",
},
{
"value": SyncMode.MERGE_REMOTE_PRIMARY.value,
"label": "双向合并(云端优先)",
"description": "三方合并,冲突时选择云端版本。",
},
]
def _get_cloud_playlists() -> Tuple[list[dict], str, dict]: def _get_cloud_playlists() -> Tuple[list[dict], str, dict]:
"""Fetch playlists and connection state from the remote Plex server.""" """Fetch playlists and connection state from the remote Plex server."""
@@ -74,27 +99,96 @@ def _get_cloud_playlists() -> Tuple[list[dict], str, dict]:
playlists.sort(key=lambda item: item["name"].lower()) playlists.sort(key=lambda item: item["name"].lower())
return playlists, status, server_info return playlists, status, server_info
def _build_home_context(
# 显示主页 request: Request,
@app.get("/", response_class=HTMLResponse) local_path: str,
async def home(request: Request, local_path: str = "playlist"): message: str | None = None,
message_type: str | None = None,
sync_result: dict | None = None,
selected_mode: str | None = None,
):
server_config.load() server_config.load()
local_playlists = scan_local_playlists(local_path) local_playlists = scan_local_playlists(local_path)
cloud_playlists, connection_status, server_info = _get_cloud_playlists() cloud_playlists, connection_status, server_info = _get_cloud_playlists()
return templates.TemplateResponse( return {
"home.html", "request": request,
{ "theme": server_config.theme,
"request": request, "path": "/",
"theme": server_config.theme, "local_playlists": local_playlists,
"path": "/", "local_path": local_path,
"local_playlists": local_playlists, "cloud_playlists": cloud_playlists,
"local_path": local_path, "connection_status": connection_status,
"cloud_playlists": cloud_playlists, "server_info": server_info,
"connection_status": connection_status, "sync_modes": SYNC_MODE_OPTIONS,
"server_info": server_info, "selected_mode": selected_mode,
}, "message": message,
) "message_type": message_type,
"sync_result": sync_result,
}
# 显示主页
@app.get("/", response_class=HTMLResponse)
async def home(request: Request, local_path: str = "playlist"):
context = _build_home_context(request, local_path)
return templates.TemplateResponse("home.html", context)
@app.post("/sync", response_class=HTMLResponse)
async def trigger_sync(request: Request, mode: str = Form(...), local_path: str = Form("playlist")):
try:
sync_mode = SyncMode(mode)
except ValueError:
context = _build_home_context(
request,
local_path,
message=f"未知的同步策略:{mode}",
message_type="danger",
selected_mode=mode,
)
return templates.TemplateResponse("home.html", context)
try:
results = sync_all_playlists(
local_dir=local_path,
mode=sync_mode,
test_folder=TEST_PLAYLIST_DIR,
)
merged_count = sum(len(item.merged_paths) for item in results)
conflict_count = sum(len(item.conflicts) for item in results)
deleted_count = sum(1 for item in results if item.action == "deleted")
context = _build_home_context(
request,
local_path,
message="同步完成,输出已写入测试目录用于验证。",
message_type="success",
sync_result={
"mode": sync_mode.value,
"mode_label": next(
(item["label"] for item in SYNC_MODE_OPTIONS if item["value"] == sync_mode.value),
sync_mode.value,
),
"merged_count": merged_count,
"conflict_count": conflict_count,
"delete_count": deleted_count,
"playlist_count": len(results),
"output_dir": TEST_PLAYLIST_DIR,
},
selected_mode=sync_mode.value,
)
return templates.TemplateResponse("home.html", context)
except Exception as exc:
logger.warning(f"Sync failed: {exc}")
context = _build_home_context(
request,
local_path,
message=f"同步失败:{exc}",
message_type="danger",
selected_mode=sync_mode.value,
)
return templates.TemplateResponse("home.html", context)
# 登录页面和处理 # 登录页面和处理
+54
View File
@@ -8,6 +8,60 @@
</div> </div>
</div> </div>
{% if message %}
<div class="alert alert-{{ message_type or 'info' }}" role="alert">
{{ message }}
</div>
{% endif %}
<div class="card mb-4">
<div class="card-body">
<div class="d-flex flex-column flex-xl-row align-items-xl-center gap-3 mb-3">
<div>
<h5 class="card-title mb-0">选择同步策略</h5>
<small class="text-body-secondary">根据需要选择单向覆盖或双向合并策略</small>
</div>
<form class="ms-xl-auto w-100 w-xl-auto" method="post" action="/sync">
<div class="row gy-2 gx-2 align-items-end">
<div class="col-12 col-lg-8">
<label class="form-label mb-1">同步模式</label>
<select class="form-select" name="mode">
{% for option in sync_modes %}
<option value="{{ option.value }}" {% if selected_mode == option.value %}selected{% endif %}>
{{ option.label }}
</option>
{% endfor %}
</select>
</div>
<div class="col-auto">
<label class="form-label d-none d-lg-block">&nbsp;</label>
<div>
<button class="btn btn-primary" type="submit">
<i class="bi bi-play-fill"></i>
<span class="ms-1">开始同步</span>
</button>
</div>
</div>
<input type="hidden" name="local_path" value="{{ local_path }}">
</div>
</form>
</div>
<ul class="mb-0 text-body-secondary small ps-3">
<li><strong>完全本地优先</strong>:本地列表直接覆盖云端,顺序以本地为准(无 diff)。</li>
<li><strong>完全云端优先</strong>:云端列表直接覆盖本地,顺序以云端为准(无 diff)。</li>
<li><strong>双向合并(本地优先)</strong>:三方合并,冲突时选本地。</li>
<li><strong>双向合并(云端优先)</strong>:三方合并,冲突时选云端。</li>
</ul>
{% if sync_result %}
<div class="alert alert-info mt-3 mb-0" role="alert">
<div class="fw-semibold">{{ sync_result.mode_label }}</div>
<div class="small mb-1">播放列表:{{ sync_result.playlist_count }},删除:{{ sync_result.delete_count }},合并曲目:{{ sync_result.merged_count }},冲突数:{{ sync_result.conflict_count }}</div>
<div class="small text-body-secondary">已写入:{{ sync_result.output_dir }}</div>
</div>
{% endif %}
</div>
</div>
<div class="row g-4"> <div class="row g-4">
<div class="col-12 col-xl-6"> <div class="col-12 col-xl-6">
<div class="card h-100"> <div class="card h-100">
View File
+556
View File
@@ -0,0 +1,556 @@
import os
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from typing import Literal, Sequence
from app.utils.config import server_config
from app.utils.logger import logger
from app.utils.plex_client import plex_client
from merge3 import Merge3
TEST_PLAYLIST_DIR = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "test_playlists")
)
class ConflictResolutionStrategy(str, Enum):
LOCAL_PRIORITY = "local_priority"
REMOTE_PRIORITY = "remote_priority"
DELETE_PRIORITY = "delete_priority"
KEEP_PRIORITY = "keep_priority"
class SyncMode(str, Enum):
LOCAL_FORCE = "local_force"
REMOTE_FORCE = "remote_force"
MERGE_LOCAL_PRIMARY = "merge_local_primary"
MERGE_REMOTE_PRIMARY = "merge_remote_primary"
@dataclass
class PlaylistSyncResult:
name: str
merged_paths: list[str]
conflicts: list[dict]
action: Literal["synced", "deleted"]
output_dir: str
def load_paths(text: str) -> list[str]:
"""Normalize playlist text into a list of absolute paths.
* Remove blank lines.
* Ignore comment or metadata lines starting with ``#``.
* Preserve order and allow duplicates.
"""
output: list[str] = []
for line in text.splitlines():
line = line.strip()
if not line:
continue
if line.startswith("#"):
continue
output.append(line)
return output
def playlist_to_text(paths: Sequence[str]) -> str:
"""Convert list of paths into diff3-friendly text."""
return "\n".join(paths) + "\n"
def save_paths(paths: Sequence[str]) -> str:
"""Format playlist paths into m3u/m3u8 text."""
return "#EXTM3U\n" + "\n".join(paths) + "\n"
@dataclass
class MergeChunk:
type: Literal["normal", "conflict"]
text: str | None = None
base_text: str | None = None
local_text: str | None = None
remote_text: str | None = None
origin: str | None = None
@dataclass
class MergeResult:
merged_paths: list[str]
conflicts: list[dict]
def _ensure_test_dir(folder: str = TEST_PLAYLIST_DIR) -> str:
os.makedirs(folder, exist_ok=True)
return folder
def _read_text_if_exists(path: str) -> tuple[str, bool]:
try:
with open(path, "r", encoding="utf-8") as file:
return file.read(), True
except FileNotFoundError:
return "", False
except OSError:
return "", False
def _save_playlist_to_folder(filename: str, paths: Sequence[str], folder: str) -> str:
_ensure_test_dir(folder)
file_path = os.path.join(folder, filename)
with open(file_path, "w", encoding="utf-8") as file:
file.write(save_paths(paths))
return file_path
def _normalize_inputs(
base_text: str, local_text: str, remote_text: str, folder: str
) -> tuple[list[str], list[str], list[str]]:
"""Normalize playlist inputs and persist snapshots for inspection."""
base_paths = load_paths(base_text)
local_paths = load_paths(local_text)
remote_paths = load_paths(remote_text)
_save_playlist_to_folder("base_playlist.m3u8", base_paths, folder)
_save_playlist_to_folder("local_input.m3u8", local_paths, folder)
_save_playlist_to_folder("remote_input.m3u8", remote_paths, folder)
return base_paths, local_paths, remote_paths
def _merge_chunks(
base_paths: Sequence[str], local_paths: Sequence[str], remote_paths: Sequence[str]
) -> list[MergeChunk]:
merger = Merge3(base_paths, local_paths, remote_paths)
raw_chunks = list(merger.merge_groups())
chunks: list[MergeChunk] = []
for kind, *payload in raw_chunks:
if kind == "conflict":
base_lines, local_lines, remote_lines = payload # type: ignore[misc]
# Absorb trailing side-only chunks that actually belong to this
# conflicting region (common in reorder scenarios).
while chunks and chunks[-1].origin in ("a", "b"):
previous = chunks.pop()
previous_lines = previous.text.splitlines() if previous.text else []
if previous.origin == "a":
local_lines = previous_lines + local_lines
else:
remote_lines = previous_lines + remote_lines
chunks.append(
MergeChunk(
type="conflict",
base_text=playlist_to_text(base_lines),
local_text=playlist_to_text(local_lines),
remote_text=playlist_to_text(remote_lines),
)
)
else:
lines = payload[0]
if not lines:
continue
chunk = MergeChunk(type="normal", text=playlist_to_text(lines), origin=kind)
chunks.append(chunk)
return chunks
def _write_results(merged_lines: Sequence[str], folder: str) -> None:
_save_playlist_to_folder("local_result.m3u8", merged_lines, folder)
_save_playlist_to_folder("remote_result.m3u8", merged_lines, folder)
_save_playlist_to_folder("base_next.m3u8", merged_lines, folder)
def _write_delete_marker(playlist: str, folder: str) -> str:
_ensure_test_dir(folder)
marker_path = os.path.join(folder, "delete.txt")
with open(marker_path, "w", encoding="utf-8") as file:
file.write(f"delete playlist {playlist}")
return marker_path
def _resolve_conflict(conflict: dict, strategy: ConflictResolutionStrategy) -> list[str]:
local_lines = conflict["local"]
remote_lines = conflict["remote"]
if strategy == ConflictResolutionStrategy.LOCAL_PRIORITY:
return local_lines
if strategy == ConflictResolutionStrategy.REMOTE_PRIORITY:
return remote_lines
if strategy == ConflictResolutionStrategy.DELETE_PRIORITY:
if not local_lines or not remote_lines:
return []
return local_lines
if strategy == ConflictResolutionStrategy.KEEP_PRIORITY:
if local_lines:
return local_lines
if remote_lines:
return remote_lines
return []
raise ValueError(f"Unsupported conflict resolution strategy: {strategy}")
def _postprocess_local_priority_merge(
merged_lines: list[str],
base_paths: Sequence[str],
local_paths: Sequence[str],
remote_paths: Sequence[str],
) -> list[str]:
"""Fix local-priority edge cases without disturbing existing order."""
base_counter = Counter(base_paths)
local_counter = Counter(local_paths)
remote_counter = Counter(remote_paths)
merged_counter = Counter(merged_lines)
merged = list(merged_lines)
# 1) Preserve tracks that only Remote added (not in Base or Local).
for track, remote_count in remote_counter.items():
if base_counter[track] == 0 and local_counter[track] == 0 and remote_count > 0:
current = merged_counter.get(track, 0)
while current < remote_count:
merged.append(track)
merged_counter[track] += 1
current += 1
# 2) For tracks only new in Local+Remote (not in Base), keep Local's count.
candidate_tracks = set(local_counter.keys()) | set(remote_counter.keys())
for track in candidate_tracks:
if base_counter[track] == 0 and local_counter[track] > 0 and remote_counter[track] > 0:
target = local_counter[track]
current = merged_counter.get(track, 0)
if current > target:
remove_needed = current - target
trimmed: list[str] = []
for path in reversed(merged):
if remove_needed and path == track:
remove_needed -= 1
continue
trimmed.append(path)
trimmed.reverse()
merged = trimmed
merged_counter = Counter(merged)
current = merged_counter.get(track, 0)
while current < target:
merged.append(track)
current += 1
merged_counter[track] = current
return merged
def _postprocess_remote_priority_merge(
merged_lines: list[str],
base_paths: Sequence[str],
local_paths: Sequence[str],
remote_paths: Sequence[str],
) -> list[str]:
base_counter = Counter(base_paths)
local_counter = Counter(local_paths)
remote_counter = Counter(remote_paths)
remote_base_tracks: list[str] = [
track for track in remote_paths if base_counter[track] > 0
]
target_new_counter: dict[str, int] = {}
all_tracks = set(local_counter.keys()) | set(remote_counter.keys())
for track in all_tracks:
if base_counter[track] > 0:
continue
r_cnt = remote_counter.get(track, 0)
l_cnt = local_counter.get(track, 0)
if r_cnt > 0:
target_new_counter[track] = r_cnt
elif l_cnt > 0:
target_new_counter[track] = l_cnt
new_tracks: list[str] = []
used_counter: Counter[str] = Counter()
for track in remote_paths:
if base_counter[track] > 0:
continue
if track not in target_new_counter:
continue
if used_counter[track] >= target_new_counter[track]:
continue
new_tracks.append(track)
used_counter[track] += 1
for track in local_paths:
if base_counter[track] > 0:
continue
if track not in target_new_counter:
continue
if used_counter[track] >= target_new_counter[track]:
continue
new_tracks.append(track)
used_counter[track] += 1
return remote_base_tracks + new_tracks
def merge_playlists(
base_text: str,
local_text: str,
remote_text: str,
strategy: ConflictResolutionStrategy = ConflictResolutionStrategy.LOCAL_PRIORITY,
test_folder: str = TEST_PLAYLIST_DIR,
) -> MergeResult:
"""Merge playlists using diff3 and resolve conflicts per strategy.
The base, local, and remote normalized playlists are saved into ``test_folder``
for inspection. The merged playlist is also stored twice to simulate the
versions intended for local save and cloud upload.
"""
base_paths, local_paths, remote_paths = _normalize_inputs(
base_text, local_text, remote_text, test_folder
)
chunks = _merge_chunks(base_paths, local_paths, remote_paths)
has_conflict = any(chunk.type == "conflict" for chunk in chunks)
merged_lines: list[str] = []
conflicts: list[dict] = []
for chunk in chunks:
if chunk.type == "normal":
merged_lines.extend(chunk.text.splitlines())
else:
conflict_info = {
"base": chunk.base_text.splitlines(),
"local": chunk.local_text.splitlines(),
"remote": chunk.remote_text.splitlines(),
}
conflicts.append(conflict_info)
resolved = _resolve_conflict(conflict_info, strategy)
merged_lines.extend(resolved)
if strategy == ConflictResolutionStrategy.LOCAL_PRIORITY:
merged_lines = _postprocess_local_priority_merge(
merged_lines, base_paths, local_paths, remote_paths
)
if strategy == ConflictResolutionStrategy.REMOTE_PRIORITY:
merged_lines = _postprocess_remote_priority_merge(
merged_lines, base_paths, local_paths, remote_paths
)
_write_results(merged_lines, test_folder)
return MergeResult(merged_paths=merged_lines, conflicts=conflicts)
def _load_local_playlists(local_dir: str) -> dict[str, str]:
"""Read all local playlists under ``local_dir`` keyed by stem name."""
playlists: dict[str, str] = {}
if not local_dir:
return playlists
absolute = os.path.abspath(local_dir)
if not os.path.isdir(absolute):
return playlists
for entry in os.scandir(absolute):
if entry.is_file() and entry.name.lower().endswith((".m3u", ".m3u8")):
stem, _ = os.path.splitext(entry.name)
try:
with open(entry.path, "r", encoding="utf-8") as file:
playlists[stem] = file.read()
except OSError:
playlists[stem] = ""
return playlists
def _load_playlist_snapshots(playlist: str, folder: str) -> tuple[str, str, str, bool, bool]:
"""Load base/local/remote texts for a playlist from its test folder."""
playlist_folder = os.path.join(folder, playlist)
base_text, base_exists = _read_text_if_exists(
os.path.join(playlist_folder, "base_next.m3u8")
)
if not base_text:
alt_text, _ = _read_text_if_exists(
os.path.join(playlist_folder, "base_playlist.m3u8")
)
base_text = base_text or alt_text
remote_text, remote_exists = _read_text_if_exists(
os.path.join(playlist_folder, "remote_input.m3u8")
)
return base_text, remote_text, playlist_folder, remote_exists, base_exists
def _fetch_remote_playlists() -> dict[str, str]:
"""Retrieve remote playlists from Plex when connected.
Returns a mapping of playlist name to serialized playlist text. Failures to
connect or fetch playlists are logged and result in an empty mapping to
avoid blocking local testing.
"""
server_config.load()
if not server_config.url:
return {}
try:
plex_client.connect(
token=server_config.token,
scheme=server_config.scheme,
url=server_config.url,
port=server_config.port,
)
except Exception as exc: # pragma: no cover - network access
logger.warning(f"Failed to connect to Plex for remote playlists: {exc}")
return {}
playlists: dict[str, str] = {}
try:
for playlist in plex_client.server.playlists():
paths: list[str] = []
try:
for track in playlist.items():
locations = getattr(track, "locations", None) or []
if locations:
paths.append(locations[0])
except Exception as exc: # pragma: no cover - plex runtime
logger.warning(f"Failed to read playlist '{playlist.title}': {exc}")
playlists[playlist.title] = save_paths(paths)
except Exception as exc: # pragma: no cover - plex runtime
logger.warning(f"Failed to enumerate remote playlists: {exc}")
return {}
return playlists
def _sync_single_playlist(
playlist: str,
mode: SyncMode,
local_text: str | None,
base_text: str,
remote_text: str,
playlist_folder: str,
remote_present: bool,
) -> PlaylistSyncResult:
local_present = local_text is not None
local_text = local_text or ""
if not base_text:
base_text = "#EXTM3U\n"
if not remote_text:
remote_text = "#EXTM3U\n"
if mode == SyncMode.LOCAL_FORCE:
if not local_present:
_write_delete_marker(playlist, playlist_folder)
return PlaylistSyncResult(playlist, [], [], "deleted", playlist_folder)
base_paths, local_paths, _ = _normalize_inputs(
base_text, local_text, remote_text, playlist_folder
)
merged_lines = list(local_paths)
_write_results(merged_lines, playlist_folder)
return PlaylistSyncResult(playlist, merged_lines, [], "synced", playlist_folder)
if mode == SyncMode.REMOTE_FORCE:
if not remote_present:
_write_delete_marker(playlist, playlist_folder)
return PlaylistSyncResult(playlist, [], [], "deleted", playlist_folder)
base_paths, _, remote_paths = _normalize_inputs(
base_text, local_text, remote_text, playlist_folder
)
merged_lines = list(remote_paths)
_write_results(merged_lines, playlist_folder)
return PlaylistSyncResult(playlist, merged_lines, [], "synced", playlist_folder)
if mode not in (SyncMode.MERGE_LOCAL_PRIMARY, SyncMode.MERGE_REMOTE_PRIMARY):
raise ValueError(f"Unsupported sync mode: {mode}")
merge_strategy = (
ConflictResolutionStrategy.LOCAL_PRIORITY
if mode == SyncMode.MERGE_LOCAL_PRIMARY
else ConflictResolutionStrategy.REMOTE_PRIORITY
)
merge_result = merge_playlists(
base_text=base_text,
local_text=local_text,
remote_text=remote_text,
strategy=merge_strategy,
test_folder=playlist_folder,
)
if not merge_result.merged_paths and (not local_present or not remote_present):
_write_delete_marker(playlist, playlist_folder)
return PlaylistSyncResult(
playlist, merge_result.merged_paths, merge_result.conflicts, "deleted", playlist_folder
)
return PlaylistSyncResult(
playlist, merge_result.merged_paths, merge_result.conflicts, "synced", playlist_folder
)
def sync_all_playlists(
local_dir: str, mode: SyncMode, test_folder: str = TEST_PLAYLIST_DIR
) -> list[PlaylistSyncResult]:
"""Synchronize all playlists that can be matched by name."""
_ensure_test_dir(test_folder)
local_playlists = _load_local_playlists(local_dir)
remote_playlists = _fetch_remote_playlists()
playlist_names: set[str] = set(local_playlists.keys())
playlist_names.update(remote_playlists.keys())
for entry in os.scandir(test_folder):
if entry.is_dir():
playlist_names.add(entry.name)
results: list[PlaylistSyncResult] = []
for playlist in sorted(playlist_names):
base_text, snapshot_remote_text, playlist_folder, remote_exists, _ = _load_playlist_snapshots(
playlist, test_folder
)
local_text = local_playlists.get(playlist)
remote_text = remote_playlists.get(playlist)
remote_present = False
if remote_text:
remote_present = True
else:
remote_text = snapshot_remote_text
remote_present = bool(remote_text.strip()) or remote_exists
# Treat missing remote text as absent playlist.
result = _sync_single_playlist(
playlist=playlist,
mode=mode,
local_text=local_text,
base_text=base_text,
remote_text=remote_text,
playlist_folder=playlist_folder,
remote_present=remote_present,
)
results.append(result)
return results
+1
View File
@@ -3,3 +3,4 @@ uvicorn[standard]
jinja2 jinja2
python-multipart python-multipart
plexapi plexapi
merge3