import os from typing import Tuple from app.utils.config import server_config from app.utils.playlist_merge import SyncMode, sync_all_playlists, TEST_PLAYLIST_DIR from fastapi import FastAPI, Request, Form from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.templating import Jinja2Templates from fastapi.staticfiles import StaticFiles from app.utils.plex_client import plex_client from app.utils.logger import logger from app.utils.local_playlist import scan_local_playlists app = FastAPI() templates = Jinja2Templates( directory=os.path.join(os.path.dirname(__file__), "templates") ) # mount static files # 这里的路径是相对于 main.py 文件所在的目录 app.mount( "/static", StaticFiles(directory=os.path.join(os.path.dirname(__file__), "static")), name="static", ) SYNC_MODE_OPTIONS = [ { "value": SyncMode.LOCAL_FORCE.value, "label": "完全本地优先(local_force)", "description": "单向同步,本地覆盖云端且顺序以本地为准。", }, { "value": SyncMode.REMOTE_FORCE.value, "label": "完全云端优先(remote_force)", "description": "单向同步,云端覆盖本地且顺序以云端为准。", }, { "value": SyncMode.MERGE_LOCAL_PRIMARY.value, "label": "双向合并(本地优先)", "description": "三方合并,冲突时选择本地版本。", }, { "value": SyncMode.MERGE_REMOTE_PRIMARY.value, "label": "双向合并(云端优先)", "description": "三方合并,冲突时选择云端版本。", }, ] def _get_cloud_playlists() -> Tuple[list[dict], str, dict]: """Fetch playlists and connection state from the remote Plex server.""" server_config.load() playlists: list[dict] = [] status = "unset" server_info = { "name": "未设置", "domain": server_config.url or "未设置", } # no server url configured if not server_config.url: return playlists, status, server_info status = "failed" try: plex_client.connect( token=server_config.token, scheme=server_config.scheme, url=server_config.url, port=server_config.port, ) status = "connected" if plex_client.connected else "failed" server_info.update( { "name": getattr(plex_client.server, "friendlyName", "未命名服务器"), "domain": server_config.url, } ) for playlist in plex_client.server.playlists(): track_count = getattr(playlist, "itemCount", None) if track_count is None: try: track_count = len(playlist.items()) except Exception: track_count = 0 playlists.append( { "name": playlist.title, "track_count": track_count, } ) except Exception as exc: logger.warning(f"Failed to fetch cloud playlists: {exc}") status = "failed" playlists.sort(key=lambda item: item["name"].lower()) return playlists, status, server_info def _build_home_context( request: Request, local_path: str, message: str | None = None, message_type: str | None = None, sync_result: dict | None = None, selected_mode: str | None = None, ): server_config.load() local_playlists = scan_local_playlists(local_path) cloud_playlists, connection_status, server_info = _get_cloud_playlists() return { "request": request, "theme": server_config.theme, "path": "/", "local_playlists": local_playlists, "local_path": local_path, "cloud_playlists": cloud_playlists, "connection_status": connection_status, "server_info": server_info, "sync_modes": SYNC_MODE_OPTIONS, "selected_mode": selected_mode, "message": message, "message_type": message_type, "sync_result": sync_result, } # 显示主页 @app.get("/", response_class=HTMLResponse) async def home(request: Request, local_path: str = "playlist"): context = _build_home_context(request, local_path) return templates.TemplateResponse("home.html", context) @app.post("/sync", response_class=HTMLResponse) async def trigger_sync(request: Request, mode: str = Form(...), local_path: str = Form("playlist")): try: sync_mode = SyncMode(mode) except ValueError: context = _build_home_context( request, local_path, message=f"未知的同步策略:{mode}", message_type="danger", selected_mode=mode, ) return templates.TemplateResponse("home.html", context) try: results = sync_all_playlists( local_dir=local_path, mode=sync_mode, test_folder=TEST_PLAYLIST_DIR, ) merged_count = sum(len(item.merged_paths) for item in results) conflict_count = sum(len(item.conflicts) for item in results) deleted_count = sum(1 for item in results if item.action == "deleted") context = _build_home_context( request, local_path, message="同步完成,输出已写入测试目录用于验证。", message_type="success", sync_result={ "mode": sync_mode.value, "mode_label": next( (item["label"] for item in SYNC_MODE_OPTIONS if item["value"] == sync_mode.value), sync_mode.value, ), "merged_count": merged_count, "conflict_count": conflict_count, "delete_count": deleted_count, "playlist_count": len(results), "output_dir": TEST_PLAYLIST_DIR, }, selected_mode=sync_mode.value, ) return templates.TemplateResponse("home.html", context) except Exception as exc: logger.warning(f"Sync failed: {exc}") context = _build_home_context( request, local_path, message=f"同步失败:{exc}", message_type="danger", selected_mode=sync_mode.value, ) return templates.TemplateResponse("home.html", context) # 登录页面和处理 @app.get("/login", response_class=HTMLResponse) async def login_page(request: Request): server_config.load() return templates.TemplateResponse( "login.html", { "request": request, "theme": server_config.theme, "path": "/login", "token": server_config.token, "scheme": server_config.scheme, "server_url": server_config.url, "port": server_config.port, }, ) @app.post("/login", response_class=HTMLResponse) async def login( request: Request, user: str = Form(...), pw: str = Form(...), token: str = Form(...), scheme: str = Form("https"), url: str = Form(...), port: str = Form("32400"), ): # 尝试连接到 Plex 服务器 try: # 优先使用 token 连接,如果 token 为空则使用用户名和密码连接 _, token_success = plex_client.connect( username=user, password=pw, token=token, scheme=scheme, url=url, port=port, ) # 成功连接后保存配置到配置文件 if plex_client.connected: server_config.set_and_save_config( token=token_success, scheme=scheme, url=url, port=port ) return templates.TemplateResponse( "login.html", { "request": request, "message": "连接成功", "message_type": "success", "theme": server_config.theme, "path": "/login", "token": server_config.token, "scheme": server_config.scheme, "server_url": server_config.url, "port": server_config.port, }, ) except Exception as e: return templates.TemplateResponse( "login.html", { "request": request, "message": f"连接失败:{str(e)}", "message_type": "danger", "theme": server_config.theme, "path": "/login", "scheme": scheme, "server_url": url, "port": port, }, ) @app.get("/playlist", response_class=HTMLResponse) async def get_playlist(request: Request): return templates.TemplateResponse( "playlist.html", {"request": request, "theme": server_config.theme, "path": "/playlist"}, ) @app.post("/playlist", response_class=HTMLResponse) async def set_playlist( request: Request, address: str = Form(...), interval: str = Form(...) ): # demo:返回提交的设置 return templates.TemplateResponse( "playlist.html", { "request": request, "message": f"设置成功:地址 {address},间隔 {interval} 分钟", "message_type": "info", "theme": server_config.theme, "path": "/playlist", }, ) @app.post("/set-theme") async def set_theme(theme: str = Form(...)): server_config.set_and_save_config(theme=theme) return RedirectResponse("/login", status_code=303)