Compare commits
2 Commits
d7f00408e5
...
c2b429272f
| Author | SHA1 | Date | |
|---|---|---|---|
| c2b429272f | |||
| 0ad64216f5 |
+2
-1
@@ -3,5 +3,6 @@
|
||||
"token": "",
|
||||
"server_url": "",
|
||||
"server_port": "",
|
||||
"server_scheme": ""
|
||||
"server_scheme": "",
|
||||
"path_rules": []
|
||||
}
|
||||
+30
@@ -125,6 +125,7 @@ def _build_home_context(
|
||||
"message": message,
|
||||
"message_type": message_type,
|
||||
"sync_result": sync_result,
|
||||
"path_rules": server_config.path_rules,
|
||||
}
|
||||
|
||||
|
||||
@@ -191,6 +192,35 @@ async def trigger_sync(request: Request, mode: str = Form(...), local_path: str
|
||||
return templates.TemplateResponse("home.html", context)
|
||||
|
||||
|
||||
@app.post("/path-rules", response_class=HTMLResponse)
|
||||
async def save_path_rules(
|
||||
request: Request,
|
||||
local_path: str = Form("playlist"),
|
||||
pattern: list[str] | None = Form(None),
|
||||
replacement: list[str] | None = Form(None),
|
||||
):
|
||||
patterns = pattern or []
|
||||
replacements = replacement or []
|
||||
|
||||
cleaned_rules: list[dict[str, str]] = []
|
||||
for pat, repl in zip(patterns, replacements):
|
||||
pat = pat.strip()
|
||||
if not pat:
|
||||
continue
|
||||
cleaned_rules.append({"pattern": pat, "replacement": repl or ""})
|
||||
|
||||
server_config.set_and_save_config(path_rules=cleaned_rules)
|
||||
|
||||
context = _build_home_context(
|
||||
request,
|
||||
local_path,
|
||||
message="正则规则已保存并会在同步前应用。",
|
||||
message_type="success",
|
||||
)
|
||||
|
||||
return templates.TemplateResponse("home.html", context)
|
||||
|
||||
|
||||
# 登录页面和处理
|
||||
@app.get("/login", response_class=HTMLResponse)
|
||||
async def login_page(request: Request):
|
||||
|
||||
@@ -38,3 +38,13 @@
|
||||
.status-unset {
|
||||
background-color: var(--bs-secondary);
|
||||
}
|
||||
|
||||
.rule-add-btn {
|
||||
width: 64px;
|
||||
height: 64px;
|
||||
font-size: 1.5rem;
|
||||
}
|
||||
|
||||
.rule-row .form-control {
|
||||
min-width: 0;
|
||||
}
|
||||
|
||||
@@ -144,4 +144,78 @@
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="card mt-4">
|
||||
<div class="card-body">
|
||||
<div class="d-flex flex-column flex-lg-row align-items-lg-center gap-3 mb-3">
|
||||
<div>
|
||||
<h5 class="card-title mb-0">路径正则替换</h5>
|
||||
<small class="text-body-secondary">同步前按顺序处理每一首歌曲的路径,解决不同挂载路径的差异</small>
|
||||
</div>
|
||||
</div>
|
||||
<form id="pathRuleForm" method="post" action="/path-rules">
|
||||
<input type="hidden" name="local_path" value="{{ local_path }}">
|
||||
<div id="ruleList" class="d-flex flex-column gap-2 mb-3"></div>
|
||||
<div class="d-flex align-items-center gap-3 mb-3">
|
||||
<button class="btn btn-outline-primary rounded-circle rule-add-btn" type="button" id="addRuleBtn"
|
||||
title="添加新的正则规则">
|
||||
<i class="bi bi-plus-lg"></i>
|
||||
</button>
|
||||
<div class="text-body-secondary small">从上到下依次匹配,左侧填写正则表达式,右侧填写替换结果。</div>
|
||||
</div>
|
||||
<div class="d-flex justify-content-end">
|
||||
<button class="btn btn-success" type="submit">
|
||||
<i class="bi bi-save"></i>
|
||||
<span class="ms-1">保存规则</span>
|
||||
</button>
|
||||
</div>
|
||||
</form>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<template id="ruleRowTemplate">
|
||||
<div class="input-group rule-row">
|
||||
<span class="input-group-text">匹配</span>
|
||||
<input type="text" class="form-control" name="pattern" placeholder="正则表达式" required>
|
||||
<span class="input-group-text">替换为</span>
|
||||
<input type="text" class="form-control" name="replacement" placeholder="替换文本">
|
||||
<button class="btn btn-outline-danger" type="button" title="删除此规则">
|
||||
<i class="bi bi-x-lg"></i>
|
||||
</button>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
const existingRules = {{ path_rules | tojson | safe }};
|
||||
const ruleList = document.getElementById('ruleList');
|
||||
const addRuleBtn = document.getElementById('addRuleBtn');
|
||||
const template = document.getElementById('ruleRowTemplate');
|
||||
|
||||
function bindRemoveButton(row) {
|
||||
const removeBtn = row.querySelector('button');
|
||||
removeBtn.addEventListener('click', () => {
|
||||
row.remove();
|
||||
});
|
||||
}
|
||||
|
||||
function addRuleRow(pattern = '', replacement = '') {
|
||||
const clone = template.content.cloneNode(true);
|
||||
const row = clone.querySelector('.rule-row');
|
||||
const [patternInput, replacementInput] = row.querySelectorAll('input');
|
||||
patternInput.value = pattern;
|
||||
replacementInput.value = replacement;
|
||||
bindRemoveButton(row);
|
||||
ruleList.appendChild(clone);
|
||||
}
|
||||
|
||||
addRuleBtn.addEventListener('click', () => addRuleRow());
|
||||
|
||||
if (existingRules.length) {
|
||||
existingRules.forEach(rule => addRuleRow(rule.pattern || '', rule.replacement || ''));
|
||||
}
|
||||
|
||||
if (!ruleList.children.length) {
|
||||
addRuleRow();
|
||||
}
|
||||
</script>
|
||||
{% endblock %}
|
||||
|
||||
@@ -15,6 +15,7 @@ class ServerConfig:
|
||||
self.url = ""
|
||||
self.scheme = "https"
|
||||
self.port = "32400"
|
||||
self.path_rules: list[dict[str, str]] = []
|
||||
self.load()
|
||||
|
||||
def load(self) -> None:
|
||||
@@ -37,6 +38,7 @@ class ServerConfig:
|
||||
self.url = config.get("server_url", "")
|
||||
self.scheme = config.get("server_scheme", "https")
|
||||
self.port = config.get("server_port", "32400")
|
||||
self.path_rules = config.get("path_rules", []) or []
|
||||
logger.info(f"Server config loaded: {self.__dict__}")
|
||||
|
||||
def save(self):
|
||||
@@ -46,6 +48,7 @@ class ServerConfig:
|
||||
"server_url": self.url,
|
||||
"server_scheme": self.scheme,
|
||||
"server_port": self.port,
|
||||
"path_rules": self.path_rules,
|
||||
}
|
||||
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
|
||||
json.dump(config, f, indent=4, ensure_ascii=False)
|
||||
@@ -70,6 +73,9 @@ class ServerConfig:
|
||||
raise ValueError("Invalid theme. Must be 'auto', 'dark', or 'light'.")
|
||||
self.theme = theme
|
||||
|
||||
def set_path_rules(self, path_rules: list[dict[str, str]]) -> None:
|
||||
self.path_rules = path_rules or []
|
||||
|
||||
def set_and_save_config(
|
||||
self,
|
||||
theme: str = None,
|
||||
@@ -77,6 +83,7 @@ class ServerConfig:
|
||||
url: str = None,
|
||||
scheme: str = None,
|
||||
port: str = None,
|
||||
path_rules: list[dict[str, str]] | None = None,
|
||||
) -> None:
|
||||
if theme is not None:
|
||||
self.set_theme(theme)
|
||||
@@ -88,6 +95,8 @@ class ServerConfig:
|
||||
self.set_scheme(scheme)
|
||||
if port is not None:
|
||||
self.set_port(port)
|
||||
if path_rules is not None:
|
||||
self.set_path_rules(path_rules)
|
||||
self.save()
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
import re
|
||||
from collections import Counter
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
@@ -70,6 +71,60 @@ def save_paths(paths: Sequence[str]) -> str:
|
||||
return "#EXTM3U\n" + "\n".join(paths) + "\n"
|
||||
|
||||
|
||||
def _compile_regex_rules(rules: Sequence[dict[str, str]]) -> list[tuple[re.Pattern[str], str]]:
|
||||
compiled: list[tuple[re.Pattern[str], str]] = []
|
||||
for rule in rules:
|
||||
pattern = rule.get("pattern")
|
||||
if not pattern:
|
||||
continue
|
||||
replacement = rule.get("replacement", "")
|
||||
try:
|
||||
compiled.append((re.compile(pattern), replacement))
|
||||
except re.error as exc:
|
||||
logger.warning(f"Skipping invalid regex '{pattern}': {exc}")
|
||||
return compiled
|
||||
|
||||
|
||||
def _apply_compiled_rules_to_paths(
|
||||
paths: Sequence[str], compiled_rules: Sequence[tuple[re.Pattern[str], str]]
|
||||
) -> list[str]:
|
||||
if not compiled_rules:
|
||||
return list(paths)
|
||||
|
||||
updated: list[str] = []
|
||||
for path in paths:
|
||||
new_path = path
|
||||
for pattern, replacement in compiled_rules:
|
||||
new_path = pattern.sub(replacement, new_path)
|
||||
updated.append(new_path)
|
||||
return updated
|
||||
|
||||
|
||||
def apply_regex_rules_to_paths(
|
||||
paths: Sequence[str], rules: Sequence[dict[str, str]]
|
||||
) -> list[str]:
|
||||
"""Apply regex replacement rules to each path in order."""
|
||||
|
||||
compiled_rules = _compile_regex_rules(rules)
|
||||
return _apply_compiled_rules_to_paths(paths, compiled_rules)
|
||||
|
||||
|
||||
def preprocess_playlist_text(
|
||||
text: str,
|
||||
rules: Sequence[dict[str, str]],
|
||||
compiled_rules: Sequence[tuple[re.Pattern[str], str]] | None = None,
|
||||
) -> str:
|
||||
"""Normalize playlist text and apply regex replacements."""
|
||||
|
||||
if not text:
|
||||
text = "#EXTM3U\n"
|
||||
|
||||
paths = load_paths(text)
|
||||
compiled = compiled_rules if compiled_rules is not None else _compile_regex_rules(rules)
|
||||
replaced_paths = _apply_compiled_rules_to_paths(paths, compiled)
|
||||
return save_paths(replaced_paths)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MergeChunk:
|
||||
type: Literal["normal", "conflict"]
|
||||
@@ -514,6 +569,8 @@ def sync_all_playlists(
|
||||
) -> list[PlaylistSyncResult]:
|
||||
"""Synchronize all playlists that can be matched by name."""
|
||||
|
||||
server_config.load()
|
||||
compiled_rules = _compile_regex_rules(server_config.path_rules)
|
||||
_ensure_test_dir(test_folder)
|
||||
local_playlists = _load_local_playlists(local_dir)
|
||||
remote_playlists = _fetch_remote_playlists()
|
||||
@@ -541,6 +598,17 @@ def sync_all_playlists(
|
||||
remote_text = snapshot_remote_text
|
||||
remote_present = bool(remote_text.strip()) or remote_exists
|
||||
|
||||
base_text = preprocess_playlist_text(
|
||||
base_text, server_config.path_rules, compiled_rules
|
||||
)
|
||||
remote_text = preprocess_playlist_text(
|
||||
remote_text, server_config.path_rules, compiled_rules
|
||||
)
|
||||
if local_text is not None:
|
||||
local_text = preprocess_playlist_text(
|
||||
local_text, server_config.path_rules, compiled_rules
|
||||
)
|
||||
|
||||
# Treat missing remote text as absent playlist.
|
||||
result = _sync_single_playlist(
|
||||
playlist=playlist,
|
||||
|
||||
@@ -12,6 +12,7 @@ from app.utils.playlist_merge import (
|
||||
ConflictResolutionStrategy,
|
||||
load_paths,
|
||||
save_paths,
|
||||
preprocess_playlist_text,
|
||||
)
|
||||
|
||||
|
||||
@@ -94,6 +95,24 @@ class TestPlaylistMergeBasics:
|
||||
assert result.merged_paths == []
|
||||
assert result.conflicts == []
|
||||
|
||||
def test_preprocess_playlist_text(self):
|
||||
"""Ensure regex replacements run in order."""
|
||||
|
||||
raw = """#EXTM3U
|
||||
\\\\koha9-nas\\koha9-nas\\Music\\Album\\01.flac
|
||||
/music/cache/temp.flac
|
||||
"""
|
||||
rules = [
|
||||
{"pattern": r"\\\\koha9-nas\\koha9-nas\\Music", "replacement": r"N:\\Music"},
|
||||
{"pattern": r"/music/cache/", "replacement": "/data/music/"},
|
||||
]
|
||||
|
||||
processed = preprocess_playlist_text(raw, rules)
|
||||
lines = [line for line in processed.splitlines() if line and not line.startswith("#")]
|
||||
|
||||
assert lines[0] == r"N:\Music\Album\01.flac"
|
||||
assert lines[1] == "/data/music/temp.flac"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("case_num", [1, 2, 3, 4])
|
||||
class TestPlaylistMergeLocalPriority:
|
||||
@@ -106,6 +125,10 @@ class TestPlaylistMergeLocalPriority:
|
||||
local_file = TEST_CASE_DIR / "local_playlist" / f"case{case_num}.m3u"
|
||||
remote_file = TEST_CASE_DIR / "remote_playlist" / f"case{case_num}.m3u"
|
||||
|
||||
if not base_file.exists():
|
||||
pytest.skip("Test playlist fixtures are not available in this environment.")
|
||||
if not (local_file.exists() and remote_file.exists()):
|
||||
pytest.skip("Test playlist fixtures are not available in this environment.")
|
||||
with open(base_file, 'r', encoding='utf-8') as f:
|
||||
base_text = f.read()
|
||||
with open(local_file, 'r', encoding='utf-8') as f:
|
||||
@@ -143,6 +166,10 @@ class TestPlaylistMergeRemotePriority:
|
||||
local_file = TEST_CASE_DIR / "local_playlist" / f"case{case_num}.m3u"
|
||||
remote_file = TEST_CASE_DIR / "remote_playlist" / f"case{case_num}.m3u"
|
||||
|
||||
if not base_file.exists():
|
||||
pytest.skip("Test playlist fixtures are not available in this environment.")
|
||||
if not (local_file.exists() and remote_file.exists()):
|
||||
pytest.skip("Test playlist fixtures are not available in this environment.")
|
||||
with open(base_file, 'r', encoding='utf-8') as f:
|
||||
base_text = f.read()
|
||||
with open(local_file, 'r', encoding='utf-8') as f:
|
||||
|
||||
Reference in New Issue
Block a user