Files
PlexPlaylistSync/app/main.py
T

720 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
from datetime import datetime
from typing import Sequence
from fastapi import FastAPI, Form, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field
from app.utils.config import server_config
from app.utils.local_playlist import load_local_playlist, scan_local_playlists
from app.utils.logger import logger
from app.utils.playlist_merge import SyncMode, TEST_PLAYLIST_DIR, sync_all_playlists
from app.utils.plex_client import plex_client
# Application instance that every route in this module is registered on.
app = FastAPI()
# CORS is fully open: any origin, method and header is accepted, and
# credentials are allowed.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Jinja2 templates are loaded from the "templates" directory next to this file.
templates = Jinja2Templates(
    directory=os.path.join(os.path.dirname(__file__), "templates")
)
# Absolute path of <this file's directory>/../frontend/dist; the "/" route
# serves index.html from here when it exists.
FRONTEND_DIST_PATH = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", "frontend", "dist")
)
# Mount static files.
# The path here is relative to the directory that contains main.py.
app.mount(
    "/static",
    StaticFiles(directory=os.path.join(os.path.dirname(__file__), "static")),
    name="static",
)
# Expose the frontend bundle's assets only when the "assets" directory is
# present under FRONTEND_DIST_PATH at import time.
if os.path.isdir(os.path.join(FRONTEND_DIST_PATH, "assets")):
    app.mount(
        "/assets",
        StaticFiles(directory=os.path.join(FRONTEND_DIST_PATH, "assets")),
        name="frontend-assets",
    )
# Sync strategies offered to the user. Each entry carries the SyncMode value
# submitted by the form ("value"), a display name ("label") and a one-line
# explanation ("description"). The first two labels previously lacked their
# closing parenthesis; they now match the balanced style of the merge labels.
SYNC_MODE_OPTIONS = [
    {
        # One-way sync: local overwrites remote.
        "value": SyncMode.LOCAL_FORCE.value,
        "label": "完全本地优先(local_force)",
        "description": "单向同步,本地覆盖云端且顺序以本地为准。",
    },
    {
        # One-way sync: remote overwrites local.
        "value": SyncMode.REMOTE_FORCE.value,
        "label": "完全云端优先(remote_force)",
        "description": "单向同步,云端覆盖本地且顺序以云端为准。",
    },
    {
        # Three-way merge, conflicts resolved in favour of local.
        "value": SyncMode.MERGE_LOCAL_PRIMARY.value,
        "label": "双向合并(本地优先)",
        "description": "三方合并,冲突时选择本地版本。",
    },
    {
        # Three-way merge, conflicts resolved in favour of remote.
        "value": SyncMode.MERGE_REMOTE_PRIMARY.value,
        "label": "双向合并(云端优先)",
        "description": "三方合并,冲突时选择云端版本。",
    },
]
class PlaylistItem(BaseModel):
    """One playlist entry as returned by the ``/api/playlists`` endpoint."""

    # Identifier; ``_playlist_item`` builds it as "<prefix>-<name>".
    id: str
    # Display title of the playlist.
    title: str
    # Number of tracks; required and validated to be >= 0.
    trackCount: int = Field(..., ge=0)
    # Optional timestamp string; ``_playlist_item`` fills it with an ISO-8601
    # value ending in "Z".
    lastUpdated: str | None = None
class RegexRule(BaseModel):
    """A single path-rewrite rule: a pattern and its replacement text."""

    # Pattern text of the rule (required).
    pattern: str
    # Replacement text; defaults to the empty string.
    replacement: str = ""
class SyncSettingsResponse(BaseModel):
    """Response schema of ``GET /api/settings``."""

    # Currently configured sync mode value.
    sync_mode: str
    # Configured path-rewrite rules.
    path_rules: list[RegexRule]
    # Directory that holds the local playlists.
    local_path: str
    # Selected library name, if any.
    library_name: str | None = None
    # Connection settings of the configured server; all optional.
    server_url: str | None = None
    scheme: str | None = None
    port: str | None = None
    timeout: int | None = None
    # NOTE(review): the stored access token is part of this response — confirm
    # that exposing it over the API is intended.
    token: str | None = None
class ConnectRequest(BaseModel):
    """Request body of ``POST /api/connect``."""

    # Only "http" or "https" is accepted. The pattern is anchored on both ends
    # so that values merely containing "http" (e.g. "xhttpsx") are rejected.
    protocol: str = Field("https", pattern=r"^https?$", description="HTTP or HTTPS")
    # Server address (required).
    address: str
    # Server port, kept as a string; defaults to "32400".
    port: str = "32400"
    # Access token; may be left empty when username/password are supplied.
    token: str = ""
    # Optional account credentials.
    username: str | None = None
    password: str | None = None
    # Optional connection timeout forwarded to the client.
    timeout: int | None = None
    # Optional preferred library name.
    library_name: str | None = None
class ConnectResponse(BaseModel):
    """Response schema of ``POST /api/connect``."""

    # Token returned by the connection attempt.
    token: str
    # Free-form dict describing the server; ``api_connect`` fills it with the
    # keys isConnected, name, ip, port, libraryName and libraries.
    serverInfo: dict
def _get_cloud_playlists() -> tuple[list[dict], str, dict, str | None, list[str]]:
    """Fetch playlists and connection state from the remote Plex server.

    Reloads the saved configuration, connects with the stored settings and
    lists the playlists of the selected library.

    Returns:
        A 5-tuple ``(playlists, status, server_info, selected_library,
        music_libraries)``:

        * ``playlists`` -- dicts with ``"name"`` and ``"track_count"`` keys,
          sorted case-insensitively by name.
        * ``status`` -- ``"unset"`` when no server URL is configured,
          ``"connected"`` when the client reports a connection, otherwise
          ``"failed"``.
        * ``server_info`` -- dict with ``"name"`` and ``"domain"`` keys.
        * ``selected_library`` -- the library in use; ``""`` when the server
          reports no libraries.
        * ``music_libraries`` -- library names reported by the server.
    """
    server_config.load()
    playlists: list[dict] = []
    status = "unset"
    server_info = {
        "name": "未设置",
        "domain": server_config.url or "未设置",
    }
    selected_library = server_config.library_name
    music_libraries: list[str] = []

    # No server URL configured: nothing to connect to.
    if not server_config.url:
        return playlists, status, server_info, selected_library, music_libraries

    status = "failed"
    try:
        plex_client.connect(
            token=server_config.token,
            scheme=server_config.scheme,
            url=server_config.url,
            port=server_config.port,
            timeout=server_config.timeout,
        )
        status = "connected" if plex_client.connected else "failed"
        server_info.update(
            {
                "name": getattr(plex_client.server, "friendlyName", "未命名服务器"),
                "domain": server_config.url,
            }
        )

        # Same guard as _get_server_status: do not query libraries on a failed
        # connection, otherwise an empty result would clear the saved library.
        if not plex_client.connected:
            return playlists, status, server_info, selected_library, music_libraries

        music_libraries = plex_client.get_libs_name_list()
        if not music_libraries:
            server_config.set_and_save_config(library_name="")
            return playlists, status, server_info, "", music_libraries

        # Fall back to the first library when the saved one is missing.
        if not selected_library or selected_library not in music_libraries:
            selected_library = music_libraries[0]
        server_config.set_and_save_config(library_name=selected_library)

        for playlist in plex_client.get_lib_playlists(selected_library) or []:
            track_count = getattr(playlist, "itemCount", None)
            if track_count is None:
                # No itemCount attribute: count the items, best effort.
                try:
                    track_count = len(playlist.items())
                except Exception as exc:
                    logger.warning(f"Failed to count playlist items: {exc}")
                    track_count = 0
            playlists.append(
                {
                    "name": playlist.title,
                    "track_count": track_count,
                }
            )
    except Exception as exc:
        logger.warning(f"Failed to fetch cloud playlists: {exc}")
        status = "failed"

    playlists.sort(key=lambda item: item["name"].lower())
    return playlists, status, server_info, selected_library, music_libraries
def _library_dicts(names: Sequence[str]) -> list[dict]:
return [{"id": name, "title": name, "type": "artist"} for name in names]
def _playlist_item(name: str, track_count: int, prefix: str, last_updated: float | None = None) -> PlaylistItem:
    """Build a ``PlaylistItem`` for the API response.

    Args:
        name: Playlist name; used as the title and as part of the id.
        track_count: Number of tracks in the playlist.
        prefix: Id prefix; the id becomes ``"<prefix>-<name>"``.
        last_updated: POSIX timestamp of the last modification. When ``None``
            the current UTC time is used instead.

    Returns:
        The populated ``PlaylistItem`` whose ``lastUpdated`` is an ISO-8601
        UTC string ending in ``"Z"``.
    """
    # Local import: the module header only imports ``datetime`` itself.
    from datetime import timezone

    # ``is not None`` so that a timestamp of 0.0 is not mistaken for "missing".
    # Timezone-aware calls replace the deprecated utcfromtimestamp()/utcnow().
    if last_updated is not None:
        moment = datetime.fromtimestamp(last_updated, tz=timezone.utc)
    else:
        moment = datetime.now(timezone.utc)

    # Drop tzinfo before formatting so the output keeps the "...Z" shape
    # instead of ending in "+00:00Z".
    updated_value = moment.replace(tzinfo=None).isoformat() + "Z"

    return PlaylistItem(
        id=f"{prefix}-{name}",
        title=name,
        trackCount=track_count,
        lastUpdated=updated_value,
    )
def _scan_local_playlists_with_meta(local_path: str) -> list[PlaylistItem]:
    """List the ``.m3u`` / ``.m3u8`` playlists in a directory with metadata.

    Args:
        local_path: Directory to scan. When empty, the configured
            ``server_config.local_path`` is used.

    Returns:
        One ``PlaylistItem`` per playlist file (track count taken from the
        loaded playlist, timestamp from the file's mtime), sorted
        case-insensitively by title. Empty when no path is available or the
        path is not a directory.
    """
    items: list[PlaylistItem] = []
    base_path = local_path or server_config.local_path
    if not base_path:
        return items

    absolute_path = os.path.abspath(base_path)
    if not os.path.isdir(absolute_path):
        logger.warning(f"Playlist path does not exist or is not a directory: {absolute_path}")
        return items

    # The context manager closes the directory iterator even if loading a
    # playlist raises part-way through the scan.
    with os.scandir(absolute_path) as entries:
        for entry in entries:
            if not entry.is_file():
                continue
            if not entry.name.lower().endswith((".m3u", ".m3u8")):
                continue
            tracks = load_local_playlist(entry.path)
            stat_info = entry.stat()
            items.append(
                _playlist_item(
                    name=entry.name,
                    track_count=len(tracks),
                    prefix="local",
                    last_updated=stat_info.st_mtime,
                )
            )

    items.sort(key=lambda playlist: playlist.title.lower())
    return items
def _get_server_status() -> tuple[dict, str, list[dict]]:
    """Report the Plex connection state together with its libraries.

    Returns:
        ``(server_info, connection_status, libraries)``. Without a configured
        URL this is ``({"isConnected": False}, "unset", [])``. Otherwise
        ``connection_status`` is ``"connected"`` or ``"failed"`` and
        ``libraries`` holds the dicts produced by ``_library_dicts``.
    """
    server_config.load()
    if not server_config.url:
        return {"isConnected": False}, "unset", []

    server_info = dict(
        isConnected=False,
        name="未设置",
        ip=server_config.url,
        port=server_config.port,
        libraryName=server_config.library_name,
    )
    connection_status = "failed"
    libraries: list[dict] = []

    try:
        plex_client.connect(
            token=server_config.token,
            scheme=server_config.scheme,
            url=server_config.url,
            port=server_config.port,
            timeout=server_config.timeout,
        )
        if plex_client.connected:
            connection_status = "connected"
        else:
            connection_status = "failed"

        # Build the refreshed fields first, then apply them in one step.
        refreshed = {
            "isConnected": plex_client.connected,
            "name": getattr(plex_client.server, "friendlyName", "未命名服务器"),
            "ip": server_config.url,
            "port": server_config.port,
        }
        server_info.update(refreshed)

        if plex_client.connected:
            available = plex_client.get_libs_name_list()
            libraries = _library_dicts(available)
            if available:
                # Keep the saved library when it still exists, else use the first.
                preferred = server_config.library_name
                if preferred and preferred in available:
                    active_library = preferred
                else:
                    active_library = available[0]
                server_config.set_and_save_config(library_name=active_library)
                server_info["libraryName"] = active_library
    except Exception as exc:
        logger.warning(f"Failed to connect to Plex server: {exc}")

    return server_info, connection_status, libraries
class SyncModePayload(BaseModel):
    """Request body of ``PUT /api/settings/sync-mode``."""

    # Sync mode to persist; must be a valid ``SyncMode`` member.
    mode: SyncMode
class RegexRulePayload(BaseModel):
    """Request body of ``PUT /api/settings/regex-rules``."""

    # Complete list of rules to store.
    rules: list[RegexRule]
class LibrarySelection(BaseModel):
    """Request body of ``PUT /api/settings/library``."""

    # Name of the library to save as the selected one.
    library_name: str
class SyncRequest(BaseModel):
    """Request body of ``POST /api/sync``; both fields are optional."""

    # Sync mode override; ``api_sync`` uses the configured mode when omitted.
    mode: SyncMode | None = None
    # Local playlist directory override; ``api_sync`` uses the configured
    # path when omitted.
    local_path: str | None = None
@app.get("/api/settings", response_model=SyncSettingsResponse)
async def get_settings():
    """Return the current configuration after reloading it.

    Stored path rules are converted to ``RegexRule`` objects; a missing
    ``pattern`` or ``replacement`` key becomes an empty string.
    """
    server_config.load()

    rules: list[RegexRule] = []
    for stored in server_config.path_rules:
        rules.append(
            RegexRule(
                pattern=stored.get("pattern", ""),
                replacement=stored.get("replacement", ""),
            )
        )

    settings = SyncSettingsResponse(
        sync_mode=server_config.sync_mode,
        path_rules=rules,
        local_path=server_config.local_path,
        library_name=server_config.library_name,
        server_url=server_config.url,
        scheme=server_config.scheme,
        port=server_config.port,
        timeout=server_config.timeout,
        token=server_config.token,
    )
    return settings
@app.put("/api/settings/sync-mode")
async def update_sync_mode(payload: SyncModePayload):
    """Save the requested sync mode and echo its value back."""
    chosen_mode = payload.mode.value
    server_config.set_and_save_config(sync_mode=chosen_mode)
    return dict(sync_mode=chosen_mode)
@app.get("/api/settings/regex-rules")
async def get_regex_rules():
    """Return the stored path rules after reloading the configuration."""
    server_config.load()
    stored_rules = server_config.path_rules
    return dict(rules=stored_rules)
@app.put("/api/settings/regex-rules")
async def update_regex_rules(payload: RegexRulePayload):
    """Replace the stored path rules with the submitted list and echo it."""
    serialized: list[dict] = []
    for rule in payload.rules:
        serialized.append(rule.model_dump())
    server_config.set_and_save_config(path_rules=serialized)
    return dict(rules=payload.rules)
@app.put("/api/settings/library")
async def update_library(payload: LibrarySelection):
    """Save the selected library and return the name now held in the config."""
    server_config.set_and_save_config(library_name=payload.library_name)
    saved_name = server_config.library_name
    return dict(library_name=saved_name)
@app.get("/api/server")
async def api_server_status():
    """Expose the result of ``_get_server_status`` as a JSON object."""
    info, state, available_libraries = _get_server_status()
    return dict(status=state, serverInfo=info, libraries=available_libraries)
@app.post("/api/connect", response_model=ConnectResponse)
async def api_connect(payload: ConnectRequest):
    """Connect to a Plex server and persist the connection settings.

    Args:
        payload: Connection parameters; ``library_name`` is an optional
            preferred library.

    Returns:
        ``ConnectResponse`` with the token from the connection attempt and a
        ``serverInfo`` dict (isConnected, name, ip, port, libraryName,
        libraries).

    Raises:
        HTTPException: Status 400 with the error text when any step fails.
    """
    try:
        _, token = plex_client.connect(
            username=payload.username or "",
            password=payload.password or "",
            token=payload.token or "",
            scheme=payload.protocol,
            url=payload.address,
            port=payload.port,
            timeout=payload.timeout,
        )

        libraries = []
        # Requested library first, previously saved one second.
        selected_library = payload.library_name or server_config.library_name
        if plex_client.connected:
            lib_names = plex_client.get_libs_name_list()
            libraries = _library_dicts(lib_names)
            if lib_names:
                # Fall back to the first library when the choice is unavailable.
                if not selected_library or selected_library not in lib_names:
                    selected_library = lib_names[0]

        server_config.set_and_save_config(
            token=token,
            scheme=payload.protocol,
            url=payload.address,
            port=payload.port,
            timeout=payload.timeout,
            library_name=selected_library or "",
        )

        server_info = {
            "isConnected": plex_client.connected,
            "name": getattr(plex_client.server, "friendlyName", "未命名服务器") if plex_client.connected else "未命名服务器",
            "ip": payload.address,
            "port": payload.port,
            "libraryName": selected_library or "",
            "libraries": libraries,
        }
        return ConnectResponse(token=token, serverInfo=server_info)
    except Exception as exc:
        logger.warning(f"Failed to connect via API: {exc}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
@app.get("/api/playlists")
async def api_playlists(server: str = Query(..., pattern="(?i)^(local|cloud)$"), local_path: str | None = None):
    """List playlists from the local directory or from the Plex server.

    Args:
        server: ``"local"`` or ``"cloud"`` (case-insensitive).
        local_path: Directory for the local listing; the configured path is
            used when omitted. The resolved path is saved to the config.

    Returns:
        For ``local``: ``{"playlists": [...]}``. For ``cloud``: the playlists
        plus connection status, server info, the selected library and the
        available libraries.
    """
    if server.lower() != "local":
        (
            cloud_entries,
            connection_status,
            server_info,
            selected_library,
            libraries,
        ) = _get_cloud_playlists()

        serialized = []
        for entry in cloud_entries:
            cloud_item = _playlist_item(entry["name"], entry.get("track_count", 0), "cloud")
            serialized.append(cloud_item.model_dump())

        return {
            "playlists": serialized,
            "connection_status": connection_status,
            "server_info": server_info,
            "library": selected_library,
            "libraries": _library_dicts(libraries),
        }

    resolved_path = local_path or server_config.local_path
    server_config.set_and_save_config(local_path=resolved_path)
    local_items = _scan_local_playlists_with_meta(resolved_path)
    return {"playlists": [entry.model_dump() for entry in local_items]}
@app.post("/api/sync")
async def api_sync(payload: SyncRequest):
    """Run a sync over all playlists and return summary counters.

    Args:
        payload: Optional overrides for the sync mode and local directory;
            missing values come from the saved configuration.

    Returns:
        Dict with the mode used, merged/conflict/delete counts, the number of
        playlists processed and ``TEST_PLAYLIST_DIR`` as ``output_dir``.

    Raises:
        HTTPException: Status 400 when the configured sync mode is not a
            valid ``SyncMode`` value.
    """
    server_config.load()
    try:
        sync_mode = payload.mode or SyncMode(server_config.sync_mode)
    except ValueError as exc:
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    local_dir = payload.local_path or server_config.local_path
    results = sync_all_playlists(local_dir=local_dir, mode=sync_mode, test_folder=TEST_PLAYLIST_DIR)

    merged_count = sum(len(item.merged_paths) for item in results)
    conflict_count = sum(len(item.conflicts) for item in results)
    deleted_count = sum(1 for item in results if item.action == "deleted")
    return {
        "mode": sync_mode.value,
        "merged_count": merged_count,
        "conflict_count": conflict_count,
        "delete_count": deleted_count,
        "playlist_count": len(results),
        "output_dir": TEST_PLAYLIST_DIR,
    }
def _build_home_context(
    request: Request,
    local_path: str,
    message: str | None = None,
    message_type: str | None = None,
    sync_result: dict | None = None,
    selected_mode: str | None = None,
):
    """Assemble the template context for ``home.html``.

    Reloads the configuration, scans the local playlists under ``local_path``
    and queries the cloud playlists, then combines them with the optional
    flash message, sync result and selected mode (the configured sync mode is
    used when ``selected_mode`` is empty).
    """
    server_config.load()
    local_playlists = scan_local_playlists(local_path)
    cloud_playlists, connection_status, server_info, selected_library, music_libraries = (
        _get_cloud_playlists()
    )

    return dict(
        request=request,
        theme=server_config.theme,
        path="/",
        local_playlists=local_playlists,
        local_path=local_path,
        cloud_playlists=cloud_playlists,
        connection_status=connection_status,
        server_info=server_info,
        selected_library=selected_library,
        music_libraries=music_libraries,
        sync_modes=SYNC_MODE_OPTIONS,
        selected_mode=selected_mode or server_config.sync_mode,
        message=message,
        message_type=message_type,
        sync_result=sync_result,
        path_rules=server_config.path_rules,
    )
# Show the home page.
@app.get("/", response_class=HTMLResponse)
async def home(request: Request, local_path: str = "playlist"):
    """Serve the built frontend when present, else the ``home.html`` template."""
    spa_entry = os.path.join(FRONTEND_DIST_PATH, "index.html")
    if not os.path.exists(spa_entry):
        # No frontend build available: render the server-side page.
        context = _build_home_context(request, local_path or server_config.local_path)
        return templates.TemplateResponse("home.html", context)
    return FileResponse(spa_entry)
@app.post("/sync", response_class=HTMLResponse)
async def trigger_sync(request: Request, mode: str = Form(...), local_path: str = Form("playlist")):
    """Handle the sync form and re-render ``home.html`` with the outcome.

    An unknown mode or a failing sync is reported through a "danger" message
    on the page rather than an HTTP error; success shows the summary counters.
    """
    try:
        sync_mode = SyncMode(mode)
    except ValueError:
        rejected_context = _build_home_context(
            request,
            local_path,
            message=f"未知的同步策略:{mode}",
            message_type="danger",
            selected_mode=mode,
        )
        return templates.TemplateResponse("home.html", rejected_context)

    try:
        results = sync_all_playlists(
            local_dir=local_path,
            mode=sync_mode,
            test_folder=TEST_PLAYLIST_DIR,
        )
        merged_count = sum(len(item.merged_paths) for item in results)
        conflict_count = sum(len(item.conflicts) for item in results)
        deleted_count = sum(1 for item in results if item.action == "deleted")

        # Display label of the mode; the raw value is used when none matches.
        mode_label = sync_mode.value
        for option in SYNC_MODE_OPTIONS:
            if option["value"] == sync_mode.value:
                mode_label = option["label"]
                break

        summary = {
            "mode": sync_mode.value,
            "mode_label": mode_label,
            "merged_count": merged_count,
            "conflict_count": conflict_count,
            "delete_count": deleted_count,
            "playlist_count": len(results),
            "output_dir": TEST_PLAYLIST_DIR,
        }
        success_context = _build_home_context(
            request,
            local_path,
            message="同步完成,输出已写入测试目录用于验证。",
            message_type="success",
            sync_result=summary,
            selected_mode=sync_mode.value,
        )
        return templates.TemplateResponse("home.html", success_context)
    except Exception as exc:
        logger.warning(f"Sync failed: {exc}")
        failure_context = _build_home_context(
            request,
            local_path,
            message=f"同步失败:{exc}",
            message_type="danger",
            selected_mode=sync_mode.value,
        )
        return templates.TemplateResponse("home.html", failure_context)
@app.post("/path-rules", response_class=HTMLResponse)
async def save_path_rules(
    request: Request,
    local_path: str = Form("playlist"),
    pattern: list[str] | None = Form(None),
    replacement: list[str] | None = Form(None),
):
    """Save the path rules submitted by the form and re-render ``home.html``.

    Args:
        request: Incoming request, passed on to the template.
        local_path: Local playlist directory shown on the page.
        pattern: Rule patterns in form order; blank entries are skipped.
        replacement: Replacement texts paired with ``pattern`` by position.
            A pattern without a matching entry gets an empty replacement.
    """
    # Local import: itertools is not among the module-level imports.
    from itertools import zip_longest

    patterns = pattern or []
    replacements = replacement or []

    cleaned_rules: list[dict[str, str]] = []
    # zip_longest, not zip: a shorter or missing replacement list must not
    # silently drop the trailing patterns. Extra replacements pair with an
    # empty pattern and are skipped below.
    for pat, repl in zip_longest(patterns, replacements, fillvalue=""):
        pat = pat.strip()
        if not pat:
            continue
        cleaned_rules.append({"pattern": pat, "replacement": repl or ""})

    server_config.set_and_save_config(path_rules=cleaned_rules)
    context = _build_home_context(
        request,
        local_path,
        message="正则规则已保存并会在同步前应用。",
        message_type="success",
    )
    return templates.TemplateResponse("home.html", context)
# Login page and its form handling.
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
    """Render the login page, pre-filled with the saved connection settings.

    When both a URL and a token are saved, a connection is attempted so the
    library drop-down can be filled. A failure is logged and the page is
    rendered with no libraries.
    """
    server_config.load()
    music_libraries = []
    selected_library = server_config.library_name

    if server_config.url and server_config.token:
        try:
            plex_client.connect(
                token=server_config.token,
                scheme=server_config.scheme,
                url=server_config.url,
                port=server_config.port,
                timeout=server_config.timeout,
            )
            music_libraries = plex_client.get_libs_name_list()
            if music_libraries:
                # Fall back to the first library when the saved one is gone.
                if selected_library not in music_libraries:
                    selected_library = music_libraries[0]
            else:
                selected_library = ""
        except Exception as exc:
            # Log like the other handlers instead of swallowing silently.
            logger.warning(f"Failed to load libraries for login page: {exc}")
            selected_library = ""
            music_libraries = []

    return templates.TemplateResponse(
        "login.html",
        {
            "request": request,
            "theme": server_config.theme,
            "path": "/login",
            "token": server_config.token,
            "scheme": server_config.scheme,
            "server_url": server_config.url,
            "port": server_config.port,
            "music_libraries": music_libraries,
            "selected_library": selected_library,
        },
    )
@app.post("/login", response_class=HTMLResponse)
async def login(
    request: Request,
    user: str = Form(...),
    pw: str = Form(...),
    token: str = Form(...),
    scheme: str = Form("https"),
    url: str = Form(...),
    port: str = Form("32400"),
    library_name: str = Form(""),
):
    """Handle the login form: connect, save the settings, re-render the page.

    On success the connection settings and the chosen library are saved and
    the page shows a success message. Any exception renders the page with a
    "danger" message and echoes the submitted scheme, URL and port.
    """
    try:
        # Try to connect to the Plex server. The original note says the token
        # is preferred and username/password are used when it is empty.
        _, token_success = plex_client.connect(
            username=user,
            password=pw,
            token=token,
            scheme=scheme,
            url=url,
            port=port,
            timeout=server_config.timeout,
        )

        music_libraries: list[str] = []
        selected_library = ""
        if plex_client.connected:
            try:
                music_libraries = plex_client.get_libs_name_list()
            except Exception as exc:
                logger.warning(f"Unable to fetch music libraries: {exc}")
                music_libraries = []
            if music_libraries:
                requested_available = library_name and library_name in music_libraries
                selected_library = library_name if requested_available else music_libraries[0]

        # Save the configuration once; the library is "" when not connected
        # or when no library was reported.
        server_config.set_and_save_config(
            token=token_success,
            scheme=scheme,
            url=url,
            port=port,
            timeout=server_config.timeout,
            library_name=selected_library,
        )

        success_context = {
            "request": request,
            "message": "连接成功",
            "message_type": "success",
            "theme": server_config.theme,
            "path": "/login",
            "token": server_config.token,
            "scheme": server_config.scheme,
            "server_url": server_config.url,
            "port": server_config.port,
            "music_libraries": music_libraries,
            "selected_library": selected_library,
        }
        return templates.TemplateResponse("login.html", success_context)
    except Exception as e:
        failure_context = {
            "request": request,
            "message": f"连接失败:{str(e)}",
            "message_type": "danger",
            "theme": server_config.theme,
            "path": "/login",
            "scheme": scheme,
            "server_url": url,
            "port": port,
            "music_libraries": [],
            "selected_library": "",
        }
        return templates.TemplateResponse("login.html", failure_context)
@app.get("/playlist", response_class=HTMLResponse)
async def get_playlist(request: Request):
    """Render ``playlist.html`` with the current theme."""
    context = {
        "request": request,
        "theme": server_config.theme,
        "path": "/playlist",
    }
    return templates.TemplateResponse("playlist.html", context)
@app.post("/playlist", response_class=HTMLResponse)
async def set_playlist(
    request: Request, address: str = Form(...), interval: str = Form(...)
):
    """Demo handler: echo the submitted settings back; nothing is saved."""
    # Demo: return the submitted settings.
    confirmation = f"设置成功:地址 {address},间隔 {interval} 分钟"
    context = {
        "request": request,
        "message": confirmation,
        "message_type": "info",
        "theme": server_config.theme,
        "path": "/playlist",
    }
    return templates.TemplateResponse("playlist.html", context)
@app.post("/set-theme")
async def set_theme(theme: str = Form(...)):
    """Save the submitted theme, then redirect to ``/login`` with a 303."""
    server_config.set_and_save_config(theme=theme)
    redirect = RedirectResponse("/login", status_code=303)
    return redirect