327 lines
10 KiB
Python
327 lines
10 KiB
Python
import os
|
||
from typing import Tuple
|
||
from app.utils.config import server_config
|
||
from app.utils.playlist_merge import SyncMode, sync_all_playlists, TEST_PLAYLIST_DIR
|
||
from fastapi import FastAPI, Request, Form
|
||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||
from fastapi.templating import Jinja2Templates
|
||
from fastapi.staticfiles import StaticFiles
|
||
from app.utils.plex_client import plex_client
|
||
from app.utils.logger import logger
|
||
from app.utils.local_playlist import scan_local_playlists
|
||
|
||
# ASGI application object; the route handlers in this module register on it.
app = FastAPI()

# Jinja2 templates are loaded from the "templates" folder next to this file.
templates = Jinja2Templates(directory=os.path.join(os.path.dirname(__file__), "templates"))

# Mount static files.
# The directory is resolved relative to the folder that contains this file.
app.mount(
    path="/static",
    app=StaticFiles(directory=os.path.join(os.path.dirname(__file__), "static")),
    name="static",
)
|
||
|
||
|
||
# Sync strategies passed to the home template as "sync_modes": one dict per
# SyncMode with the enum value plus a user-facing label and description.
SYNC_MODE_OPTIONS = [
    {"value": sync_mode.value, "label": label, "description": description}
    for sync_mode, label, description in (
        (
            SyncMode.LOCAL_FORCE,
            "完全本地优先(local_force)",
            "单向同步,本地覆盖云端且顺序以本地为准。",
        ),
        (
            SyncMode.REMOTE_FORCE,
            "完全云端优先(remote_force)",
            "单向同步,云端覆盖本地且顺序以云端为准。",
        ),
        (
            SyncMode.MERGE_LOCAL_PRIMARY,
            "双向合并(本地优先)",
            "三方合并,冲突时选择本地版本。",
        ),
        (
            SyncMode.MERGE_REMOTE_PRIMARY,
            "双向合并(云端优先)",
            "三方合并,冲突时选择云端版本。",
        ),
    )
]
|
||
|
||
|
||
def _get_cloud_playlists() -> Tuple[list[dict], str, dict]:
    """Reload the saved server settings and list playlists from the Plex server.

    Returns:
        A ``(playlists, status, server_info)`` tuple.

        * ``playlists``: dicts with ``name`` and ``track_count`` keys, sorted
          by lower-cased name.
        * ``status``: ``"unset"`` when no server URL is configured,
          ``"connected"`` when the client reports a connection and the listing
          completed, otherwise ``"failed"``.
        * ``server_info``: dict with ``name`` and ``domain`` keys.

    Any exception raised while connecting or listing is logged as a warning
    and reported as ``"failed"``; playlists gathered before the error are
    still returned.
    """

    def _track_count(remote) -> int:
        # Use the playlist's itemCount attribute when present; otherwise count
        # its items, falling back to 0 if that lookup raises.
        count = getattr(remote, "itemCount", None)
        if count is None:
            try:
                count = len(remote.items())
            except Exception:
                count = 0
        return count

    server_config.load()

    found: list[dict] = []
    info = {
        "name": "未设置",
        "domain": server_config.url or "未设置",
    }

    # Nothing to query until a server URL has been configured.
    if not server_config.url:
        return found, "unset", info

    state = "failed"
    try:
        plex_client.connect(
            token=server_config.token,
            scheme=server_config.scheme,
            url=server_config.url,
            port=server_config.port,
        )
        if plex_client.connected:
            state = "connected"

        info.update(
            name=getattr(plex_client.server, "friendlyName", "未命名服务器"),
            domain=server_config.url,
        )

        for remote in plex_client.server.playlists():
            found.append({"name": remote.title, "track_count": _track_count(remote)})
    except Exception as exc:
        logger.warning(f"Failed to fetch cloud playlists: {exc}")
        state = "failed"

    found.sort(key=lambda entry: entry["name"].lower())
    return found, state, info
|
||
|
||
def _build_home_context(
    request: Request,
    local_path: str,
    message: str | None = None,
    message_type: str | None = None,
    sync_result: dict | None = None,
    selected_mode: str | None = None,
):
    """Build the template context shared by the handlers that render the home page.

    Reloads the server configuration, scans ``local_path`` for local playlists
    and queries the remote server for its playlists and connection state.

    Args:
        request: Incoming request, passed through to the template.
        local_path: Directory handed to ``scan_local_playlists``.
        message: Optional banner text.
        message_type: Optional banner category passed to the template.
        sync_result: Optional summary of a finished sync run.
        selected_mode: Optional sync-mode value passed to the template.

    Returns:
        The context dict for the template.
    """
    server_config.load()
    local_lists = scan_local_playlists(local_path)
    cloud_lists, link_state, remote_info = _get_cloud_playlists()

    return dict(
        request=request,
        theme=server_config.theme,
        path="/",
        local_playlists=local_lists,
        local_path=local_path,
        cloud_playlists=cloud_lists,
        connection_status=link_state,
        server_info=remote_info,
        sync_modes=SYNC_MODE_OPTIONS,
        selected_mode=selected_mode,
        message=message,
        message_type=message_type,
        sync_result=sync_result,
        path_rules=server_config.path_rules,
    )
|
||
|
||
|
||
# Render the home page.
@app.get("/", response_class=HTMLResponse)
async def home(request: Request, local_path: str = "playlist"):
    """Serve ``home.html`` for the playlists found under ``local_path``."""
    return templates.TemplateResponse(
        "home.html",
        _build_home_context(request, local_path),
    )
|
||
|
||
|
||
@app.post("/sync", response_class=HTMLResponse)
async def trigger_sync(request: Request, mode: str = Form(...), local_path: str = Form("playlist")):
    """Run a playlist sync with the submitted strategy and re-render the home page.

    Args:
        request: Incoming request, passed through to the template.
        mode: Submitted sync strategy; must be a valid ``SyncMode`` value.
        local_path: Local playlist directory to sync.

    Returns:
        ``home.html`` with a success summary, or with a "danger" banner when
        the strategy is unknown or the sync raises.
    """
    try:
        sync_mode = SyncMode(mode)
    except ValueError:
        # The submitted value does not map to any SyncMode member.
        return templates.TemplateResponse(
            "home.html",
            _build_home_context(
                request,
                local_path,
                message=f"未知的同步策略:{mode}",
                message_type="danger",
                selected_mode=mode,
            ),
        )

    try:
        results = sync_all_playlists(
            local_dir=local_path,
            mode=sync_mode,
            test_folder=TEST_PLAYLIST_DIR,
        )

        merged_total = sum(len(outcome.merged_paths) for outcome in results)
        conflict_total = sum(len(outcome.conflicts) for outcome in results)
        deleted_total = sum(1 for outcome in results if outcome.action == "deleted")

        # Display label of the first matching option; fall back to the raw value.
        mode_label = sync_mode.value
        for option in SYNC_MODE_OPTIONS:
            if option["value"] == sync_mode.value:
                mode_label = option["label"]
                break

        summary = {
            "mode": sync_mode.value,
            "mode_label": mode_label,
            "merged_count": merged_total,
            "conflict_count": conflict_total,
            "delete_count": deleted_total,
            "playlist_count": len(results),
            "output_dir": TEST_PLAYLIST_DIR,
        }
        page_context = _build_home_context(
            request,
            local_path,
            message="同步完成,输出已写入测试目录用于验证。",
            message_type="success",
            sync_result=summary,
            selected_mode=sync_mode.value,
        )
        return templates.TemplateResponse("home.html", page_context)
    except Exception as exc:
        # Covers errors from the sync itself and from building the success page.
        logger.warning(f"Sync failed: {exc}")
        page_context = _build_home_context(
            request,
            local_path,
            message=f"同步失败:{exc}",
            message_type="danger",
            selected_mode=sync_mode.value,
        )
        return templates.TemplateResponse("home.html", page_context)
|
||
|
||
|
||
@app.post("/path-rules", response_class=HTMLResponse)
async def save_path_rules(
    request: Request,
    local_path: str = Form("playlist"),
    pattern: list[str] | None = Form(None),
    replacement: list[str] | None = Form(None),
):
    """Persist the submitted pattern/replacement pairs and re-render the home page.

    Patterns and replacements are paired by position. Patterns are stripped of
    surrounding whitespace, and pairs whose pattern is then empty are dropped.

    Args:
        request: Incoming request, passed through to the template.
        local_path: Local playlist directory shown on the re-rendered page.
        pattern: Submitted patterns, or ``None`` when none were sent.
        replacement: Submitted replacements, or ``None`` when none were sent.

    Returns:
        ``home.html`` with a success banner.
    """
    stripped_pairs = (
        (raw_pattern.strip(), raw_replacement)
        for raw_pattern, raw_replacement in zip(pattern or [], replacement or [])
    )
    rules: list[dict[str, str]] = [
        {"pattern": rule_pattern, "replacement": rule_replacement or ""}
        for rule_pattern, rule_replacement in stripped_pairs
        if rule_pattern
    ]

    server_config.set_and_save_config(path_rules=rules)

    return templates.TemplateResponse(
        "home.html",
        _build_home_context(
            request,
            local_path,
            message="正则规则已保存并会在同步前应用。",
            message_type="success",
        ),
    )
|
||
|
||
|
||
# Login page and login form handling.
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
    """Show the login form pre-filled with the saved connection settings."""
    server_config.load()
    form_context = dict(
        request=request,
        theme=server_config.theme,
        path="/login",
        token=server_config.token,
        scheme=server_config.scheme,
        server_url=server_config.url,
        port=server_config.port,
    )
    return templates.TemplateResponse("login.html", form_context)
|
||
|
||
|
||
@app.post("/login", response_class=HTMLResponse)
async def login(
    request: Request,
    user: str = Form(...),
    pw: str = Form(...),
    token: str = Form(...),
    scheme: str = Form("https"),
    url: str = Form(...),
    port: str = Form("32400"),
):
    """Try to connect to the Plex server and save the settings on success.

    Args:
        request: Incoming request, passed through to the template.
        user: Account user name forwarded to ``plex_client.connect``.
        pw: Account password forwarded to ``plex_client.connect``.
        token: Access token forwarded to ``plex_client.connect``.
        scheme: URL scheme of the server.
        url: Server address.
        port: Server port.

    Returns:
        ``login.html`` with a success banner and the saved settings when the
        client reports a connection, otherwise ``login.html`` with a "danger"
        banner and the submitted scheme, address and port.
    """
    # NOTE(review): user, pw and token are all required form fields, so a
    # submission that omits the token is rejected before this handler runs.
    # Confirm that matches the "fall back to user name/password" intent below.

    # Default failure text, used when connect() returns without raising but the
    # client still reports that it is not connected.
    failure_message = "连接失败:无法连接到服务器"

    # Try to connect to the Plex server.
    try:
        # Prefer the token; when it is empty the client is expected to fall
        # back to user name and password.
        _, token_success = plex_client.connect(
            username=user,
            password=pw,
            token=token,
            scheme=scheme,
            url=url,
            port=port,
        )
        # Persist the settings only after a successful connection.
        if plex_client.connected:
            server_config.set_and_save_config(
                token=token_success, scheme=scheme, url=url, port=port
            )
            return templates.TemplateResponse(
                "login.html",
                {
                    "request": request,
                    "message": "连接成功",
                    "message_type": "success",
                    "theme": server_config.theme,
                    "path": "/login",
                    "token": server_config.token,
                    "scheme": server_config.scheme,
                    "server_url": server_config.url,
                    "port": server_config.port,
                },
            )
    except Exception as e:
        failure_message = f"连接失败:{str(e)}"

    # Reached on an exception or on a connection that did not come up; always
    # answer with the login page instead of falling through with no response.
    return templates.TemplateResponse(
        "login.html",
        {
            "request": request,
            "message": failure_message,
            "message_type": "danger",
            "theme": server_config.theme,
            "path": "/login",
            "scheme": scheme,
            "server_url": url,
            "port": port,
        },
    )
|
||
|
||
|
||
@app.get("/playlist", response_class=HTMLResponse)
async def get_playlist(request: Request):
    """Render the playlist settings page."""
    page_context = {
        "request": request,
        "theme": server_config.theme,
        "path": "/playlist",
    }
    return templates.TemplateResponse("playlist.html", page_context)
|
||
|
||
|
||
@app.post("/playlist", response_class=HTMLResponse)
async def set_playlist(
    request: Request, address: str = Form(...), interval: str = Form(...)
):
    """Echo the submitted playlist settings back on the playlist page.

    Demo handler: nothing is stored; the submitted address and interval are
    only shown in an info banner.
    """
    confirmation = f"设置成功:地址 {address},间隔 {interval} 分钟"
    page_context = dict(
        request=request,
        message=confirmation,
        message_type="info",
        theme=server_config.theme,
        path="/playlist",
    )
    return templates.TemplateResponse("playlist.html", page_context)
|
||
|
||
|
||
@app.post("/set-theme")
async def set_theme(theme: str = Form(...)):
    """Save the chosen theme, then redirect to the login page with a 303."""
    server_config.set_and_save_config(theme=theme)
    return RedirectResponse(url="/login", status_code=303)
|