Compare commits

..

No commits in common. "main" and "codex/add-token-based-authentication-for-plex-server" have entirely different histories.

11 changed files with 67 additions and 404 deletions

View File

@ -16,7 +16,6 @@ PlexPlaylistSync 是一个用于同步 Plex 播放列表和本地 `.m3u`/`.m3u8`
首次登录时使用用户名和密码连接 Plex 服务器,成功后程序会将获得的 `token` 保存在配置文件中,后续通信仅使用该 `token`,不再保存明文密码。 首次登录时使用用户名和密码连接 Plex 服务器,成功后程序会将获得的 `token` 保存在配置文件中,后续通信仅使用该 `token`,不再保存明文密码。
默认情况下 Plex 服务器使用 `32400` 端口,可在未修改服务器端口时直接使用该默认值。 默认情况下 Plex 服务器使用 `32400` 端口,可在未修改服务器端口时直接使用该默认值。
登录页面提供选择 `http``https` 的下拉框,服务器地址输入框只需填写域名或 IP默认值会从 `config.json` 读取。
## 安装 ## 安装

View File

@ -2,6 +2,5 @@
"theme": "auto", "theme": "auto",
"token": "", "token": "",
"server_url": "", "server_url": "",
"server_port": "", "server_port": "32400"
"server_scheme": ""
} }

View File

@ -1,100 +1,52 @@
import os import os
from app.utils.config import server_config from app.utils.config import load_config, save_config
from fastapi import FastAPI, Request, Form from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from app.utils.plex_client import plex_client from app.utils.plex_client import connect_plex
from app.utils.logger import logger
app = FastAPI() app = FastAPI()
templates = Jinja2Templates( templates = Jinja2Templates(directory=os.path.join(os.path.dirname(__file__), "templates"))
directory=os.path.join(os.path.dirname(__file__), "templates")
)
# mount static files # mount static files
# 这里的路径是相对于 main.py 文件所在的目录 # 这里的路径是相对于 main.py 文件所在的目录
app.mount( app.mount("/static", StaticFiles(directory=os.path.join(os.path.dirname(__file__), "static")), name="static")
"/static",
StaticFiles(directory=os.path.join(os.path.dirname(__file__), "static")),
name="static",
)
# 显示主页
@app.get("/", response_class=HTMLResponse) @app.get("/", response_class=HTMLResponse)
async def home(request: Request): async def home(request: Request):
server_config.load() config = load_config()
return templates.TemplateResponse( theme = config.get("theme", "auto")
"login.html", return templates.TemplateResponse("login.html", {"request": request, "theme": theme, "path": "/login"})
{
"request": request,
"theme": server_config.theme,
"path": "/login",
"scheme": server_config.scheme,
"token": server_config.token,
"server_url": server_config.url,
"port": server_config.port,
},
)
# 登录页面和处理
@app.get("/login", response_class=HTMLResponse) @app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request): async def login_page(request: Request):
server_config.load() config = load_config()
return templates.TemplateResponse( theme = config.get("theme", "auto")
"login.html", return templates.TemplateResponse("login.html", {"request": request, "theme": theme, "path": "/login"})
{
"request": request,
"theme": server_config.theme,
"path": "/login",
"token": server_config.token,
"scheme": server_config.scheme,
"server_url": server_config.url,
"port": server_config.port,
},
)
@app.post("/login", response_class=HTMLResponse) @app.post("/login", response_class=HTMLResponse)
async def login( async def login(
request: Request, request: Request,
user: str = Form(...), user: str = Form(...),
pw: str = Form(...), pw: str = Form(...),
token: str = Form(...),
scheme: str = Form("https"),
url: str = Form(...), url: str = Form(...),
port: str = Form("32400"), port: str = Form("32400")
): ):
# 尝试连接到 Plex 服务器 config = load_config()
theme = config.get("theme", "auto")
try: try:
# 优先使用 token 连接,如果 token 为空则使用用户名和密码连接 connect_plex(config, user, pw, url, port)
_, token_success = plex_client.connect( save_config(config)
username=user,
password=pw,
token=token,
scheme=scheme,
url=url,
port=port,
)
# 成功连接后保存配置到配置文件
if plex_client.connected:
server_config.set_and_save_config(
token=token_success, scheme=scheme, url=url, port=port
)
return templates.TemplateResponse( return templates.TemplateResponse(
"login.html", "login.html",
{ {
"request": request, "request": request,
"message": "连接成功", "message": "连接成功",
"message_type": "success", "success": True,
"theme": server_config.theme, "theme": theme,
"path": "/login", "path": "/login",
"token": server_config.token,
"scheme": server_config.scheme,
"server_url": server_config.url,
"port": server_config.port,
}, },
) )
except Exception as e: except Exception as e:
@ -103,42 +55,33 @@ async def login(
{ {
"request": request, "request": request,
"message": f"连接失败:{str(e)}", "message": f"连接失败:{str(e)}",
"message_type": "danger", "success": False,
"theme": server_config.theme, "theme": theme,
"path": "/login", "path": "/login",
"scheme": scheme,
"server_url": url,
"port": port,
}, },
) )
@app.get("/playlist", response_class=HTMLResponse) @app.get("/playlist", response_class=HTMLResponse)
async def get_playlist(request: Request): async def get_playlist(request: Request):
return templates.TemplateResponse( config = load_config()
"playlist.html", theme = config.get("theme", "auto")
{"request": request, "theme": server_config.theme, "path": "/playlist"}, return templates.TemplateResponse("playlist.html", {"request": request, "theme": theme, "path": "/playlist"})
)
@app.post("/playlist", response_class=HTMLResponse) @app.post("/playlist", response_class=HTMLResponse)
async def set_playlist( async def set_playlist(request: Request, address: str = Form(...), interval: str = Form(...)):
request: Request, address: str = Form(...), interval: str = Form(...) config = load_config()
): theme = config.get("theme", "auto")
# demo返回提交的设置 # demo返回提交的设置
return templates.TemplateResponse( return templates.TemplateResponse("playlist.html", {
"playlist.html", "request": request,
{ "message": f"设置成功:地址 {address},间隔 {interval} 分钟",
"request": request, "theme": theme,
"message": f"设置成功:地址 {address},间隔 {interval} 分钟", "path": "/playlist"
"message_type": "info", })
"theme": server_config.theme,
"path": "/playlist",
},
)
@app.post("/set-theme") @app.post("/set-theme")
async def set_theme(theme: str = Form(...)): async def set_theme(theme: str = Form(...)):
server_config.set_and_save_config(theme=theme) config = load_config()
return RedirectResponse("/login", status_code=303) config["theme"] = theme
save_config(config)
return RedirectResponse("/login", status_code=303)

View File

@ -1,20 +1,3 @@
.custom-sidebar { .custom-sidebar {
max-width: 300px !important; /* 设置最大宽度为 300px */ max-width: 300px !important; /* 设置最大宽度为 300px */
} }
/* Toast with frosted glass effect */
.glass-toast {
backdrop-filter: blur(5px);
}
.glass-toast-success {
background-color: rgba(var(--bs-success-rgb), 0.75) !important;
}
.glass-toast-danger {
background-color: rgba(var(--bs-danger-rgb), 0.75) !important;
}
.glass-toast-info {
background-color: rgba(var(--bs-info-rgb), 0.75) !important;
}

View File

@ -87,18 +87,6 @@
<main class="col ms-sm-auto px-md-4 py-4"> <main class="col ms-sm-auto px-md-4 py-4">
{% block content %}{% endblock %} {% block content %}{% endblock %}
</main> </main>
{% if message %}
<div class="position-fixed top-0 start-50 translate-middle-x p-3 w-75" style="z-index: 1055; margin-top: 20px;">
<div id="messageToast" class="toast text-bg-{{ message_type | default('info') }} border-0 glass-toast glass-toast-{{ message_type | default('info') }} w-100" role="alert" aria-live="assertive" aria-atomic="true">
<div class="d-flex">
<div class="toast-body">
{{ message }}
</div>
<button type="button" class="btn-close btn-close-white me-2 m-auto" data-bs-dismiss="toast" aria-label="Close"></button>
</div>
</div>
</div>
{% endif %}
</div> </div>
</div> </div>
@ -118,14 +106,6 @@
// listen for system theme changes // listen for system theme changes
window.matchMedia('(prefers-color-scheme: dark)').addEventListener('change', applyAutoTheme); window.matchMedia('(prefers-color-scheme: dark)').addEventListener('change', applyAutoTheme);
} }
// show toast message
document.addEventListener('DOMContentLoaded', () => {
const toastEl = document.getElementById('messageToast');
if (toastEl) {
const toast = new bootstrap.Toast(toastEl, { delay: 3000 });
toast.show();
}
});
</script> </script>
</body> </body>

View File

@ -6,60 +6,25 @@
<h2>🔐 登录信息</h2> <h2>🔐 登录信息</h2>
<form method="post" action="/login"> <form method="post" action="/login">
<div class="mb-3"> <div class="mb-3">
<label for="user" class="form-label" id="user-label">用户名</label> <label for="user" class="form-label">用户名</label>
<input type="text" class="form-control" id="user" name="user"> <input type="text" class="form-control" id="user" name="user">
</div> </div>
<div class="mb-3"> <div class="mb-3">
<label for="pw" class="form-label" id="pw-label">密码</label> <label for="pw" class="form-label">密码</label>
<input type="password" class="form-control" id="pw" name="pw"> <input type="password" class="form-control" id="pw" name="pw">
</div>
<div class="mb-3">
<label for="token" class="form-label">Token</label>
<input type="text" class="form-control" id="token" name="token" value="{{ token }}">
</div>
<div class="mb-3">
<label for="scheme" class="form-label">协议</label>
<select class="form-select" id="scheme" name="scheme">
<option value="http" {% if scheme == 'http' %}selected{% endif %}>http</option>
<option value="https" {% if scheme == 'https' %}selected{% endif %}>https</option>
</select>
</div> </div>
<div class="mb-3"> <div class="mb-3">
<label for="url" class="form-label">服务器地址</label> <label for="url" class="form-label">服务器地址</label>
<input type="text" class="form-control" id="url" name="url" placeholder="127.0.0.1 或 plex.sample.com" value="{{ server_url }}"> <input type="text" class="form-control" id="url" name="url" placeholder="127.0.0.1">
</div> </div>
<div class="mb-3"> <div class="mb-3">
<label for="port" class="form-label">端口</label> <label for="port" class="form-label">端口</label>
<input type="text" class="form-control" id="port" name="port" value="{{ port }}"> <input type="text" class="form-control" id="port" name="port" value="32400">
</div> </div>
<button type="submit" class="btn btn-primary">连接</button> <button type="submit" class="btn btn-primary">连接</button>
</form> </form>
<script> {% if message %}
document.addEventListener('DOMContentLoaded', () => { <div class="alert alert-{{ 'success' if success else 'danger' }} mt-4">{{ message }}</div>
const tokenInput = document.getElementById('token'); {% endif %}
const inputs = [document.getElementById('user'), document.getElementById('pw')]; {% endblock %}
const labels = [document.getElementById('user-label'), document.getElementById('pw-label')];
const fieldClasses = ['bg-body-secondary', 'text-body-secondary', 'text-decoration-line-through'];
function toggleCredFields() {
const inactive = tokenInput.value.trim() !== '';
inputs.forEach(el => {
el.readOnly = inactive;
el.tabIndex = inactive ? -1 : 0;
el.style.pointerEvents = inactive ? 'none' : '';
if (inactive && document.activeElement === el) {
el.blur();
}
fieldClasses.forEach(cls => el.classList.toggle(cls, inactive));
});
labels.forEach(el => {
['text-decoration-line-through', 'text-body-secondary'].forEach(cls => el.classList.toggle(cls, inactive));
});
}
tokenInput.addEventListener('input', toggleCredFields);
toggleCredFields();
});
</script>
{% endblock %}

View File

@ -20,4 +20,7 @@
<button type="submit" class="btn btn-success">保存设置</button> <button type="submit" class="btn btn-success">保存设置</button>
</form> </form>
{% if message %}
<div class="alert alert-info mt-4">{{ message }}</div>
{% endif %}
{% endblock %} {% endblock %}

View File

@ -1,6 +0,0 @@
def str_is_empty(s: str) -> bool:
"""Check if a string is empty or contains only whitespace."""
if s is None:
return True
stripped = s.replace(" ", " ")
return not bool(stripped)

View File

@ -1,94 +1,14 @@
import json import json
import os import os
from app.utils.logger import logger
CONFIG_PATH = os.path.abspath( CONFIG_PATH = os.path.join(os.path.dirname(__file__), "..", "config.json")
os.path.join(os.path.dirname(__file__), "..", "config.json")
)
def load_config():
if not os.path.exists(CONFIG_PATH):
return {}
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
return json.load(f)
class ServerConfig: def save_config(new_config):
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
def __init__(self): json.dump(new_config, f, indent=4, ensure_ascii=False)
self.theme = "auto"
self.token = ""
self.url = ""
self.scheme = "https"
self.port = "32400"
self.load()
def load(self) -> None:
try:
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
config = json.load(f)
logger.debug(f"Loaded server config: {config}")
except FileNotFoundError:
# 如果配置文件不存在,使用默认值
self.save()
logger.debug("Config file not found, using default values.")
return
except json.JSONDecodeError:
# 如果配置文件格式错误,使用默认值
self.save()
logger.debug("Config file is invalid, using default values.")
return
self.theme = config.get("theme", "auto")
self.token = config.get("token", "")
self.url = config.get("server_url", "")
self.scheme = config.get("server_scheme", "https")
self.port = config.get("server_port", "32400")
logger.info(f"Server config loaded: {self.__dict__}")
def save(self):
config = {
"theme": self.theme,
"token": self.token,
"server_url": self.url,
"server_scheme": self.scheme,
"server_port": self.port,
}
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(config, f, indent=4, ensure_ascii=False)
logger.info(f"Server config saved: {config}")
def set_url(self, url: str) -> None:
self.url = url
def set_scheme(self, scheme: str) -> None:
self.scheme = scheme
def set_port(self, port: str) -> None:
self.port = port
def set_token(self, token: str) -> None:
self.token = token
def set_theme(self, theme: str) -> None:
# check theme is valid
if theme not in ["auto", "dark", "light"]:
logger.error(f"Invalid theme: {theme}")
raise ValueError("Invalid theme. Must be 'auto', 'dark', or 'light'.")
self.theme = theme
def set_and_save_config(
self,
theme: str = None,
token: str = None,
url: str = None,
scheme: str = None,
port: str = None,
) -> None:
if theme is not None:
self.set_theme(theme)
if token is not None:
self.set_token(token)
if url is not None:
self.set_url(url)
if scheme is not None:
self.set_scheme(scheme)
if port is not None:
self.set_port(port)
self.save()
server_config = ServerConfig()

View File

@ -1,33 +0,0 @@
import logging
import os
LOG_PATH = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "logs", "app.log"))
LOG_LEVEL = logging.DEBUG
def logger_initialize() -> logging.Logger:
"""Initialize the logger for the application. Return a logger that logs to console and a app.log."""
logger = logging.getLogger("PlexPlaylistSync")
if logger.hasHandlers():
return logger
# check log path exists, if not create it
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
logger.setLevel(LOG_LEVEL)
# 2025-07-19 17:23:05,116 [levelname]: message
formatter = logging.Formatter("%(asctime)s [%(levelname)s]: %(message)s")
file_handler = logging.FileHandler(LOG_PATH, encoding="utf-8")
file_handler.setFormatter(formatter)
# add file handler
logger.addHandler(file_handler)
# console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
logger = logger_initialize()

View File

@ -1,106 +1,16 @@
from plexapi.myplex import MyPlexAccount from plexapi.myplex import MyPlexAccount
from plexapi.server import PlexServer from plexapi.server import PlexServer
from urllib.parse import urlparse
from app.utils.common import str_is_empty
from app.utils.logger import logger
def build_plex_url(scheme, url, port="32400"): def connect_plex(config, username, password, url, port="32400"):
"""Build a full Plex URL from scheme, url, and port.""" """Return a connected PlexServer instance and update config with token and server info."""
# 如果url不以http://或https://开头则添加scheme token = config.get("token")
full_url = url if not token:
if not full_url.startswith("http://") and not full_url.startswith("https://"):
full_url = f"{scheme}://{url}"
parsed = urlparse(full_url)
if parsed.scheme in ("http", "https"):
netloc = parsed.netloc or parsed.path
if ":" not in netloc and port:
netloc = f"{netloc}:{port}"
base_url = f"{parsed.scheme}://{netloc}"
else:
base_url = f"http://{url}:{port}"
return base_url
class PlexClient:
"""A client for interacting with a Plex server."""
def __init__(self) -> None:
self.server: PlexServer | None = None
self.token: str | None = None
self.base_url: str | None = None
self.connected = False
def connect(
self,
username: str = "",
password: str = "",
token: str = "",
scheme: str = "https",
url: str = "",
port: str = "32400",
):
"""Connect to the Plex server using username/password or token."""
# Connect Server with token if token is provided, otherwise use username/password
try:
if not str_is_empty(token):
self.server, self.token = self._connect_with_token(token, scheme, url, port)
else:
self.server, self.token = self._connect_with_pw(
username, password, scheme, url, port
)
# Update the base URL and connection status
self.base_url = build_plex_url(scheme, url, port)
self.connected = True
logger.info(f"Connected to Plex server at {self.base_url} with token: {self.token}")
return self.server, self.token
except Exception as e:
logger.warning(f"Failed to connect to Plex server: {str(e)}")
self.connected = False
raise
def _connect_with_pw(self, username: str, password: str, scheme: str, url: str, port: str = "32400"):
"""Return a connected PlexServer instance and update config with token and server info."""
# url 初始化
self.base_url = build_plex_url(scheme, url, port)
# account 初始化
account = MyPlexAccount(username, password) account = MyPlexAccount(username, password)
# token 获取 token = account.authenticationToken
self.token = account.authenticationToken config["token"] = token
self.server = PlexServer(self.base_url, self.token) base_url = f"http://{url}:{port}"
logger.debug(f"Connected to Plex server with username: {username}, token: {self.token}") server = PlexServer(base_url, token)
return self.server, self.token config.update({"server_url": url, "server_port": port})
return server
def _connect_with_token(self, token: str, scheme: str, url: str, port: str = "32400"):
"""Return a connected PlexServer instance using a token."""
# URL 初始化
self.base_url = build_plex_url(scheme, url, port)
self.server = PlexServer(self.base_url, token)
logger.debug(f"Connected to Plex server with token: {token}")
return self.server, token
def get_server(self) -> PlexServer | None:
"""Return the connected Plex server instance."""
if not self.connected:
logger.error("Plex client is not connected.")
raise RuntimeError("Plex client is not connected.")
return self.server
def get_all_playlist(self) -> list | None:
"""Return all playlists from the Plex server."""
if not self.connected or not self.server:
logger.error("Plex server is not connected.")
raise ValueError("Plex server is not connected.")
try:
playlists = self.server.playlists()
logger.info(f"Fetched {len(playlists)} playlists from the server.")
return playlists
except Exception as e:
logger.warning(f"Failed to fetch playlists: {str(e)}")
raise RuntimeError(f"Failed to fetch playlists: {str(e)}")
plex_client = PlexClient()