Compare commits

..

19 Commits

Author SHA1 Message Date
1eb067bab7 logger fix 2025-07-19 18:37:23 +09:00
c3d1662465 logger added.
plex_client method naming fix.
2025-07-19 18:00:48 +09:00
65bd99d3f2 Fix config not save&load 2025-07-19 10:35:14 +09:00
Koha9
ec2673a5c8
Merge pull request #8 from Koha9/codex/add-notification-popup-with-bootstrap
Add bootstrap toast for messages
2025-07-13 07:25:09 +09:00
82f18dea9b adjust backdrop blur for glass toast effect. 2025-07-13 07:24:50 +09:00
Koha9
43a9c01f13 Tint toast glass effect with message color 2025-07-13 07:18:14 +09:00
Koha9
bf67d702dc Center toast and add glass style 2025-07-13 07:07:41 +09:00
Koha9
969b32ab68 feat(ui): add bootstrap toast for messages 2025-07-13 06:56:26 +09:00
992161f9a9 refactor plex_client,config 2025-07-12 19:00:35 +09:00
Koha9
35bd0c9956
Merge pull request #6 from Koha9/token-connect
Token connect support
2025-07-08 19:59:02 +09:00
Koha9
4003fd5bc1
Merge pull request #5 from Koha9/codex/add-grayout-and-strikethrough-for-user-and-password
UI tweak for token login
2025-07-08 19:56:41 +09:00
Koha9
afd4981ba7 refactor login template 2025-07-08 19:54:32 +09:00
Koha9
77c38cd17d Disable highlight when user/pass grayed out 2025-07-08 19:49:35 +09:00
Koha9
0c3d9d7287 Grey out user/password when token present 2025-07-08 19:44:07 +09:00
a8863f911b 支持手动token认证 2025-07-08 19:17:18 +09:00
Koha9
0c2a84f944
Merge pull request #4 from Koha9/codex/support-https-urls-for-server-address
Support HTTPS server URLs
2025-06-21 19:42:13 +09:00
Koha9
d4ed9d22f5 Add scheme dropdown and load defaults from config 2025-06-21 19:34:07 +09:00
Koha9
7ab55e3ae4 Add HTTPS support for server URL 2025-06-21 19:20:48 +09:00
Koha9
8fa0189a1d
Merge pull request #2 from Koha9/codex/add-token-based-authentication-for-plex-server
Implement Plex token login
2025-06-20 22:36:54 +09:00
11 changed files with 404 additions and 67 deletions

View File

@ -16,6 +16,7 @@ PlexPlaylistSync 是一个用于同步 Plex 播放列表和本地 `.m3u`/`.m3u8`
首次登录时使用用户名和密码连接 Plex 服务器,成功后程序会将获得的 `token` 保存在配置文件中,后续通信仅使用该 `token`,不再保存明文密码。 首次登录时使用用户名和密码连接 Plex 服务器,成功后程序会将获得的 `token` 保存在配置文件中,后续通信仅使用该 `token`,不再保存明文密码。
默认情况下 Plex 服务器使用 `32400` 端口,可在未修改服务器端口时直接使用该默认值。 默认情况下 Plex 服务器使用 `32400` 端口,可在未修改服务器端口时直接使用该默认值。
登录页面提供选择 `http``https` 的下拉框,服务器地址输入框只需填写域名或 IP默认值会从 `config.json` 读取。
## 安装 ## 安装

View File

@ -2,5 +2,6 @@
"theme": "auto", "theme": "auto",
"token": "", "token": "",
"server_url": "", "server_url": "",
"server_port": "32400" "server_port": "",
"server_scheme": ""
} }

View File

@ -1,52 +1,100 @@
import os import os
from app.utils.config import load_config, save_config from app.utils.config import server_config
from fastapi import FastAPI, Request, Form from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from app.utils.plex_client import connect_plex from app.utils.plex_client import plex_client
from app.utils.logger import logger
app = FastAPI() app = FastAPI()
templates = Jinja2Templates(directory=os.path.join(os.path.dirname(__file__), "templates")) templates = Jinja2Templates(
directory=os.path.join(os.path.dirname(__file__), "templates")
)
# mount static files # mount static files
# 这里的路径是相对于 main.py 文件所在的目录 # 这里的路径是相对于 main.py 文件所在的目录
app.mount("/static", StaticFiles(directory=os.path.join(os.path.dirname(__file__), "static")), name="static") app.mount(
"/static",
StaticFiles(directory=os.path.join(os.path.dirname(__file__), "static")),
name="static",
)
# 显示主页
@app.get("/", response_class=HTMLResponse) @app.get("/", response_class=HTMLResponse)
async def home(request: Request): async def home(request: Request):
config = load_config() server_config.load()
theme = config.get("theme", "auto") return templates.TemplateResponse(
return templates.TemplateResponse("login.html", {"request": request, "theme": theme, "path": "/login"}) "login.html",
{
"request": request,
"theme": server_config.theme,
"path": "/login",
"scheme": server_config.scheme,
"token": server_config.token,
"server_url": server_config.url,
"port": server_config.port,
},
)
# 登录页面和处理
@app.get("/login", response_class=HTMLResponse) @app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request): async def login_page(request: Request):
config = load_config() server_config.load()
theme = config.get("theme", "auto") return templates.TemplateResponse(
return templates.TemplateResponse("login.html", {"request": request, "theme": theme, "path": "/login"}) "login.html",
{
"request": request,
"theme": server_config.theme,
"path": "/login",
"token": server_config.token,
"scheme": server_config.scheme,
"server_url": server_config.url,
"port": server_config.port,
},
)
@app.post("/login", response_class=HTMLResponse) @app.post("/login", response_class=HTMLResponse)
async def login( async def login(
request: Request, request: Request,
user: str = Form(...), user: str = Form(...),
pw: str = Form(...), pw: str = Form(...),
token: str = Form(...),
scheme: str = Form("https"),
url: str = Form(...), url: str = Form(...),
port: str = Form("32400") port: str = Form("32400"),
): ):
config = load_config() # 尝试连接到 Plex 服务器
theme = config.get("theme", "auto")
try: try:
connect_plex(config, user, pw, url, port) # 优先使用 token 连接,如果 token 为空则使用用户名和密码连接
save_config(config) _, token_success = plex_client.connect(
username=user,
password=pw,
token=token,
scheme=scheme,
url=url,
port=port,
)
# 成功连接后保存配置到配置文件
if plex_client.connected:
server_config.set_and_save_config(
token=token_success, scheme=scheme, url=url, port=port
)
return templates.TemplateResponse( return templates.TemplateResponse(
"login.html", "login.html",
{ {
"request": request, "request": request,
"message": "连接成功", "message": "连接成功",
"success": True, "message_type": "success",
"theme": theme, "theme": server_config.theme,
"path": "/login", "path": "/login",
"token": server_config.token,
"scheme": server_config.scheme,
"server_url": server_config.url,
"port": server_config.port,
}, },
) )
except Exception as e: except Exception as e:
@ -55,33 +103,42 @@ async def login(
{ {
"request": request, "request": request,
"message": f"连接失败:{str(e)}", "message": f"连接失败:{str(e)}",
"success": False, "message_type": "danger",
"theme": theme, "theme": server_config.theme,
"path": "/login", "path": "/login",
"scheme": scheme,
"server_url": url,
"port": port,
}, },
) )
@app.get("/playlist", response_class=HTMLResponse) @app.get("/playlist", response_class=HTMLResponse)
async def get_playlist(request: Request): async def get_playlist(request: Request):
config = load_config() return templates.TemplateResponse(
theme = config.get("theme", "auto") "playlist.html",
return templates.TemplateResponse("playlist.html", {"request": request, "theme": theme, "path": "/playlist"}) {"request": request, "theme": server_config.theme, "path": "/playlist"},
)
@app.post("/playlist", response_class=HTMLResponse) @app.post("/playlist", response_class=HTMLResponse)
async def set_playlist(request: Request, address: str = Form(...), interval: str = Form(...)): async def set_playlist(
config = load_config() request: Request, address: str = Form(...), interval: str = Form(...)
theme = config.get("theme", "auto") ):
# demo返回提交的设置 # demo返回提交的设置
return templates.TemplateResponse("playlist.html", { return templates.TemplateResponse(
"playlist.html",
{
"request": request, "request": request,
"message": f"设置成功:地址 {address},间隔 {interval} 分钟", "message": f"设置成功:地址 {address},间隔 {interval} 分钟",
"theme": theme, "message_type": "info",
"path": "/playlist" "theme": server_config.theme,
}) "path": "/playlist",
},
)
@app.post("/set-theme") @app.post("/set-theme")
async def set_theme(theme: str = Form(...)): async def set_theme(theme: str = Form(...)):
config = load_config() server_config.set_and_save_config(theme=theme)
config["theme"] = theme
save_config(config)
return RedirectResponse("/login", status_code=303) return RedirectResponse("/login", status_code=303)

View File

@ -1,3 +1,20 @@
.custom-sidebar { .custom-sidebar {
max-width: 300px !important; /* 设置最大宽度为 300px */ max-width: 300px !important; /* 设置最大宽度为 300px */
} }
/* Toast with frosted glass effect */
.glass-toast {
backdrop-filter: blur(5px);
}
.glass-toast-success {
background-color: rgba(var(--bs-success-rgb), 0.75) !important;
}
.glass-toast-danger {
background-color: rgba(var(--bs-danger-rgb), 0.75) !important;
}
.glass-toast-info {
background-color: rgba(var(--bs-info-rgb), 0.75) !important;
}

View File

@ -87,6 +87,18 @@
<main class="col ms-sm-auto px-md-4 py-4"> <main class="col ms-sm-auto px-md-4 py-4">
{% block content %}{% endblock %} {% block content %}{% endblock %}
</main> </main>
{% if message %}
<div class="position-fixed top-0 start-50 translate-middle-x p-3 w-75" style="z-index: 1055; margin-top: 20px;">
<div id="messageToast" class="toast text-bg-{{ message_type | default('info') }} border-0 glass-toast glass-toast-{{ message_type | default('info') }} w-100" role="alert" aria-live="assertive" aria-atomic="true">
<div class="d-flex">
<div class="toast-body">
{{ message }}
</div>
<button type="button" class="btn-close btn-close-white me-2 m-auto" data-bs-dismiss="toast" aria-label="Close"></button>
</div>
</div>
</div>
{% endif %}
</div> </div>
</div> </div>
@ -106,6 +118,14 @@
// listen for system theme changes // listen for system theme changes
window.matchMedia('(prefers-color-scheme: dark)').addEventListener('change', applyAutoTheme); window.matchMedia('(prefers-color-scheme: dark)').addEventListener('change', applyAutoTheme);
} }
// show toast message
document.addEventListener('DOMContentLoaded', () => {
const toastEl = document.getElementById('messageToast');
if (toastEl) {
const toast = new bootstrap.Toast(toastEl, { delay: 3000 });
toast.show();
}
});
</script> </script>
</body> </body>

View File

@ -6,25 +6,60 @@
<h2>🔐 登录信息</h2> <h2>🔐 登录信息</h2>
<form method="post" action="/login"> <form method="post" action="/login">
<div class="mb-3"> <div class="mb-3">
<label for="user" class="form-label">用户名</label> <label for="user" class="form-label" id="user-label">用户名</label>
<input type="text" class="form-control" id="user" name="user"> <input type="text" class="form-control" id="user" name="user">
</div> </div>
<div class="mb-3"> <div class="mb-3">
<label for="pw" class="form-label">密码</label> <label for="pw" class="form-label" id="pw-label">密码</label>
<input type="password" class="form-control" id="pw" name="pw"> <input type="password" class="form-control" id="pw" name="pw">
</div>
<div class="mb-3">
<label for="token" class="form-label">Token</label>
<input type="text" class="form-control" id="token" name="token" value="{{ token }}">
</div>
<div class="mb-3">
<label for="scheme" class="form-label">协议</label>
<select class="form-select" id="scheme" name="scheme">
<option value="http" {% if scheme == 'http' %}selected{% endif %}>http</option>
<option value="https" {% if scheme == 'https' %}selected{% endif %}>https</option>
</select>
</div> </div>
<div class="mb-3"> <div class="mb-3">
<label for="url" class="form-label">服务器地址</label> <label for="url" class="form-label">服务器地址</label>
<input type="text" class="form-control" id="url" name="url" placeholder="127.0.0.1"> <input type="text" class="form-control" id="url" name="url" placeholder="127.0.0.1 或 plex.sample.com" value="{{ server_url }}">
</div> </div>
<div class="mb-3"> <div class="mb-3">
<label for="port" class="form-label">端口</label> <label for="port" class="form-label">端口</label>
<input type="text" class="form-control" id="port" name="port" value="32400"> <input type="text" class="form-control" id="port" name="port" value="{{ port }}">
</div> </div>
<button type="submit" class="btn btn-primary">连接</button> <button type="submit" class="btn btn-primary">连接</button>
</form> </form>
{% if message %} <script>
<div class="alert alert-{{ 'success' if success else 'danger' }} mt-4">{{ message }}</div> document.addEventListener('DOMContentLoaded', () => {
{% endif %} const tokenInput = document.getElementById('token');
const inputs = [document.getElementById('user'), document.getElementById('pw')];
const labels = [document.getElementById('user-label'), document.getElementById('pw-label')];
const fieldClasses = ['bg-body-secondary', 'text-body-secondary', 'text-decoration-line-through'];
function toggleCredFields() {
const inactive = tokenInput.value.trim() !== '';
inputs.forEach(el => {
el.readOnly = inactive;
el.tabIndex = inactive ? -1 : 0;
el.style.pointerEvents = inactive ? 'none' : '';
if (inactive && document.activeElement === el) {
el.blur();
}
fieldClasses.forEach(cls => el.classList.toggle(cls, inactive));
});
labels.forEach(el => {
['text-decoration-line-through', 'text-body-secondary'].forEach(cls => el.classList.toggle(cls, inactive));
});
}
tokenInput.addEventListener('input', toggleCredFields);
toggleCredFields();
});
</script>
{% endblock %} {% endblock %}

View File

@ -20,7 +20,4 @@
<button type="submit" class="btn btn-success">保存设置</button> <button type="submit" class="btn btn-success">保存设置</button>
</form> </form>
{% if message %}
<div class="alert alert-info mt-4">{{ message }}</div>
{% endif %}
{% endblock %} {% endblock %}

6
app/utils/common.py Normal file
View File

@ -0,0 +1,6 @@
def str_is_empty(s: str) -> bool:
"""Check if a string is empty or contains only whitespace."""
if s is None:
return True
stripped = s.replace(" ", " ")
return not bool(stripped)

View File

@ -1,14 +1,94 @@
import json import json
import os import os
from app.utils.logger import logger
CONFIG_PATH = os.path.join(os.path.dirname(__file__), "..", "config.json") CONFIG_PATH = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "config.json")
)
def load_config():
if not os.path.exists(CONFIG_PATH): class ServerConfig:
return {}
def __init__(self):
self.theme = "auto"
self.token = ""
self.url = ""
self.scheme = "https"
self.port = "32400"
self.load()
def load(self) -> None:
try:
with open(CONFIG_PATH, "r", encoding="utf-8") as f: with open(CONFIG_PATH, "r", encoding="utf-8") as f:
return json.load(f) config = json.load(f)
logger.debug(f"Loaded server config: {config}")
except FileNotFoundError:
# 如果配置文件不存在,使用默认值
self.save()
logger.debug("Config file not found, using default values.")
return
except json.JSONDecodeError:
# 如果配置文件格式错误,使用默认值
self.save()
logger.debug("Config file is invalid, using default values.")
return
self.theme = config.get("theme", "auto")
self.token = config.get("token", "")
self.url = config.get("server_url", "")
self.scheme = config.get("server_scheme", "https")
self.port = config.get("server_port", "32400")
logger.info(f"Server config loaded: {self.__dict__}")
def save_config(new_config): def save(self):
config = {
"theme": self.theme,
"token": self.token,
"server_url": self.url,
"server_scheme": self.scheme,
"server_port": self.port,
}
with open(CONFIG_PATH, "w", encoding="utf-8") as f: with open(CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(new_config, f, indent=4, ensure_ascii=False) json.dump(config, f, indent=4, ensure_ascii=False)
logger.info(f"Server config saved: {config}")
def set_url(self, url: str) -> None:
self.url = url
def set_scheme(self, scheme: str) -> None:
self.scheme = scheme
def set_port(self, port: str) -> None:
self.port = port
def set_token(self, token: str) -> None:
self.token = token
def set_theme(self, theme: str) -> None:
# check theme is valid
if theme not in ["auto", "dark", "light"]:
logger.error(f"Invalid theme: {theme}")
raise ValueError("Invalid theme. Must be 'auto', 'dark', or 'light'.")
self.theme = theme
def set_and_save_config(
self,
theme: str = None,
token: str = None,
url: str = None,
scheme: str = None,
port: str = None,
) -> None:
if theme is not None:
self.set_theme(theme)
if token is not None:
self.set_token(token)
if url is not None:
self.set_url(url)
if scheme is not None:
self.set_scheme(scheme)
if port is not None:
self.set_port(port)
self.save()
server_config = ServerConfig()

33
app/utils/logger.py Normal file
View File

@ -0,0 +1,33 @@
import logging
import os
LOG_PATH = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "logs", "app.log"))
LOG_LEVEL = logging.DEBUG
def logger_initialize() -> logging.Logger:
"""Initialize the logger for the application. Return a logger that logs to console and a app.log."""
logger = logging.getLogger("PlexPlaylistSync")
if logger.hasHandlers():
return logger
# check log path exists, if not create it
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
logger.setLevel(LOG_LEVEL)
# 2025-07-19 17:23:05,116 [levelname]: message
formatter = logging.Formatter("%(asctime)s [%(levelname)s]: %(message)s")
file_handler = logging.FileHandler(LOG_PATH, encoding="utf-8")
file_handler.setFormatter(formatter)
# add file handler
logger.addHandler(file_handler)
# console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
logger = logger_initialize()

View File

@ -1,16 +1,106 @@
from plexapi.myplex import MyPlexAccount from plexapi.myplex import MyPlexAccount
from plexapi.server import PlexServer from plexapi.server import PlexServer
from urllib.parse import urlparse
from app.utils.common import str_is_empty
from app.utils.logger import logger
def connect_plex(config, username, password, url, port="32400"): def build_plex_url(scheme, url, port="32400"):
"""Return a connected PlexServer instance and update config with token and server info.""" """Build a full Plex URL from scheme, url, and port."""
token = config.get("token") # 如果url不以http://或https://开头则添加scheme
if not token: full_url = url
account = MyPlexAccount(username, password) if not full_url.startswith("http://") and not full_url.startswith("https://"):
token = account.authenticationToken full_url = f"{scheme}://{url}"
config["token"] = token parsed = urlparse(full_url)
if parsed.scheme in ("http", "https"):
netloc = parsed.netloc or parsed.path
if ":" not in netloc and port:
netloc = f"{netloc}:{port}"
base_url = f"{parsed.scheme}://{netloc}"
else:
base_url = f"http://{url}:{port}" base_url = f"http://{url}:{port}"
server = PlexServer(base_url, token) return base_url
config.update({"server_url": url, "server_port": port})
return server
class PlexClient:
"""A client for interacting with a Plex server."""
def __init__(self) -> None:
self.server: PlexServer | None = None
self.token: str | None = None
self.base_url: str | None = None
self.connected = False
def connect(
self,
username: str = "",
password: str = "",
token: str = "",
scheme: str = "https",
url: str = "",
port: str = "32400",
):
"""Connect to the Plex server using username/password or token."""
# Connect Server with token if token is provided, otherwise use username/password
try:
if not str_is_empty(token):
self.server, self.token = self._connect_with_token(token, scheme, url, port)
else:
self.server, self.token = self._connect_with_pw(
username, password, scheme, url, port
)
# Update the base URL and connection status
self.base_url = build_plex_url(scheme, url, port)
self.connected = True
logger.info(f"Connected to Plex server at {self.base_url} with token: {self.token}")
return self.server, self.token
except Exception as e:
logger.warning(f"Failed to connect to Plex server: {str(e)}")
self.connected = False
raise
def _connect_with_pw(self, username: str, password: str, scheme: str, url: str, port: str = "32400"):
"""Return a connected PlexServer instance and update config with token and server info."""
# url 初始化
self.base_url = build_plex_url(scheme, url, port)
# account 初始化
account = MyPlexAccount(username, password)
# token 获取
self.token = account.authenticationToken
self.server = PlexServer(self.base_url, self.token)
logger.debug(f"Connected to Plex server with username: {username}, token: {self.token}")
return self.server, self.token
def _connect_with_token(self, token: str, scheme: str, url: str, port: str = "32400"):
"""Return a connected PlexServer instance using a token."""
# URL 初始化
self.base_url = build_plex_url(scheme, url, port)
self.server = PlexServer(self.base_url, token)
logger.debug(f"Connected to Plex server with token: {token}")
return self.server, token
def get_server(self) -> PlexServer | None:
"""Return the connected Plex server instance."""
if not self.connected:
logger.error("Plex client is not connected.")
raise RuntimeError("Plex client is not connected.")
return self.server
def get_all_playlist(self) -> list | None:
"""Return all playlists from the Plex server."""
if not self.connected or not self.server:
logger.error("Plex server is not connected.")
raise ValueError("Plex server is not connected.")
try:
playlists = self.server.playlists()
logger.info(f"Fetched {len(playlists)} playlists from the server.")
return playlists
except Exception as e:
logger.warning(f"Failed to fetch playlists: {str(e)}")
raise RuntimeError(f"Failed to fetch playlists: {str(e)}")
plex_client = PlexClient()