Compare commits

...

11 Commits

Author SHA1 Message Date
1eb067bab7 logger fix 2025-07-19 18:37:23 +09:00
c3d1662465 logger added.
plex_client method naming fix.
2025-07-19 18:00:48 +09:00
65bd99d3f2 Fix config not save&load 2025-07-19 10:35:14 +09:00
Koha9
ec2673a5c8
Merge pull request #8 from Koha9/codex/add-notification-popup-with-bootstrap
Add bootstrap toast for messages
2025-07-13 07:25:09 +09:00
82f18dea9b adjust backdrop blur for glass toast effect. 2025-07-13 07:24:50 +09:00
Koha9
43a9c01f13 Tint toast glass effect with message color 2025-07-13 07:18:14 +09:00
Koha9
bf67d702dc Center toast and add glass style 2025-07-13 07:07:41 +09:00
Koha9
969b32ab68 feat(ui): add bootstrap toast for messages 2025-07-13 06:56:26 +09:00
992161f9a9 refactor plex_client,config 2025-07-12 19:00:35 +09:00
Koha9
35bd0c9956
Merge pull request #6 from Koha9/token-connect
Token connect support
2025-07-08 19:59:02 +09:00
Koha9
4003fd5bc1
Merge pull request #5 from Koha9/codex/add-grayout-and-strikethrough-for-user-and-password
UI tweak for token login
2025-07-08 19:56:41 +09:00
8 changed files with 306 additions and 116 deletions

View File

@ -1,57 +1,62 @@
import os
from app.utils.config import load_config, save_config, get_server_settings
from app.utils.config import server_config
from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from app.utils.plex_client import connect_plex
from app.utils.plex_client import plex_client
from app.utils.logger import logger
app = FastAPI()
templates = Jinja2Templates(directory=os.path.join(os.path.dirname(__file__), "templates"))
templates = Jinja2Templates(
directory=os.path.join(os.path.dirname(__file__), "templates")
)
# mount static files
# 这里的路径是相对于 main.py 文件所在的目录
app.mount("/static", StaticFiles(directory=os.path.join(os.path.dirname(__file__), "static")), name="static")
app.mount(
"/static",
StaticFiles(directory=os.path.join(os.path.dirname(__file__), "static")),
name="static",
)
# 显示主页
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
config = load_config()
theme = config.get("theme", "auto")
token,scheme, host, port = get_server_settings(config)
server_config.load()
return templates.TemplateResponse(
"login.html",
{
"request": request,
"theme": theme,
"theme": server_config.theme,
"path": "/login",
"scheme": scheme,
"token": token,
"server_url": host,
"port": port,
"scheme": server_config.scheme,
"token": server_config.token,
"server_url": server_config.url,
"port": server_config.port,
},
)
# 登录页面和处理
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
config = load_config()
theme = config.get("theme", "auto")
token, scheme, host, port = get_server_settings(config)
server_config.load()
return templates.TemplateResponse(
"login.html",
{
"request": request,
"theme": theme,
"theme": server_config.theme,
"path": "/login",
"token": token,
"scheme": scheme,
"server_url": host,
"port": port,
"token": server_config.token,
"scheme": server_config.scheme,
"server_url": server_config.url,
"port": server_config.port,
},
)
@app.post("/login", response_class=HTMLResponse)
async def login(
request: Request,
@ -62,33 +67,34 @@ async def login(
url: str = Form(...),
port: str = Form("32400"),
):
config = load_config()
theme = config.get("theme", "auto")
# 尝试连接到 Plex 服务器
try:
# 优先使用 token 连接,如果 token 为空则使用用户名和密码连接
_, token_success = connect_plex(user, pw, token, scheme, url, port)
_, token_success = plex_client.connect(
username=user,
password=pw,
token=token,
scheme=scheme,
url=url,
port=port,
)
# 成功连接后保存配置到配置文件
config.update({
"server_url": url,
"server_scheme": scheme,
"server_port": port,
"token": token_success,
})
save_config(config)
token, scheme, host, port = get_server_settings(config)
if plex_client.connected:
server_config.set_and_save_config(
token=token_success, scheme=scheme, url=url, port=port
)
return templates.TemplateResponse(
"login.html",
{
"request": request,
"message": "连接成功",
"success": True,
"theme": theme,
"message_type": "success",
"theme": server_config.theme,
"path": "/login",
"token": token,
"scheme": scheme,
"server_url": host,
"port": port,
"token": server_config.token,
"scheme": server_config.scheme,
"server_url": server_config.url,
"port": server_config.port,
},
)
except Exception as e:
@ -97,8 +103,8 @@ async def login(
{
"request": request,
"message": f"连接失败:{str(e)}",
"success": False,
"theme": theme,
"message_type": "danger",
"theme": server_config.theme,
"path": "/login",
"scheme": scheme,
"server_url": url,
@ -106,27 +112,33 @@ async def login(
},
)
@app.get("/playlist", response_class=HTMLResponse)
async def get_playlist(request: Request):
config = load_config()
theme = config.get("theme", "auto")
return templates.TemplateResponse("playlist.html", {"request": request, "theme": theme, "path": "/playlist"})
return templates.TemplateResponse(
"playlist.html",
{"request": request, "theme": server_config.theme, "path": "/playlist"},
)
@app.post("/playlist", response_class=HTMLResponse)
async def set_playlist(request: Request, address: str = Form(...), interval: str = Form(...)):
config = load_config()
theme = config.get("theme", "auto")
async def set_playlist(
request: Request, address: str = Form(...), interval: str = Form(...)
):
# demo返回提交的设置
return templates.TemplateResponse("playlist.html", {
"request": request,
"message": f"设置成功:地址 {address},间隔 {interval} 分钟",
"theme": theme,
"path": "/playlist"
})
return templates.TemplateResponse(
"playlist.html",
{
"request": request,
"message": f"设置成功:地址 {address},间隔 {interval} 分钟",
"message_type": "info",
"theme": server_config.theme,
"path": "/playlist",
},
)
@app.post("/set-theme")
async def set_theme(theme: str = Form(...)):
config = load_config()
config["theme"] = theme
save_config(config)
return RedirectResponse("/login", status_code=303)
server_config.set_and_save_config(theme=theme)
return RedirectResponse("/login", status_code=303)

View File

@ -1,3 +1,20 @@
.custom-sidebar {
max-width: 300px !important; /* 设置最大宽度为 300px */
}
}
/* Toast with frosted glass effect */
.glass-toast {
backdrop-filter: blur(5px);
}
.glass-toast-success {
background-color: rgba(var(--bs-success-rgb), 0.75) !important;
}
.glass-toast-danger {
background-color: rgba(var(--bs-danger-rgb), 0.75) !important;
}
.glass-toast-info {
background-color: rgba(var(--bs-info-rgb), 0.75) !important;
}

View File

@ -87,6 +87,18 @@
<main class="col ms-sm-auto px-md-4 py-4">
{% block content %}{% endblock %}
</main>
{% if message %}
<div class="position-fixed top-0 start-50 translate-middle-x p-3 w-75" style="z-index: 1055; margin-top: 20px;">
<div id="messageToast" class="toast text-bg-{{ message_type | default('info') }} border-0 glass-toast glass-toast-{{ message_type | default('info') }} w-100" role="alert" aria-live="assertive" aria-atomic="true">
<div class="d-flex">
<div class="toast-body">
{{ message }}
</div>
<button type="button" class="btn-close btn-close-white me-2 m-auto" data-bs-dismiss="toast" aria-label="Close"></button>
</div>
</div>
</div>
{% endif %}
</div>
</div>
@ -106,6 +118,14 @@
// listen for system theme changes
window.matchMedia('(prefers-color-scheme: dark)').addEventListener('change', applyAutoTheme);
}
// show toast message
document.addEventListener('DOMContentLoaded', () => {
const toastEl = document.getElementById('messageToast');
if (toastEl) {
const toast = new bootstrap.Toast(toastEl, { delay: 3000 });
toast.show();
}
});
</script>
</body>

View File

@ -35,9 +35,6 @@
<button type="submit" class="btn btn-primary">连接</button>
</form>
{% if message %}
<div class="alert alert-{{ 'success' if success else 'danger' }} mt-4">{{ message }}</div>
{% endif %}
<script>
document.addEventListener('DOMContentLoaded', () => {
const tokenInput = document.getElementById('token');

View File

@ -20,7 +20,4 @@
<button type="submit" class="btn btn-success">保存设置</button>
</form>
{% if message %}
<div class="alert alert-info mt-4">{{ message }}</div>
{% endif %}
{% endblock %}

View File

@ -1,36 +1,94 @@
import json
import os
from urllib.parse import urlparse
from app.utils.logger import logger
CONFIG_PATH = os.path.join(os.path.dirname(__file__), "..", "config.json")
def load_config():
if not os.path.exists(CONFIG_PATH):
return {}
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
return json.load(f)
def save_config(new_config):
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(new_config, f, indent=4, ensure_ascii=False)
CONFIG_PATH = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "config.json")
)
def get_server_settings(config):
"""Return (scheme, host, port) using defaults when not configured."""
scheme = config.get("server_scheme", "https")
token = config.get("token", "") or ""
host = ""
port = config.get("server_port", "32400") or "32400"
class ServerConfig:
url = config.get("server_url", "")
if url:
parsed = urlparse(url)
if parsed.scheme:
scheme = parsed.scheme
host = parsed.hostname or parsed.netloc
if parsed.port:
port = str(parsed.port)
else:
host = url
print(f"server_scheme: {scheme}, host: {host}, port: {port}")
return token, scheme, host, port
def __init__(self):
self.theme = "auto"
self.token = ""
self.url = ""
self.scheme = "https"
self.port = "32400"
self.load()
def load(self) -> None:
try:
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
config = json.load(f)
logger.debug(f"Loaded server config: {config}")
except FileNotFoundError:
# 如果配置文件不存在,使用默认值
self.save()
logger.debug("Config file not found, using default values.")
return
except json.JSONDecodeError:
# 如果配置文件格式错误,使用默认值
self.save()
logger.debug("Config file is invalid, using default values.")
return
self.theme = config.get("theme", "auto")
self.token = config.get("token", "")
self.url = config.get("server_url", "")
self.scheme = config.get("server_scheme", "https")
self.port = config.get("server_port", "32400")
logger.info(f"Server config loaded: {self.__dict__}")
def save(self):
config = {
"theme": self.theme,
"token": self.token,
"server_url": self.url,
"server_scheme": self.scheme,
"server_port": self.port,
}
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(config, f, indent=4, ensure_ascii=False)
logger.info(f"Server config saved: {config}")
def set_url(self, url: str) -> None:
self.url = url
def set_scheme(self, scheme: str) -> None:
self.scheme = scheme
def set_port(self, port: str) -> None:
self.port = port
def set_token(self, token: str) -> None:
self.token = token
def set_theme(self, theme: str) -> None:
# check theme is valid
if theme not in ["auto", "dark", "light"]:
logger.error(f"Invalid theme: {theme}")
raise ValueError("Invalid theme. Must be 'auto', 'dark', or 'light'.")
self.theme = theme
def set_and_save_config(
self,
theme: str = None,
token: str = None,
url: str = None,
scheme: str = None,
port: str = None,
) -> None:
if theme is not None:
self.set_theme(theme)
if token is not None:
self.set_token(token)
if url is not None:
self.set_url(url)
if scheme is not None:
self.set_scheme(scheme)
if port is not None:
self.set_port(port)
self.save()
server_config = ServerConfig()

33
app/utils/logger.py Normal file
View File

@ -0,0 +1,33 @@
import logging
import os
LOG_PATH = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "logs", "app.log"))
LOG_LEVEL = logging.DEBUG
def logger_initialize() -> logging.Logger:
"""Initialize the logger for the application. Return a logger that logs to console and a app.log."""
logger = logging.getLogger("PlexPlaylistSync")
if logger.hasHandlers():
return logger
# check log path exists, if not create it
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
logger.setLevel(LOG_LEVEL)
# 2025-07-19 17:23:05,116 [levelname]: message
formatter = logging.Formatter("%(asctime)s [%(levelname)s]: %(message)s")
file_handler = logging.FileHandler(LOG_PATH, encoding="utf-8")
file_handler.setFormatter(formatter)
# add file handler
logger.addHandler(file_handler)
# console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
logger = logger_initialize()

View File

@ -2,6 +2,8 @@ from plexapi.myplex import MyPlexAccount
from plexapi.server import PlexServer
from urllib.parse import urlparse
from app.utils.common import str_is_empty
from app.utils.logger import logger
def build_plex_url(scheme, url, port="32400"):
"""Build a full Plex URL from scheme, url, and port."""
@ -20,31 +22,85 @@ def build_plex_url(scheme, url, port="32400"):
base_url = f"http://{url}:{port}"
return base_url
def connect_plex(username, password, token, scheme, url, port="32400"):
"""Return a connected PlexServer instance and update config with token and server info."""
# 如果token存在且不为空则使用token连接
if not str_is_empty(token):
return connect_plex_with_token(token, scheme, url, port)
else:
return connect_plex_with_pw(username, password, scheme, url, port)
class PlexClient:
"""A client for interacting with a Plex server."""
def connect_plex_with_pw(username, password, scheme, url, port="32400"):
"""Return a connected PlexServer instance and update config with token and server info."""
# url 初始化
base_url = build_plex_url(scheme, url, port)
# account 初始化
account = MyPlexAccount(username, password)
# token 获取
token = account.authenticationToken
def __init__(self) -> None:
self.server: PlexServer | None = None
self.token: str | None = None
self.base_url: str | None = None
self.connected = False
server = PlexServer(base_url, token)
return server, token
def connect(
self,
username: str = "",
password: str = "",
token: str = "",
scheme: str = "https",
url: str = "",
port: str = "32400",
):
"""Connect to the Plex server using username/password or token."""
# Connect Server with token if token is provided, otherwise use username/password
try:
if not str_is_empty(token):
self.server, self.token = self._connect_with_token(token, scheme, url, port)
else:
self.server, self.token = self._connect_with_pw(
username, password, scheme, url, port
)
# Update the base URL and connection status
self.base_url = build_plex_url(scheme, url, port)
self.connected = True
logger.info(f"Connected to Plex server at {self.base_url} with token: {self.token}")
return self.server, self.token
except Exception as e:
logger.warning(f"Failed to connect to Plex server: {str(e)}")
self.connected = False
raise
def connect_plex_with_token(token, scheme, url, port="32400"):
"""Return a connected PlexServer instance using a token."""
# URL 初始化
base_url = build_plex_url(scheme, url, port)
server = PlexServer(base_url, token)
return server, token
def _connect_with_pw(self, username: str, password: str, scheme: str, url: str, port: str = "32400"):
"""Return a connected PlexServer instance and update config with token and server info."""
# url 初始化
self.base_url = build_plex_url(scheme, url, port)
# account 初始化
account = MyPlexAccount(username, password)
# token 获取
self.token = account.authenticationToken
self.server = PlexServer(self.base_url, self.token)
logger.debug(f"Connected to Plex server with username: {username}, token: {self.token}")
return self.server, self.token
def _connect_with_token(self, token: str, scheme: str, url: str, port: str = "32400"):
"""Return a connected PlexServer instance using a token."""
# URL 初始化
self.base_url = build_plex_url(scheme, url, port)
self.server = PlexServer(self.base_url, token)
logger.debug(f"Connected to Plex server with token: {token}")
return self.server, token
def get_server(self) -> PlexServer | None:
"""Return the connected Plex server instance."""
if not self.connected:
logger.error("Plex client is not connected.")
raise RuntimeError("Plex client is not connected.")
return self.server
def get_all_playlist(self) -> list | None:
"""Return all playlists from the Plex server."""
if not self.connected or not self.server:
logger.error("Plex server is not connected.")
raise ValueError("Plex server is not connected.")
try:
playlists = self.server.playlists()
logger.info(f"Fetched {len(playlists)} playlists from the server.")
return playlists
except Exception as e:
logger.warning(f"Failed to fetch playlists: {str(e)}")
raise RuntimeError(f"Failed to fetch playlists: {str(e)}")
plex_client = PlexClient()