Files
PlexPlaylistSync/app/main.py
T
2025-11-24 22:21:01 +09:00

297 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
from typing import Tuple
from app.utils.config import server_config
from app.utils.playlist_merge import SyncMode, sync_all_playlists, TEST_PLAYLIST_DIR
from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from app.utils.plex_client import plex_client
from app.utils.logger import logger
from app.utils.local_playlist import scan_local_playlists
app = FastAPI()
templates = Jinja2Templates(
directory=os.path.join(os.path.dirname(__file__), "templates")
)
# mount static files
# 这里的路径是相对于 main.py 文件所在的目录
app.mount(
"/static",
StaticFiles(directory=os.path.join(os.path.dirname(__file__), "static")),
name="static",
)
SYNC_MODE_OPTIONS = [
{
"value": SyncMode.LOCAL_FORCE.value,
"label": "完全本地优先(local_force",
"description": "单向同步,本地覆盖云端且顺序以本地为准。",
},
{
"value": SyncMode.REMOTE_FORCE.value,
"label": "完全云端优先(remote_force",
"description": "单向同步,云端覆盖本地且顺序以云端为准。",
},
{
"value": SyncMode.MERGE_LOCAL_PRIMARY.value,
"label": "双向合并(本地优先)",
"description": "三方合并,冲突时选择本地版本。",
},
{
"value": SyncMode.MERGE_REMOTE_PRIMARY.value,
"label": "双向合并(云端优先)",
"description": "三方合并,冲突时选择云端版本。",
},
]
def _get_cloud_playlists() -> Tuple[list[dict], str, dict]:
"""Fetch playlists and connection state from the remote Plex server."""
server_config.load()
playlists: list[dict] = []
status = "unset"
server_info = {
"name": "未设置",
"domain": server_config.url or "未设置",
}
# no server url configured
if not server_config.url:
return playlists, status, server_info
status = "failed"
try:
plex_client.connect(
token=server_config.token,
scheme=server_config.scheme,
url=server_config.url,
port=server_config.port,
)
status = "connected" if plex_client.connected else "failed"
server_info.update(
{
"name": getattr(plex_client.server, "friendlyName", "未命名服务器"),
"domain": server_config.url,
}
)
for playlist in plex_client.server.playlists():
track_count = getattr(playlist, "itemCount", None)
if track_count is None:
try:
track_count = len(playlist.items())
except Exception:
track_count = 0
playlists.append(
{
"name": playlist.title,
"track_count": track_count,
}
)
except Exception as exc:
logger.warning(f"Failed to fetch cloud playlists: {exc}")
status = "failed"
playlists.sort(key=lambda item: item["name"].lower())
return playlists, status, server_info
def _build_home_context(
request: Request,
local_path: str,
message: str | None = None,
message_type: str | None = None,
sync_result: dict | None = None,
selected_mode: str | None = None,
):
server_config.load()
local_playlists = scan_local_playlists(local_path)
cloud_playlists, connection_status, server_info = _get_cloud_playlists()
return {
"request": request,
"theme": server_config.theme,
"path": "/",
"local_playlists": local_playlists,
"local_path": local_path,
"cloud_playlists": cloud_playlists,
"connection_status": connection_status,
"server_info": server_info,
"sync_modes": SYNC_MODE_OPTIONS,
"selected_mode": selected_mode,
"message": message,
"message_type": message_type,
"sync_result": sync_result,
}
# 显示主页
@app.get("/", response_class=HTMLResponse)
async def home(request: Request, local_path: str = "playlist"):
context = _build_home_context(request, local_path)
return templates.TemplateResponse("home.html", context)
@app.post("/sync", response_class=HTMLResponse)
async def trigger_sync(request: Request, mode: str = Form(...), local_path: str = Form("playlist")):
try:
sync_mode = SyncMode(mode)
except ValueError:
context = _build_home_context(
request,
local_path,
message=f"未知的同步策略:{mode}",
message_type="danger",
selected_mode=mode,
)
return templates.TemplateResponse("home.html", context)
try:
results = sync_all_playlists(
local_dir=local_path,
mode=sync_mode,
test_folder=TEST_PLAYLIST_DIR,
)
merged_count = sum(len(item.merged_paths) for item in results)
conflict_count = sum(len(item.conflicts) for item in results)
deleted_count = sum(1 for item in results if item.action == "deleted")
context = _build_home_context(
request,
local_path,
message="同步完成,输出已写入测试目录用于验证。",
message_type="success",
sync_result={
"mode": sync_mode.value,
"mode_label": next(
(item["label"] for item in SYNC_MODE_OPTIONS if item["value"] == sync_mode.value),
sync_mode.value,
),
"merged_count": merged_count,
"conflict_count": conflict_count,
"delete_count": deleted_count,
"playlist_count": len(results),
"output_dir": TEST_PLAYLIST_DIR,
},
selected_mode=sync_mode.value,
)
return templates.TemplateResponse("home.html", context)
except Exception as exc:
logger.warning(f"Sync failed: {exc}")
context = _build_home_context(
request,
local_path,
message=f"同步失败:{exc}",
message_type="danger",
selected_mode=sync_mode.value,
)
return templates.TemplateResponse("home.html", context)
# 登录页面和处理
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
server_config.load()
return templates.TemplateResponse(
"login.html",
{
"request": request,
"theme": server_config.theme,
"path": "/login",
"token": server_config.token,
"scheme": server_config.scheme,
"server_url": server_config.url,
"port": server_config.port,
},
)
@app.post("/login", response_class=HTMLResponse)
async def login(
request: Request,
user: str = Form(...),
pw: str = Form(...),
token: str = Form(...),
scheme: str = Form("https"),
url: str = Form(...),
port: str = Form("32400"),
):
# 尝试连接到 Plex 服务器
try:
# 优先使用 token 连接,如果 token 为空则使用用户名和密码连接
_, token_success = plex_client.connect(
username=user,
password=pw,
token=token,
scheme=scheme,
url=url,
port=port,
)
# 成功连接后保存配置到配置文件
if plex_client.connected:
server_config.set_and_save_config(
token=token_success, scheme=scheme, url=url, port=port
)
return templates.TemplateResponse(
"login.html",
{
"request": request,
"message": "连接成功",
"message_type": "success",
"theme": server_config.theme,
"path": "/login",
"token": server_config.token,
"scheme": server_config.scheme,
"server_url": server_config.url,
"port": server_config.port,
},
)
except Exception as e:
return templates.TemplateResponse(
"login.html",
{
"request": request,
"message": f"连接失败:{str(e)}",
"message_type": "danger",
"theme": server_config.theme,
"path": "/login",
"scheme": scheme,
"server_url": url,
"port": port,
},
)
@app.get("/playlist", response_class=HTMLResponse)
async def get_playlist(request: Request):
return templates.TemplateResponse(
"playlist.html",
{"request": request, "theme": server_config.theme, "path": "/playlist"},
)
@app.post("/playlist", response_class=HTMLResponse)
async def set_playlist(
request: Request, address: str = Form(...), interval: str = Form(...)
):
# demo:返回提交的设置
return templates.TemplateResponse(
"playlist.html",
{
"request": request,
"message": f"设置成功:地址 {address},间隔 {interval} 分钟",
"message_type": "info",
"theme": server_config.theme,
"path": "/playlist",
},
)
@app.post("/set-theme")
async def set_theme(theme: str = Form(...)):
server_config.set_and_save_config(theme=theme)
return RedirectResponse("/login", status_code=303)