"""FastAPI application for the Plex playlist sync web UI.

The module exposes two families of routes:

* JSON endpoints under ``/api/...`` consumed by the built front-end bundle.
* Form-driven HTML endpoints (``/``, ``/sync``, ``/path-rules``, ``/login``,
  ``/playlist``, ``/set-theme``) rendered from Jinja2 templates.
"""

import os
from itertools import zip_longest
from pathlib import Path
from typing import Literal

from fastapi import FastAPI, Form, Request
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, ConfigDict, Field

from app.utils.config import server_config
from app.utils.logger import logger
from app.utils.local_playlist import scan_local_playlists
from app.utils.playlist_merge import SyncMode, TEST_PLAYLIST_DIR, sync_all_playlists
from app.utils.plex_client import MUSIC_LIBRARY_TYPE, plex_client

BASE_DIR = Path(__file__).resolve().parent
FRONTEND_DIST = BASE_DIR / "static" / "frontend"
FRONTEND_INDEX = FRONTEND_DIST / "index.html"


def _read_status_interval(default: int = 45, minimum: int = 15) -> int:
    """Return the status polling interval in seconds.

    The value is read from the ``STATUS_CHECK_INTERVAL_SECONDS`` environment
    variable. A value that is not an integer falls back to ``default`` instead
    of aborting the module import, and the result is never below ``minimum``.
    """
    raw = os.getenv("STATUS_CHECK_INTERVAL_SECONDS", str(default))
    try:
        seconds = int(raw)
    except ValueError:
        logger.warning(
            f"Invalid STATUS_CHECK_INTERVAL_SECONDS={raw!r}; falling back to {default}"
        )
        seconds = default
    return max(minimum, seconds)


DEFAULT_STATUS_INTERVAL = _read_status_interval()

app = FastAPI()
templates = Jinja2Templates(directory=BASE_DIR / "templates")

# mount static files
app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
if (FRONTEND_DIST / "assets").exists():
    app.mount(
        "/assets",
        StaticFiles(directory=FRONTEND_DIST / "assets"),
        name="assets",
    )

# Choices offered for the sync mode; ``value`` is the SyncMode value the
# ``/sync`` form posts back.
SYNC_MODE_OPTIONS = [
    {
        "value": SyncMode.LOCAL_FORCE.value,
        "label": "完全本地优先(local_force)",
        "description": "单向同步,本地覆盖云端且顺序以本地为准。",
    },
    {
        "value": SyncMode.REMOTE_FORCE.value,
        "label": "完全云端优先(remote_force)",
        "description": "单向同步,云端覆盖本地且顺序以云端为准。",
    },
    {
        "value": SyncMode.MERGE_LOCAL_PRIMARY.value,
        "label": "双向合并(本地优先)",
        "description": "三方合并,冲突时选择本地版本。",
    },
    {
        "value": SyncMode.MERGE_REMOTE_PRIMARY.value,
        "label": "双向合并(云端优先)",
        "description": "三方合并,冲突时选择云端版本。",
    },
]


class PlexConnectionSettings(BaseModel):
    """Request body of ``POST /api/server/connect``."""

    protocol: Literal["http", "https"] = Field("https", description="Connection protocol")
    address: str = Field(..., description="Plex server domain or IP")
    port: str = Field("32400", description="Plex server port")
    token: str = ""
    username: str | None = ""
    password: str | None = ""
    library: str | None = None


class RegexRule(BaseModel):
    """One path rewrite rule; unknown extra fields in the payload are ignored."""

    pattern: str
    replacement: str = ""

    model_config = ConfigDict(extra="ignore")


class StrategyPayload(BaseModel):
    """Request body of ``POST /api/settings/strategy``."""

    strategy: str


class LibrarySelection(BaseModel):
    """Request body of ``POST /api/server/library``."""

    library: str


def _port_as_int(port) -> int | None:
    """Convert a stored port value to ``int``; return ``None`` if empty or invalid."""
    if not port:
        return None
    try:
        return int(port)
    except (TypeError, ValueError):
        return None


def _format_regex_rules(rules: list[dict]) -> list[dict]:
    """Convert stored path rules into the shape returned by the JSON API.

    Rules whose pattern is empty after stripping are skipped. The ``id`` is
    built from the rule's index in the input list, so skipped rules leave
    gaps in the numbering.
    """
    formatted: list[dict] = []
    for idx, rule in enumerate(rules or []):
        pattern = (rule.get("pattern") or "").strip()
        if not pattern:
            continue
        formatted.append(
            {
                "id": f"rule-{idx}",
                "pattern": pattern,
                "replacement": rule.get("replacement") or "",
            }
        )
    return formatted


def _list_music_libraries() -> list[dict]:
    """Return id/title/type records for the music libraries of the connected server.

    Returns an empty list when the client is not connected. If listing the
    sections fails, the error is logged and whatever was collected so far is
    returned.
    """
    libraries: list[dict] = []
    if not plex_client.connected or not plex_client.server:
        return libraries
    try:
        for lib in plex_client.server.library.sections():
            if getattr(lib, "type", None) != MUSIC_LIBRARY_TYPE:
                continue
            libraries.append(
                {
                    # Prefer uuid, then key, then the title as the identifier.
                    "id": getattr(lib, "uuid", getattr(lib, "key", lib.title)),
                    "title": lib.title,
                    "type": getattr(lib, "type", ""),
                }
            )
    except Exception as exc:
        logger.warning(f"Unable to fetch music library metadata: {exc}")
    return libraries


def _get_cloud_playlists(
    include_playlists: bool = True,
) -> tuple[list[dict], str, dict, str, list[str]]:
    """Fetch playlists and connection state from the remote Plex server.

    Returns a tuple ``(playlists, status, server_info, selected_library,
    music_libraries)`` where ``status`` is one of ``"unset"`` (no server URL
    configured), ``"connected"`` or ``"failed"``.

    Side effects: reloads the configuration, (re)connects the Plex client, and
    persists the selected library when the stored one is missing or no longer
    exists on the server.
    """
    server_config.load()
    playlists: list[dict] = []
    status = "unset"
    server_info = {
        "name": "未设置",
        "domain": server_config.url or "未设置",
    }
    selected_library = server_config.library_name
    music_libraries: list[str] = []

    # no server url configured
    if not server_config.url:
        return playlists, status, server_info, selected_library, music_libraries

    status = "failed"
    try:
        plex_client.connect(
            token=server_config.token,
            scheme=server_config.scheme,
            url=server_config.url,
            port=server_config.port,
        )
        status = "connected" if plex_client.connected else "failed"
        server_info.update(
            {
                "name": getattr(plex_client.server, "friendlyName", "未命名服务器"),
                "domain": server_config.url,
            }
        )

        music_libraries = plex_client.get_libs_name_list()
        if not music_libraries:
            server_config.set_and_save_config(library_name="")
            return playlists, status, server_info, "", music_libraries

        if not selected_library or selected_library not in music_libraries:
            selected_library = music_libraries[0]
            server_config.set_and_save_config(library_name=selected_library)

        if include_playlists:
            for playlist in plex_client.get_lib_playlists(selected_library) or []:
                track_count = getattr(playlist, "itemCount", None)
                if track_count is None:
                    # No cached count on the object: count the items directly.
                    try:
                        track_count = len(playlist.items())
                    except Exception:
                        track_count = 0
                # Calls updatedAt.isoformat() when available, otherwise yields None.
                updated_at = getattr(playlist, "updatedAt", None)
                playlists.append(
                    {
                        "name": playlist.title,
                        "track_count": track_count,
                        "rating_key": getattr(playlist, "ratingKey", playlist.title),
                        "last_modified": getattr(updated_at, "isoformat", lambda: None)(),
                    }
                )
    except Exception as exc:
        logger.warning(f"Failed to fetch cloud playlists: {exc}")
        status = "failed"

    if include_playlists:
        # This runs outside the try block, so tolerate a missing title.
        playlists.sort(key=lambda item: (item["name"] or "").lower())

    return playlists, status, server_info, selected_library, music_libraries


def _build_home_context(
    request: Request,
    local_path: str | None,
    message: str | None = None,
    message_type: str | None = None,
    sync_result: dict | None = None,
    selected_mode: str | None = None,
):
    """Assemble the template context for ``home.html``.

    When ``local_path`` is empty, the configured local playlist directory is
    used. The configuration is reloaded first so that fallback is current.
    """
    server_config.load()
    local_path = local_path or server_config.local_playlist_dir
    local_playlists = scan_local_playlists(local_path)
    (
        cloud_playlists,
        connection_status,
        server_info,
        selected_library,
        music_libraries,
    ) = _get_cloud_playlists()
    return {
        "request": request,
        "theme": server_config.theme,
        "path": "/",
        "local_playlists": local_playlists,
        "local_path": local_path,
        "cloud_playlists": cloud_playlists,
        "connection_status": connection_status,
        "server_info": server_info,
        "selected_library": selected_library,
        "music_libraries": music_libraries,
        "sync_modes": SYNC_MODE_OPTIONS,
        "selected_mode": selected_mode,
        "message": message,
        "message_type": message_type,
        "sync_result": sync_result,
        "path_rules": server_config.path_rules,
    }


def _format_local_playlists_response(playlists: list[dict]) -> list[dict]:
    """Map scanned local playlist records to the JSON shape used by the front-end."""
    return [
        {
            "id": f"local::{item.get('name', '')}",
            "title": item.get("name", ""),
            "trackCount": item.get("track_count", 0),
            "lastUpdated": item.get("last_modified"),
            "path": item.get("path"),
        }
        for item in playlists
    ]


def _format_cloud_playlists_response(playlists: list[dict]) -> list[dict]:
    """Map cloud playlist records to the JSON shape used by the front-end."""
    return [
        {
            "id": str(item.get("rating_key") or item.get("name")),
            "title": item.get("name", ""),
            "trackCount": item.get("track_count", 0),
            "lastUpdated": item.get("last_modified"),
        }
        for item in playlists
    ]


def _login_context(
    request: Request,
    *,
    token: str,
    scheme: str,
    server_url: str,
    port: str,
    music_libraries: list[str],
    selected_library: str,
    message: str | None = None,
    message_type: str | None = None,
) -> dict:
    """Build the template context for ``login.html``.

    ``message`` and ``message_type`` are only added when a message is given.
    """
    context = {
        "request": request,
        "theme": server_config.theme,
        "path": "/login",
        "token": token,
        "scheme": scheme,
        "server_url": server_url,
        "port": port,
        "music_libraries": music_libraries,
        "selected_library": selected_library,
    }
    if message is not None:
        context["message"] = message
        context["message_type"] = message_type
    return context


# Serve the front-end (static files produced by the React build); fall back to
# the legacy template when the build output is missing.
@app.get("/", response_class=HTMLResponse)
async def serve_frontend(request: Request, local_path: str | None = None):
    """Serve the built front-end, or the legacy home page with a warning."""
    if FRONTEND_INDEX.exists():
        return FileResponse(FRONTEND_INDEX)

    # _build_home_context applies the configured-directory fallback itself.
    context = _build_home_context(
        request,
        local_path,
        message="前端静态文件未构建,已回退到旧版页面。",
        message_type="warning",
    )
    return templates.TemplateResponse("home.html", context)


@app.get("/api/ui-config")
async def get_ui_config():
    """Expose UI-related settings for the front-end."""
    server_config.load()
    return {
        "status": "success",
        "data": {
            "statusCheckIntervalSeconds": DEFAULT_STATUS_INTERVAL,
            "localPlaylistDir": server_config.local_playlist_dir,
        },
    }


@app.get("/api/settings")
async def get_settings():
    """Return the stored sync strategy and the formatted regex rules."""
    server_config.load()
    return {
        "status": "success",
        "data": {
            "syncStrategy": server_config.sync_strategy,
            "regexRules": _format_regex_rules(server_config.path_rules),
        },
    }


@app.post("/api/settings/strategy")
async def save_strategy(payload: StrategyPayload):
    """Persist the sync strategy exactly as submitted."""
    server_config.set_and_save_config(sync_strategy=payload.strategy)
    return {"status": "success", "data": {"syncStrategy": payload.strategy}}


@app.get("/api/regex-rules")
async def get_regex_rules():
    """Return the stored path rewrite rules."""
    server_config.load()
    return {"status": "success", "data": _format_regex_rules(server_config.path_rules)}


@app.post("/api/regex-rules")
async def save_regex_rules(rules: list[RegexRule]):
    """Replace the stored path rules, dropping rules with a blank pattern."""
    cleaned = [
        {"pattern": rule.pattern.strip(), "replacement": rule.replacement or ""}
        for rule in rules
        if rule.pattern.strip()
    ]
    server_config.set_and_save_config(path_rules=cleaned)
    return {"status": "success", "data": _format_regex_rules(cleaned)}


@app.get("/api/playlists/{server_type}")
async def api_get_playlists(
    server_type: Literal["local", "cloud"], local_path: str | None = None
):
    """List local or cloud playlists.

    For ``local`` the directory that was scanned is also saved as the
    configured local playlist directory. For ``cloud`` an error payload is
    returned when the server is not connected.
    """
    if server_type == "local":
        # Reload so the fallback directory reflects the saved configuration.
        server_config.load()
        path = local_path or server_config.local_playlist_dir
        server_config.set_and_save_config(local_playlist_dir=path)
        playlists = scan_local_playlists(path)
        return {"status": "success", "data": _format_local_playlists_response(playlists)}

    playlists, status, _, _, _ = _get_cloud_playlists()
    if status != "connected":
        return {
            "status": "error",
            "message": "未连接到云端服务器,无法获取播放列表。",
            "data": [],
        }
    return {"status": "success", "data": _format_cloud_playlists_response(playlists)}


@app.get("/api/server/status")
async def api_server_status():
    """Report connection state, server details and available music libraries."""
    _, status, server_info, selected_library, _ = _get_cloud_playlists(
        include_playlists=False
    )
    is_connected = status == "connected"
    libraries = _list_music_libraries() if is_connected else []
    return {
        "status": "success",
        "data": {
            "isConnected": is_connected,
            "name": server_info.get("name"),
            "ip": server_config.url or None,
            # A malformed stored port yields null instead of a server error.
            "port": _port_as_int(server_config.port),
            "libraryName": selected_library or None,
            "libraries": libraries,
        },
    }


@app.post("/api/server/connect")
async def api_server_connect(payload: PlexConnectionSettings):
    """Connect to a Plex server and persist the connection settings.

    Returns HTTP 400 with an error payload when the connection attempt raises.
    On success the requested library is kept if the server has it; otherwise
    the first available music library (if any) is selected.
    """
    try:
        _, token_success = plex_client.connect(
            username=payload.username or "",
            password=payload.password or "",
            token=payload.token or "",
            scheme=payload.protocol,
            url=payload.address,
            port=payload.port,
        )
    except Exception as exc:
        return JSONResponse(
            {
                "status": "error",
                "message": f"连接失败:{exc}",
                "data": {"token": "", "serverInfo": {"isConnected": False}},
            },
            status_code=400,
        )

    # A failure to list libraries should not undo a successful connection.
    try:
        library_names = plex_client.get_libs_name_list() or []
    except Exception as exc:
        logger.warning(f"Unable to fetch music libraries: {exc}")
        library_names = []

    selected_library = payload.library if payload.library in library_names else ""
    if not selected_library and library_names:
        selected_library = library_names[0]

    libraries = _list_music_libraries()
    server_config.set_and_save_config(
        token=token_success,
        scheme=payload.protocol,
        url=payload.address,
        port=payload.port,
        library_name=selected_library,
    )
    return {
        "status": "success",
        "message": "连接成功",
        "data": {
            "token": token_success,
            "serverInfo": {
                "isConnected": True,
                "name": getattr(plex_client.server, "friendlyName", payload.address),
                "ip": payload.address,
                # The port arrives as a free-form string; never fail on it here.
                "port": _port_as_int(payload.port),
                "libraryName": selected_library,
                "libraries": libraries,
            },
        },
    }


@app.post("/api/server/library")
async def api_select_library(selection: LibrarySelection):
    """Persist the chosen music library name."""
    server_config.set_and_save_config(library_name=selection.library)
    return {"status": "success", "data": {"libraryName": selection.library}}


@app.post("/sync", response_class=HTMLResponse)
async def trigger_sync(
    request: Request, mode: str = Form(...), local_path: str | None = Form(None)
):
    """Run a sync in the requested mode and render the home page with the result.

    An unknown ``mode`` or a failing sync renders the page with a ``danger``
    message instead of raising.
    """
    server_config.load()
    local_path = local_path or server_config.local_playlist_dir

    try:
        sync_mode = SyncMode(mode)
    except ValueError:
        context = _build_home_context(
            request,
            local_path,
            message=f"未知的同步策略:{mode}",
            message_type="danger",
            selected_mode=mode,
        )
        return templates.TemplateResponse("home.html", context)

    try:
        results = sync_all_playlists(
            local_dir=local_path,
            mode=sync_mode,
            test_folder=TEST_PLAYLIST_DIR,
        )
        summary = {
            "mode": sync_mode.value,
            # Fall back to the raw mode value if no option carries this mode.
            "mode_label": next(
                (
                    option["label"]
                    for option in SYNC_MODE_OPTIONS
                    if option["value"] == sync_mode.value
                ),
                sync_mode.value,
            ),
            "merged_count": sum(len(item.merged_paths) for item in results),
            "conflict_count": sum(len(item.conflicts) for item in results),
            "delete_count": sum(1 for item in results if item.action == "deleted"),
            "playlist_count": len(results),
            "output_dir": TEST_PLAYLIST_DIR,
        }
    except Exception as exc:
        logger.warning(f"Sync failed: {exc}")
        context = _build_home_context(
            request,
            local_path,
            message=f"同步失败:{exc}",
            message_type="danger",
            selected_mode=sync_mode.value,
        )
    else:
        context = _build_home_context(
            request,
            local_path,
            message="同步完成,输出已写入测试目录用于验证。",
            message_type="success",
            sync_result=summary,
            selected_mode=sync_mode.value,
        )
    return templates.TemplateResponse("home.html", context)


@app.post("/path-rules", response_class=HTMLResponse)
async def save_path_rules(
    request: Request,
    local_path: str | None = Form(None),
    pattern: list[str] | None = Form(None),
    replacement: list[str] | None = Form(None),
):
    """Save the regex path rules posted by the legacy form and re-render home.

    Patterns and replacements are paired by position. A pattern without a
    matching replacement is kept with an empty replacement rather than being
    dropped; entries whose pattern is blank are skipped.
    """
    cleaned_rules: list[dict[str, str]] = []
    for pat, repl in zip_longest(pattern or [], replacement or [], fillvalue=""):
        pat = (pat or "").strip()
        if not pat:
            continue
        cleaned_rules.append({"pattern": pat, "replacement": repl or ""})

    server_config.set_and_save_config(path_rules=cleaned_rules)
    context = _build_home_context(
        request,
        local_path,
        message="正则规则已保存并会在同步前应用。",
        message_type="success",
    )
    return templates.TemplateResponse("home.html", context)


# Login page and its form handler
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
    """Render the login page, pre-filled from the stored configuration.

    When a URL and token are stored, a connection is attempted in order to
    list the music libraries; any failure is logged and leaves the list empty.
    """
    server_config.load()
    music_libraries: list[str] = []
    selected_library = server_config.library_name
    if server_config.url and server_config.token:
        try:
            plex_client.connect(
                token=server_config.token,
                scheme=server_config.scheme,
                url=server_config.url,
                port=server_config.port,
            )
            music_libraries = plex_client.get_libs_name_list()
            if music_libraries:
                if selected_library not in music_libraries:
                    selected_library = music_libraries[0]
            else:
                selected_library = ""
        except Exception as exc:
            logger.warning(f"Unable to connect with stored credentials: {exc}")
            selected_library = ""
            music_libraries = []

    return templates.TemplateResponse(
        "login.html",
        _login_context(
            request,
            token=server_config.token,
            scheme=server_config.scheme,
            server_url=server_config.url,
            port=server_config.port,
            music_libraries=music_libraries,
            selected_library=selected_library,
        ),
    )


@app.post("/login", response_class=HTMLResponse)
async def login(
    request: Request,
    user: str = Form(""),
    pw: str = Form(""),
    token: str = Form(""),
    scheme: str = Form("https"),
    url: str = Form(...),
    port: str = Form("32400"),
    library_name: str = Form(""),
):
    """Connect to Plex with the submitted credentials and save the settings.

    ``user``, ``pw`` and ``token`` default to empty strings so that the form
    can be submitted with only a token or only a username/password pair; an
    empty required form field is rejected by FastAPI before the handler runs.
    """
    # Try to connect to the Plex server.
    try:
        # Credentials are all handed to the client; per the original note the
        # token is preferred and username/password are used when it is empty.
        _, token_success = plex_client.connect(
            username=user,
            password=pw,
            token=token,
            scheme=scheme,
            url=url,
            port=port,
        )

        music_libraries: list[str] = []
        selected_library = ""
        if plex_client.connected:
            try:
                music_libraries = plex_client.get_libs_name_list() or []
            except Exception as exc:
                logger.warning(f"Unable to fetch music libraries: {exc}")
                music_libraries = []
            if music_libraries:
                if library_name and library_name in music_libraries:
                    selected_library = library_name
                else:
                    selected_library = music_libraries[0]

        # After the connection attempt returns, persist the settings.
        # selected_library stays "" when not connected or no library exists.
        server_config.set_and_save_config(
            token=token_success,
            scheme=scheme,
            url=url,
            port=port,
            library_name=selected_library,
        )

        return templates.TemplateResponse(
            "login.html",
            _login_context(
                request,
                token=server_config.token,
                scheme=server_config.scheme,
                server_url=server_config.url,
                port=server_config.port,
                music_libraries=music_libraries,
                selected_library=selected_library,
                message="连接成功",
                message_type="success",
            ),
        )
    except Exception as e:
        # Echo the submitted values back so the user can correct and retry.
        return templates.TemplateResponse(
            "login.html",
            _login_context(
                request,
                token=token,
                scheme=scheme,
                server_url=url,
                port=port,
                music_libraries=[],
                selected_library="",
                message=f"连接失败:{str(e)}",
                message_type="danger",
            ),
        )


@app.get("/playlist", response_class=HTMLResponse)
async def get_playlist(request: Request):
    """Render the playlist settings page."""
    return templates.TemplateResponse(
        "playlist.html",
        {"request": request, "theme": server_config.theme, "path": "/playlist"},
    )


@app.post("/playlist", response_class=HTMLResponse)
async def set_playlist(
    request: Request, address: str = Form(...), interval: str = Form(...)
):
    """Demo handler: echo the submitted settings back without storing them."""
    return templates.TemplateResponse(
        "playlist.html",
        {
            "request": request,
            "message": f"设置成功:地址 {address},间隔 {interval} 分钟",
            "message_type": "info",
            "theme": server_config.theme,
            "path": "/playlist",
        },
    )


@app.post("/set-theme")
async def set_theme(theme: str = Form(...)):
    """Persist the UI theme and redirect to the login page."""
    server_config.set_and_save_config(theme=theme)
    return RedirectResponse("/login", status_code=303)