# FastAPI application: JSON API and HTML pages for syncing local playlists with a Plex server.
import asyncio
import os
from datetime import datetime, timezone
from typing import Sequence

from fastapi import FastAPI, Form, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field

from app.utils.config import server_config
from app.utils.local_playlist import load_local_playlist, scan_local_playlists
from app.utils.logger import logger
from app.utils.playlist_merge import SyncMode, TEST_PLAYLIST_DIR, sync_all_playlists
from app.utils.plex_client import plex_client
from app.utils.scheduler import start_scheduler, update_scheduler_job, get_next_run_time, validate_cron_expression
from app.utils.sync_manager import sync_manager
|
||
|
||
# Application instance; the middleware, mounts and routes in this module are registered on it.
app = FastAPI()
|
||
|
||
# Startup hook: hand the running event loop to the sync manager, then start the scheduler.
# NOTE(review): indentation was lost in this copy; start_scheduler() is assumed to belong to this hook.
@app.on_event("startup")
async def startup_event():
    running_loop = asyncio.get_running_loop()
    sync_manager.set_event_loop(running_loop)
    start_scheduler()
|
||
|
||
# Directory that contains this module; templates and static files live next to it.
_APP_DIR = os.path.dirname(__file__)

# CORS: every origin, method and header is accepted, with credentials allowed.
# NOTE(review): wildcard origins combined with credentials is very permissive - confirm it is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

templates = Jinja2Templates(directory=os.path.join(_APP_DIR, "templates"))

# Absolute path of the built frontend bundle (../frontend/dist relative to this module).
FRONTEND_DIST_PATH = os.path.abspath(os.path.join(_APP_DIR, "..", "frontend", "dist"))

# Mount static files.
# The path here is relative to the directory containing main.py.
app.mount(
    "/static",
    StaticFiles(directory=os.path.join(_APP_DIR, "static")),
    name="static",
)

# The frontend assets are only mounted when the bundle has actually been built.
_FRONTEND_ASSETS_PATH = os.path.join(FRONTEND_DIST_PATH, "assets")
if os.path.isdir(_FRONTEND_ASSETS_PATH):
    app.mount(
        "/assets",
        StaticFiles(directory=_FRONTEND_ASSETS_PATH),
        name="frontend-assets",
    )
|
||
|
||
|
||
# Sync strategies offered by the UI: one dict per mode with its enum value,
# a display label and a short description (both user-facing, in Chinese).
SYNC_MODE_OPTIONS = [
    {"value": sync_mode.value, "label": label, "description": description}
    for sync_mode, label, description in (
        (
            SyncMode.LOCAL_FORCE,
            "完全本地优先(local_force)",
            "单向同步,本地覆盖云端且顺序以本地为准。",
        ),
        (
            SyncMode.REMOTE_FORCE,
            "完全云端优先(remote_force)",
            "单向同步,云端覆盖本地且顺序以云端为准。",
        ),
        (
            SyncMode.MERGE_LOCAL_PRIMARY,
            "双向合并(本地优先)",
            "三方合并,冲突时选择本地版本。",
        ),
        (
            SyncMode.MERGE_REMOTE_PRIMARY,
            "双向合并(云端优先)",
            "三方合并,冲突时选择云端版本。",
        ),
    )
]
|
||
|
||
|
||
# One playlist entry as returned by the JSON API.
class PlaylistItem(BaseModel):
    # Identifier built by _playlist_item as "<prefix>-<name>".
    id: str
    # Display title of the playlist.
    title: str
    # Number of tracks; must be zero or greater.
    trackCount: int = Field(..., ge=0)
    # Timestamp string of the last modification, if known.
    lastUpdated: str | None = None
|
||
|
||
|
||
# A single path rewrite rule: a pattern and the text that replaces its matches.
class RegexRule(BaseModel):
    # Pattern text of the rule.
    pattern: str
    # Replacement text; empty by default.
    replacement: str = ""
|
||
|
||
|
||
# Response body of GET /api/settings: sync options plus the stored server connection values.
class SyncSettingsResponse(BaseModel):
    # Configured sync mode value.
    sync_mode: str
    # Path rewrite rules.
    path_rules: list[RegexRule]
    # Directory of the local playlists.
    local_path: str
    # Selected Plex library, if any.
    library_name: str | None = None
    # Stored connection parameters of the Plex server.
    server_url: str | None = None
    scheme: str | None = None
    port: str | None = None
    timeout: int | None = None
    token: str | None = None
|
||
|
||
|
||
# Request body of POST /api/connect: where the Plex server lives and how to authenticate.
class ConnectRequest(BaseModel):
    # URL scheme. The pattern is anchored so that only exactly "http" or "https"
    # is accepted; an unanchored pattern would also match strings that merely
    # contain "http".
    protocol: str = Field("https", pattern="^https?$", description="HTTP or HTTPS")
    # Host name or IP address of the server.
    address: str
    # Server port, kept as a string.
    port: str = "32400"
    # Access token; may be left empty when username/password are supplied.
    token: str = ""
    # Optional account credentials.
    username: str | None = None
    password: str | None = None
    # Optional connection timeout passed to the Plex client.
    timeout: int | None = None
    # Optional preferred library name.
    library_name: str | None = None
|
||
|
||
|
||
# Request body of PUT /api/schedule; field names follow the frontend's camelCase.
class ScheduleSettings(BaseModel):
    # Schedule mode; "CRON" triggers validation of cronExpression on save.
    mode: str
    # Cron expression text.
    cronExpression: str
    # Time of day used by the daily schedule.
    dailyTime: str
    # Weekdays (as integers) used by the weekly schedule.
    weeklyDays: list[int]
    # Time of day used by the weekly schedule.
    weeklyTime: str
    # Auto-watch flag stored with the schedule.
    autoWatch: bool
|
||
|
||
|
||
# GET /api/schedule: return the stored schedule settings plus the next planned run time.
@app.get("/api/schedule")
async def get_schedule():
    upcoming = get_next_run_time()
    # No upcoming run is reported as null.
    if upcoming:
        next_run_text = upcoming.strftime("%Y-%m-%d %H:%M:%S")
    else:
        next_run_text = None

    response = {
        "mode": server_config.schedule_mode,
        "cronExpression": server_config.schedule_cron,
        "dailyTime": server_config.schedule_daily_time,
        "weeklyDays": server_config.schedule_weekly_days,
        "weeklyTime": server_config.schedule_weekly_time,
        "autoWatch": server_config.schedule_auto_watch,
        "nextRun": next_run_text,
    }
    return response
|
||
|
||
|
||
# PUT /api/schedule: validate, persist and apply new schedule settings.
# Responds with HTTP 400 when the mode is "CRON" and the (non-blank) expression is invalid.
@app.put("/api/schedule")
async def save_schedule(settings: ScheduleSettings):
    cron_text = settings.cronExpression

    # Only a non-blank expression in CRON mode is validated.
    needs_validation = settings.mode == "CRON" and cron_text.strip()
    if needs_validation and not validate_cron_expression(cron_text):
        raise HTTPException(status_code=400, detail="Invalid Cron expression format")

    server_config.set_schedule(
        mode=settings.mode,
        cron=cron_text,
        daily_time=settings.dailyTime,
        weekly_days=settings.weeklyDays,
        weekly_time=settings.weeklyTime,
        auto_watch=settings.autoWatch,
    )
    # Let the scheduler pick up the freshly stored settings.
    update_scheduler_job()
    logger.info(f"Schedule settings updated via API. Mode: {settings.mode}")

    return {"status": "success", "message": "Schedule updated"}
|
||
|
||
|
||
# Response body of POST /api/connect.
class ConnectResponse(BaseModel):
    # Token returned by the Plex client after connecting.
    token: str
    # Connection summary (connected flag, name, address, port, libraries).
    serverInfo: dict
|
||
|
||
|
||
def _get_cloud_playlists() -> tuple[list[dict], str, dict, str, list[str]]:
    """Fetch playlists and connection state from the remote Plex server.

    Returns a 5-tuple ``(playlists, status, server_info, selected_library,
    music_libraries)``. ``status`` is ``"unset"`` when no server URL is
    configured, otherwise ``"connected"`` or ``"failed"``. Each playlist is a
    dict with ``name`` and ``track_count``. Any error while talking to the
    server is logged and reported as ``"failed"`` instead of being raised.
    """
    # NOTE(review): indentation was lost in this copy of the file; the nesting
    # below is reconstructed from the syntax - confirm against the original.
    server_config.load()

    found: list[dict] = []
    library_names: list[str] = []
    chosen_library = server_config.library_name
    server_info = {
        "name": "未设置",
        "domain": server_config.url or "未设置",
    }

    # Without a configured server URL there is nothing to query.
    if not server_config.url:
        return found, "unset", server_info, chosen_library, library_names

    state = "failed"
    try:
        plex_client.connect(
            token=server_config.token,
            scheme=server_config.scheme,
            url=server_config.url,
            port=server_config.port,
            timeout=server_config.timeout,
        )
        if plex_client.connected:
            state = "connected"
        else:
            state = "failed"
        server_info.update(
            name=getattr(plex_client.server, "friendlyName", "未命名服务器"),
            domain=server_config.url,
        )

        library_names = plex_client.get_libs_name_list()
        if not library_names:
            # No library available: clear the stored selection and stop here.
            server_config.set_and_save_config(library_name="")
            return found, state, server_info, "", library_names

        selection_is_valid = bool(chosen_library) and chosen_library in library_names
        if not selection_is_valid:
            # Fall back to the first library and remember that choice.
            chosen_library = library_names[0]
            server_config.set_and_save_config(library_name=chosen_library)

        for remote in plex_client.get_lib_playlists(chosen_library) or []:
            count = getattr(remote, "itemCount", None)
            if count is None:
                # No itemCount attribute: count the items, treating any failure as empty.
                try:
                    count = len(remote.items())
                except Exception:
                    count = 0
            found.append({"name": remote.title, "track_count": count})
    except Exception as exc:
        logger.warning(f"Failed to fetch cloud playlists: {exc}")
        state = "failed"

    # Case-insensitive ordering by playlist name.
    found.sort(key=lambda entry: entry["name"].lower())
    return found, state, server_info, chosen_library, library_names
|
||
|
||
|
||
def _library_dicts(names: Sequence[str]) -> list[dict]:
|
||
return [{"id": name, "title": name, "type": "artist"} for name in names]
|
||
|
||
|
||
def _playlist_item(name: str, track_count: int, prefix: str, last_updated: float | None = None) -> PlaylistItem:
    """Build a ``PlaylistItem`` for the API.

    Args:
        name: Playlist name; used as the title and as part of the id.
        track_count: Number of tracks in the playlist.
        prefix: Id prefix; the id is ``"<prefix>-<name>"``.
        last_updated: POSIX timestamp of the last modification. When ``None``
            the current time is used.

    Returns:
        The item, with ``lastUpdated`` formatted as a UTC ISO-8601 string
        ending in ``"Z"``.
    """
    # Compare against None so that a legitimate timestamp of 0 is not replaced by "now".
    if last_updated is not None:
        moment = datetime.fromtimestamp(last_updated, tz=timezone.utc)
    else:
        moment = datetime.now(tz=timezone.utc)

    # Strip tzinfo before formatting so the output keeps the "...Z" shape
    # instead of becoming "...+00:00Z".
    updated_value = moment.replace(tzinfo=None).isoformat() + "Z"

    return PlaylistItem(
        id=f"{prefix}-{name}",
        title=name,
        trackCount=track_count,
        lastUpdated=updated_value,
    )
|
||
|
||
|
||
def _scan_local_playlists_with_meta(local_path: str) -> list[PlaylistItem]:
    """List the ``.m3u`` / ``.m3u8`` playlists directly inside a directory.

    Args:
        local_path: Directory to scan; when empty, ``server_config.local_path``
            is used instead.

    Returns:
        One ``PlaylistItem`` per playlist file (track count and modification
        time included), sorted case-insensitively by title. The list is empty
        when no path is configured or the path is not a directory.
    """
    items: list[PlaylistItem] = []
    base_path = local_path or server_config.local_path
    if not base_path:
        return items

    absolute_path = os.path.abspath(base_path)
    if not os.path.isdir(absolute_path):
        logger.warning(f"Playlist path does not exist or is not a directory: {absolute_path}")
        return items

    # The context manager closes the directory handle even if loading a playlist raises.
    with os.scandir(absolute_path) as entries:
        for entry in entries:
            if not entry.is_file():
                continue
            if not entry.name.lower().endswith((".m3u", ".m3u8")):
                continue
            tracks = load_local_playlist(entry.path)
            stat_info = entry.stat()
            items.append(
                _playlist_item(
                    name=entry.name,
                    track_count=len(tracks),
                    prefix="local",
                    last_updated=stat_info.st_mtime,
                )
            )

    items.sort(key=lambda playlist: playlist.title.lower())
    return items
|
||
|
||
|
||
def _get_server_status() -> tuple[dict, str, list[dict]]:
    """Return server connection status and available libraries.

    The result is ``(server_info, connection_status, libraries)``. Without a
    configured URL it is ``({"isConnected": False}, "unset", [])``. Connection
    errors are logged, not raised.
    """
    # NOTE(review): indentation was lost in this copy of the file. The save of
    # the selected library is assumed to happen whenever libraries exist -
    # confirm against the original nesting.
    server_config.load()
    if not server_config.url:
        return {"isConnected": False}, "unset", []

    libraries: list[dict] = []
    connection_status = "failed"
    server_info = {
        "isConnected": False,
        "name": "未设置",
        "ip": server_config.url,
        "port": server_config.port,
        "libraryName": server_config.library_name,
    }

    try:
        plex_client.connect(
            token=server_config.token,
            scheme=server_config.scheme,
            url=server_config.url,
            port=server_config.port,
            timeout=server_config.timeout,
        )
        if plex_client.connected:
            connection_status = "connected"
        else:
            connection_status = "failed"
        server_info.update(
            isConnected=plex_client.connected,
            name=getattr(plex_client.server, "friendlyName", "未命名服务器"),
            ip=server_config.url,
            port=server_config.port,
        )

        if plex_client.connected:
            available = plex_client.get_libs_name_list()
            libraries = _library_dicts(available)
            if available:
                # Prefer the stored library; fall back to the first one when it
                # is unset or no longer present on the server.
                chosen = server_config.library_name or available[0]
                if chosen not in available:
                    chosen = available[0]
                server_config.set_and_save_config(library_name=chosen)
                server_info["libraryName"] = chosen
    except Exception as exc:
        logger.warning(f"Failed to connect to Plex server: {exc}")

    return server_info, connection_status, libraries
|
||
|
||
|
||
# Request body of PUT /api/settings/sync-mode.
class SyncModePayload(BaseModel):
    # Sync mode to store; must be a valid SyncMode value.
    mode: SyncMode
|
||
|
||
|
||
# Request body of PUT /api/settings/regex-rules.
class RegexRulePayload(BaseModel):
    # Complete list of rules that replaces the stored one.
    rules: list[RegexRule]
|
||
|
||
|
||
# Request body of PUT /api/settings/library.
class LibrarySelection(BaseModel):
    # Name of the library to store as the selection.
    library_name: str
|
||
|
||
|
||
# Request body of POST /api/sync; both fields fall back to the stored configuration when omitted.
class SyncRequest(BaseModel):
    # Sync mode override for this run.
    mode: SyncMode | None = None
    # Local playlist directory override for this run.
    local_path: str | None = None
|
||
|
||
|
||
# GET /api/settings: reload the configuration and return the sync and connection settings.
@app.get("/api/settings", response_model=SyncSettingsResponse)
async def get_settings():
    server_config.load()

    # Stored rules are plain dicts; missing keys default to empty strings.
    rules: list[RegexRule] = []
    for stored in server_config.path_rules:
        rules.append(
            RegexRule(
                pattern=stored.get("pattern", ""),
                replacement=stored.get("replacement", ""),
            )
        )

    settings = SyncSettingsResponse(
        sync_mode=server_config.sync_mode,
        path_rules=rules,
        local_path=server_config.local_path,
        library_name=server_config.library_name,
        server_url=server_config.url,
        scheme=server_config.scheme,
        port=server_config.port,
        timeout=server_config.timeout,
        token=server_config.token,
    )
    return settings
|
||
|
||
|
||
# PUT /api/settings/sync-mode: persist the chosen sync mode and echo it back.
@app.put("/api/settings/sync-mode")
async def update_sync_mode(payload: SyncModePayload):
    mode_value = payload.mode.value
    server_config.set_and_save_config(sync_mode=mode_value)
    return {"sync_mode": payload.mode.value}
|
||
|
||
|
||
# GET /api/settings/regex-rules: reload the configuration and return the stored path rules.
@app.get("/api/settings/regex-rules")
async def get_regex_rules():
    server_config.load()
    stored_rules = server_config.path_rules
    return {"rules": stored_rules}
|
||
|
||
|
||
# PUT /api/settings/regex-rules: replace the stored path rules with the submitted list.
@app.put("/api/settings/regex-rules")
async def update_regex_rules(payload: RegexRulePayload):
    # The configuration stores plain dicts, so the models are dumped first.
    serialised = [rule.model_dump() for rule in payload.rules]
    server_config.set_and_save_config(path_rules=serialised)
    return {"rules": payload.rules}
|
||
|
||
|
||
# PUT /api/settings/library: persist the selected library and return the stored value.
@app.put("/api/settings/library")
async def update_library(payload: LibrarySelection):
    server_config.set_and_save_config(library_name=payload.library_name)
    # Report what the configuration now holds rather than the raw input.
    stored_name = server_config.library_name
    return {"library_name": stored_name}
|
||
|
||
|
||
# GET /api/server: report connection status, server info and available libraries.
@app.get("/api/server")
async def api_server_status():
    info, state, available_libraries = _get_server_status()
    return {
        "status": state,
        "serverInfo": info,
        "libraries": available_libraries,
    }
|
||
|
||
|
||
# POST /api/connect: connect to the Plex server with the submitted parameters,
# store them in the configuration and return the token plus a connection summary.
# Any failure is logged and reported as HTTP 400 with the error text.
# NOTE(review): indentation was lost in this copy of the file; the configuration
# save is assumed to run regardless of plex_client.connected - confirm.
@app.post("/api/connect", response_model=ConnectResponse)
async def api_connect(payload: ConnectRequest):
    try:
        _, token = plex_client.connect(
            username=payload.username or "",
            password=payload.password or "",
            token=payload.token or "",
            scheme=payload.protocol,
            url=payload.address,
            port=payload.port,
            timeout=payload.timeout,
        )

        libraries = []
        # The requested library wins over the stored one.
        selected_library = payload.library_name or server_config.library_name
        if plex_client.connected:
            lib_names = plex_client.get_libs_name_list()
            libraries = _library_dicts(lib_names)
            selection_missing = not selected_library or selected_library not in lib_names
            if lib_names and selection_missing:
                # Fall back to the first library the server offers.
                selected_library = lib_names[0]

        server_config.set_and_save_config(
            token=token,
            scheme=payload.protocol,
            url=payload.address,
            port=payload.port,
            timeout=payload.timeout,
            library_name=selected_library or "",
        )

        if plex_client.connected:
            display_name = getattr(plex_client.server, "friendlyName", "未命名服务器")
        else:
            display_name = "未命名服务器"

        server_info = {
            "isConnected": plex_client.connected,
            "name": display_name,
            "ip": payload.address,
            "port": payload.port,
            "libraryName": selected_library or "",
            "libraries": libraries,
        }
        return ConnectResponse(token=token, serverInfo=server_info)
    except Exception as exc:
        logger.warning(f"Failed to connect via API: {exc}")
        raise HTTPException(status_code=400, detail=str(exc))
|
||
|
||
|
||
# GET /api/playlists?server=local|cloud: list playlists from the local directory
# or from the Plex server. For "local" the resolved directory is also stored in
# the configuration; for "cloud" the connection details are returned as well.
@app.get("/api/playlists")
async def api_playlists(server: str = Query(..., pattern="(?i)^(local|cloud)$"), local_path: str | None = None):
    if server.lower() == "local":
        resolved_path = local_path or server_config.local_path
        server_config.set_and_save_config(local_path=resolved_path)
        local_items = _scan_local_playlists_with_meta(resolved_path)
        return {"playlists": [entry.model_dump() for entry in local_items]}

    cloud_raw, connection_status, server_info, selected_library, libraries = _get_cloud_playlists()

    # Convert the raw dicts into API items with the "cloud" id prefix.
    cloud_items = []
    for raw in cloud_raw:
        cloud_items.append(_playlist_item(raw["name"], raw.get("track_count", 0), "cloud"))

    return {
        "playlists": [entry.model_dump() for entry in cloud_items],
        "connection_status": connection_status,
        "server_info": server_info,
        "library": selected_library,
        "libraries": _library_dicts(libraries),
    }
|
||
|
||
|
||
# GET /api/sync/status: return the sync manager's current status object.
@app.get("/api/sync/status")
async def get_sync_status():
    current_status = sync_manager.status
    return current_status
|
||
|
||
# GET /api/sync/events: server-sent event stream of sync manager notifications.
@app.get("/api/sync/events")
async def sync_events(request: Request):
    async def event_generator():
        # Each client gets its own subscription, released when the stream ends.
        queue = await sync_manager.subscribe()
        try:
            # The disconnect check runs before each wait for the next event.
            while not await request.is_disconnected():
                message = await queue.get()
                yield f"data: {message}\n\n"
        finally:
            sync_manager.unsubscribe(queue)

    stream = event_generator()
    return StreamingResponse(stream, media_type="text/event-stream")
|
||
|
||
# POST /api/sync: run a sync and return summary counters.
#
# The mode and local directory come from the request body and fall back to the
# stored configuration. The run goes through sync_manager (not a direct call to
# the merge function) so that the shared sync status is updated for the UI.
#
# Responses:
#   400 - the stored sync mode is not a valid SyncMode value.
#   409 - sync_manager reports "Sync already in progress".
#
# This must stay the ONLY handler registered for POST /api/sync: routes are
# matched in registration order, so an earlier definition on the same path
# would shadow this one.
@app.post("/api/sync")
async def api_sync(payload: SyncRequest):
    server_config.load()
    try:
        sync_mode = payload.mode or SyncMode(server_config.sync_mode)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    local_dir = payload.local_path or server_config.local_path

    try:
        # wait=True: the call returns the per-playlist results of this run.
        results = sync_manager.run_sync(
            trigger_source="api",
            wait=True,
            sync_kwargs={"local_dir": local_dir, "mode": sync_mode},
        )
    except Exception as exc:
        # The manager signals a concurrent run through this exact message.
        if str(exc) == "Sync already in progress":
            raise HTTPException(status_code=409, detail="Sync already in progress") from exc
        raise

    merged_count = sum(len(item.merged_paths) for item in results)
    conflict_count = sum(len(item.conflicts) for item in results)
    deleted_count = sum(1 for item in results if item.action == "deleted")

    return {
        "mode": sync_mode.value,
        "merged_count": merged_count,
        "conflict_count": conflict_count,
        "delete_count": deleted_count,
        "playlist_count": len(results),
        "output_dir": TEST_PLAYLIST_DIR,
    }
|
||
|
||
|
||
def _build_home_context(
    request: Request,
    local_path: str,
    message: str | None = None,
    message_type: str | None = None,
    sync_result: dict | None = None,
    selected_mode: str | None = None,
):
    """Assemble the template context for ``home.html``.

    Reloads the configuration, scans the local playlists under ``local_path``
    and queries the Plex server. ``message`` / ``message_type`` and
    ``sync_result`` are passed through to the template; ``selected_mode``
    falls back to the configured sync mode when not given.
    """
    server_config.load()
    local_playlists = scan_local_playlists(local_path)
    cloud_playlists, connection_status, server_info, selected_library, music_libraries = _get_cloud_playlists()

    context = {
        "request": request,
        "theme": server_config.theme,
        "path": "/",
        "local_playlists": local_playlists,
        "local_path": local_path,
        "cloud_playlists": cloud_playlists,
        "connection_status": connection_status,
        "server_info": server_info,
        "selected_library": selected_library,
        "music_libraries": music_libraries,
        "sync_modes": SYNC_MODE_OPTIONS,
        "selected_mode": selected_mode or server_config.sync_mode,
        "message": message,
        "message_type": message_type,
        "sync_result": sync_result,
        "path_rules": server_config.path_rules,
    }
    return context
|
||
|
||
|
||
# Show the home page: the built frontend's index.html when it exists,
# otherwise the server-rendered home.html template.
@app.get("/", response_class=HTMLResponse)
async def home(request: Request, local_path: str = "playlist"):
    built_index = os.path.join(FRONTEND_DIST_PATH, "index.html")
    if not os.path.exists(built_index):
        effective_path = local_path or server_config.local_path
        context = _build_home_context(request, effective_path)
        return templates.TemplateResponse("home.html", context)

    return FileResponse(built_index)
|
||
|
||
|
||
# POST /sync (HTML form): run a sync with the submitted mode and directory and
# re-render home.html with either the result summary or an error message.
@app.post("/sync", response_class=HTMLResponse)
async def trigger_sync(request: Request, mode: str = Form(...), local_path: str = Form("playlist")):
    # Reject unknown strategy values up front.
    try:
        sync_mode = SyncMode(mode)
    except ValueError:
        invalid_context = _build_home_context(
            request,
            local_path,
            message=f"未知的同步策略:{mode}",
            message_type="danger",
            selected_mode=mode,
        )
        return templates.TemplateResponse("home.html", invalid_context)

    try:
        results = sync_all_playlists(
            local_dir=local_path,
            mode=sync_mode,
            test_folder=TEST_PLAYLIST_DIR,
        )

        # Display label of the mode; the raw value is used when no option matches.
        mode_label = sync_mode.value
        for option in SYNC_MODE_OPTIONS:
            if option["value"] == sync_mode.value:
                mode_label = option["label"]
                break

        summary = {
            "mode": sync_mode.value,
            "mode_label": mode_label,
            "merged_count": sum(len(item.merged_paths) for item in results),
            "conflict_count": sum(len(item.conflicts) for item in results),
            "delete_count": sum(1 for item in results if item.action == "deleted"),
            "playlist_count": len(results),
            "output_dir": TEST_PLAYLIST_DIR,
        }
        success_context = _build_home_context(
            request,
            local_path,
            message="同步完成,输出已写入测试目录用于验证。",
            message_type="success",
            sync_result=summary,
            selected_mode=sync_mode.value,
        )
        return templates.TemplateResponse("home.html", success_context)
    except Exception as exc:
        logger.warning(f"Sync failed: {exc}")
        failure_context = _build_home_context(
            request,
            local_path,
            message=f"同步失败:{exc}",
            message_type="danger",
            selected_mode=sync_mode.value,
        )
        return templates.TemplateResponse("home.html", failure_context)
|
||
|
||
|
||
# POST /path-rules (HTML form): store the submitted pattern/replacement pairs
# and re-render home.html with a confirmation message.
@app.post("/path-rules", response_class=HTMLResponse)
async def save_path_rules(
    request: Request,
    local_path: str = Form("playlist"),
    pattern: list[str] | None = Form(None),
    replacement: list[str] | None = Form(None),
):
    # Pair the two form lists positionally; rows with a blank pattern are dropped.
    cleaned_rules: list[dict[str, str]] = [
        {"pattern": trimmed, "replacement": repl or ""}
        for pat, repl in zip(pattern or [], replacement or [])
        if (trimmed := pat.strip())
    ]

    server_config.set_and_save_config(path_rules=cleaned_rules)

    context = _build_home_context(
        request,
        local_path,
        message="正则规则已保存并会在同步前应用。",
        message_type="success",
    )
    return templates.TemplateResponse("home.html", context)
|
||
|
||
|
||
# Login page and its form handling.
# GET /login renders login.html with the stored connection values. When a URL
# and token are stored it also tries to connect to list the music libraries;
# a failure there just leaves the list empty.
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
    server_config.load()

    library_names = []
    chosen_library = server_config.library_name

    if server_config.url and server_config.token:
        try:
            plex_client.connect(
                token=server_config.token,
                scheme=server_config.scheme,
                url=server_config.url,
                port=server_config.port,
                timeout=server_config.timeout,
            )
            library_names = plex_client.get_libs_name_list()
            if not library_names:
                chosen_library = ""
            elif chosen_library not in library_names:
                # Stored library is gone: preselect the first available one.
                chosen_library = library_names[0]
        except Exception:
            chosen_library = ""
            library_names = []

    context = {
        "request": request,
        "theme": server_config.theme,
        "path": "/login",
        "token": server_config.token,
        "scheme": server_config.scheme,
        "server_url": server_config.url,
        "port": server_config.port,
        "music_libraries": library_names,
        "selected_library": chosen_library,
    }
    return templates.TemplateResponse("login.html", context)
|
||
|
||
|
||
# POST /login (HTML form): connect to the Plex server and store the connection.
#
# The token is tried first; when it is empty the username and password are used.
# For that reason user, pw and token all default to an empty string: with all
# three declared as required, a form that leaves any of them blank is rejected
# before this handler runs.
#
# On success the connection values (and the chosen library) are saved and
# login.html is rendered with a success message; on any error it is rendered
# with the error text instead.
# NOTE(review): indentation was lost in this copy of the file; saving is assumed
# to happen only while plex_client.connected is true - confirm.
@app.post("/login", response_class=HTMLResponse)
async def login(
    request: Request,
    user: str = Form(""),
    pw: str = Form(""),
    token: str = Form(""),
    scheme: str = Form("https"),
    url: str = Form(...),
    port: str = Form("32400"),
    library_name: str = Form(""),
):
    # Try to connect to the Plex server.
    try:
        _, token_success = plex_client.connect(
            username=user,
            password=pw,
            token=token,
            scheme=scheme,
            url=url,
            port=port,
            timeout=server_config.timeout,
        )

        music_libraries: list[str] = []
        selected_library = ""
        if plex_client.connected:
            try:
                music_libraries = plex_client.get_libs_name_list()
            except Exception as exc:
                # The library list is optional; the login itself still succeeds.
                logger.warning(f"Unable to fetch music libraries: {exc}")
                music_libraries = []

            if music_libraries:
                # Keep the requested library when it exists, else use the first one.
                if library_name and library_name in music_libraries:
                    selected_library = library_name
                else:
                    selected_library = music_libraries[0]

            # Save the configuration after a successful connection;
            # selected_library stays "" when the server offers no library.
            server_config.set_and_save_config(
                token=token_success,
                scheme=scheme,
                url=url,
                port=port,
                timeout=server_config.timeout,
                library_name=selected_library,
            )

        return templates.TemplateResponse(
            "login.html",
            {
                "request": request,
                "message": "连接成功",
                "message_type": "success",
                "theme": server_config.theme,
                "path": "/login",
                "token": server_config.token,
                "scheme": server_config.scheme,
                "server_url": server_config.url,
                "port": server_config.port,
                "music_libraries": music_libraries,
                "selected_library": selected_library,
            },
        )
    except Exception as e:
        logger.warning(f"Login failed: {e}")
        return templates.TemplateResponse(
            "login.html",
            {
                "request": request,
                "message": f"连接失败:{str(e)}",
                "message_type": "danger",
                "theme": server_config.theme,
                "path": "/login",
                "scheme": scheme,
                "server_url": url,
                "port": port,
                "music_libraries": [],
                "selected_library": "",
            },
        )
|
||
|
||
|
||
# GET /playlist: render the playlist settings page.
@app.get("/playlist", response_class=HTMLResponse)
async def get_playlist(request: Request):
    context = {
        "request": request,
        "theme": server_config.theme,
        "path": "/playlist",
    }
    return templates.TemplateResponse("playlist.html", context)
|
||
|
||
|
||
# POST /playlist (HTML form). Demo: nothing is stored, the submitted settings
# are only echoed back in the page message.
@app.post("/playlist", response_class=HTMLResponse)
async def set_playlist(
    request: Request, address: str = Form(...), interval: str = Form(...)
):
    context = {
        "request": request,
        "message": f"设置成功:地址 {address},间隔 {interval} 分钟",
        "message_type": "info",
        "theme": server_config.theme,
        "path": "/playlist",
    }
    return templates.TemplateResponse("playlist.html", context)
|
||
|
||
|
||
# POST /set-theme (HTML form): store the chosen theme, then redirect to /login
# with 303 so the browser follows up with a GET.
@app.post("/set-theme")
async def set_theme(theme: str = Form(...)):
    server_config.set_and_save_config(theme=theme)
    redirect = RedirectResponse("/login", status_code=303)
    return redirect
|