Compare commits
6 Commits
58709570a9
...
e3aae69068
| Author | SHA1 | Date | |
|---|---|---|---|
| e3aae69068 | |||
| 1b6dddacc8 | |||
| 355886e797 | |||
| f943105948 | |||
| b0613d1616 | |||
| 3ffc90f43b |
-245
@@ -1,245 +0,0 @@
|
||||
# 测试框架使用指南
|
||||
|
||||
## 概述
|
||||
|
||||
项目已配置 **pytest** 测试框架,相比你之前的手动测试方法,有以下优势:
|
||||
|
||||
### ✅ 改进点
|
||||
|
||||
| 旧方法 (Jupyter Notebook) | 新方法 (pytest) |
|
||||
|-------------------------|----------------|
|
||||
| 手动运行每个测试用例 | 自动发现和运行所有测试 |
|
||||
| 手动读取和对比文件 | 自动化断言和差异报告 |
|
||||
| 难以定位失败原因 | 清晰的错误堆栈和 diff |
|
||||
| 无法批量运行 | 支持参数化和并行测试 |
|
||||
| 无覆盖率统计 | 自动生成覆盖率报告 |
|
||||
|
||||
## 快速开始
|
||||
|
||||
### 1. 运行所有测试
|
||||
```bash
|
||||
python -m pytest
|
||||
```
|
||||
|
||||
### 2. 运行特定测试文件
|
||||
```bash
|
||||
python -m pytest tests/test_playlist_merge.py -v
|
||||
```
|
||||
|
||||
### 3. 在 VS Code 中运行
|
||||
- 按 `Ctrl+Shift+P`
|
||||
- 输入 "Run Task"
|
||||
- 选择 "运行所有测试" 或其他测试任务
|
||||
|
||||
## 测试文件说明
|
||||
|
||||
### 已创建的测试文件
|
||||
|
||||
```
|
||||
tests/
|
||||
├── __init__.py # 包初始化
|
||||
├── conftest.py # pytest fixtures 和配置
|
||||
├── test_playlist_merge.py # 播放列表合并测试
|
||||
└── README.md # 详细测试指南
|
||||
```
|
||||
|
||||
### 测试用例覆盖
|
||||
|
||||
`test_playlist_merge.py` 包含:
|
||||
|
||||
1. **基础功能测试** (`TestPlaylistMergeBasics`)
|
||||
- `test_load_paths_removes_comments` - 测试路径加载
|
||||
- `test_save_paths_adds_header` - 测试文件保存
|
||||
- `test_empty_playlist_merge` - 测试空播放列表合并
|
||||
|
||||
2. **本地优先测试** (`TestPlaylistMergeLocalPriority`)
|
||||
- 参数化测试 case1-4 的本地优先合并
|
||||
|
||||
3. **远程优先测试** (`TestPlaylistMergeRemotePriority`)
|
||||
- 参数化测试 case1-4 的远程优先合并
|
||||
|
||||
4. **冲突检测测试** (`TestConflictDetection`)
|
||||
- 测试冲突识别和解决
|
||||
|
||||
## 常用命令
|
||||
|
||||
### 基础运行
|
||||
```bash
|
||||
# 运行所有测试
|
||||
pytest
|
||||
|
||||
# 详细输出
|
||||
pytest -v
|
||||
|
||||
# 显示 print 输出
|
||||
pytest -s
|
||||
|
||||
# 只运行特定测试
|
||||
pytest tests/test_playlist_merge.py::TestPlaylistMergeBasics::test_load_paths_removes_comments
|
||||
```
|
||||
|
||||
### 测试筛选
|
||||
```bash
|
||||
# 运行特定 case
|
||||
pytest -k "case_num-1"
|
||||
|
||||
# 运行包含特定关键字的测试
|
||||
pytest -k "local_priority"
|
||||
|
||||
# 运行标记的测试
|
||||
pytest -m unit
|
||||
```
|
||||
|
||||
### 调试和重试
|
||||
```bash
|
||||
# 失败时进入调试器
|
||||
pytest --pdb
|
||||
|
||||
# 只运行上次失败的测试
|
||||
pytest --lf
|
||||
|
||||
# 先运行失败的测试
|
||||
pytest --ff
|
||||
|
||||
# 遇到第一个失败就停止
|
||||
pytest -x
|
||||
```
|
||||
|
||||
### 覆盖率报告
|
||||
```bash
|
||||
# 生成覆盖率报告
|
||||
pytest --cov=app --cov-report=html
|
||||
|
||||
# 在浏览器中查看
|
||||
# 打开 htmlcov/index.html
|
||||
|
||||
# 终端中显示覆盖率
|
||||
pytest --cov=app --cov-report=term
|
||||
```
|
||||
|
||||
## 编写新测试
|
||||
|
||||
### 1. 创建测试文件
|
||||
在 `tests/` 目录下创建 `test_<feature>.py`
|
||||
|
||||
### 2. 编写测试类和函数
|
||||
```python
|
||||
import pytest
|
||||
|
||||
class TestMyFeature:
|
||||
"""测试我的功能"""
|
||||
|
||||
def test_basic_case(self):
|
||||
"""测试基本用例"""
|
||||
result = my_function()
|
||||
assert result == expected_value
|
||||
|
||||
@pytest.mark.parametrize("input,expected", [
|
||||
(1, 2),
|
||||
(2, 4),
|
||||
(3, 6),
|
||||
])
|
||||
def test_multiple_cases(self, input, expected):
|
||||
"""参数化测试多个用例"""
|
||||
assert my_function(input) == expected
|
||||
```
|
||||
|
||||
### 3. 使用 fixtures
|
||||
```python
|
||||
@pytest.fixture
|
||||
def sample_data():
|
||||
"""提供测试数据"""
|
||||
return {"key": "value"}
|
||||
|
||||
def test_with_fixture(sample_data):
|
||||
assert sample_data["key"] == "value"
|
||||
```
|
||||
|
||||
## 与旧测试方法的对比
|
||||
|
||||
### 旧方法 (test.ipynb)
|
||||
```python
|
||||
# 手动循环和对比
|
||||
for i in range(len(test_playlists[1])):
|
||||
with open(test_playlists[1][i], 'r', encoding='utf-8') as f1, \
|
||||
open(target_playlists[i], 'r', encoding='utf-8') as f2:
|
||||
content1 = f1.read()
|
||||
content2 = f2.read()
|
||||
# 手动处理和对比...
|
||||
if content1 == content2:
|
||||
print(f"Test case {i+1} passed.")
|
||||
else:
|
||||
print(f"Test case {i+1} failed.")
|
||||
```
|
||||
|
||||
### 新方法 (pytest)
|
||||
```python
|
||||
@pytest.mark.parametrize("case_num", [1, 2, 3, 4])
|
||||
def test_merge_local_priority(case_num, tmp_path):
|
||||
"""自动运行所有用例,失败时显示详细 diff"""
|
||||
# 加载输入
|
||||
base_text = load_file(f"case{case_num}.m3u")
|
||||
# ... 执行测试
|
||||
result = merge_playlists(base_text, local_text, remote_text)
|
||||
# 自动断言和差异报告
|
||||
assert_playlists_equal(actual_file, expected_file)
|
||||
```
|
||||
|
||||
## 持续集成
|
||||
|
||||
可以将测试集成到 CI/CD 流程:
|
||||
|
||||
```yaml
|
||||
# .github/workflows/test.yml (示例)
|
||||
name: Tests
|
||||
on: [push, pull_request]
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v2
|
||||
- name: Install dependencies
|
||||
run: pip install -r requirements.txt
|
||||
- name: Run tests
|
||||
run: pytest --cov=app
|
||||
```
|
||||
|
||||
## 最佳实践
|
||||
|
||||
1. **测试命名** - 使用清晰描述性的名称
|
||||
2. **独立性** - 每个测试应该独立运行
|
||||
3. **快速** - 单元测试应该快速执行
|
||||
4. **明确** - 一个测试只测一个功能点
|
||||
5. **维护** - 定期更新测试用例
|
||||
|
||||
## 故障排除
|
||||
|
||||
### pytest 找不到
|
||||
```bash
|
||||
python -m pip install pytest pytest-cov
|
||||
```
|
||||
|
||||
### 导入错误
|
||||
确保项目根目录在 Python 路径中:
|
||||
```python
|
||||
import sys
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
```
|
||||
|
||||
### 测试发现失败
|
||||
检查 `pytest.ini` 配置和文件命名是否符合规范。
|
||||
|
||||
## 下一步
|
||||
|
||||
- [ ] 为其他模块添加测试 (如 `plex_client.py`, `local_playlist.py`)
|
||||
- [ ] 添加集成测试
|
||||
- [ ] 配置 CI/CD 自动运行测试
|
||||
- [ ] 设置测试覆盖率目标 (如 >80%)
|
||||
|
||||
## 参考资料
|
||||
|
||||
- [pytest 官方文档](https://docs.pytest.org/)
|
||||
- [pytest-cov 文档](https://pytest-cov.readthedocs.io/)
|
||||
- 项目内测试: `tests/README.md`
|
||||
+74
-7
@@ -1,5 +1,4 @@
|
||||
import os
|
||||
from typing import Tuple
|
||||
from app.utils.config import server_config
|
||||
from app.utils.playlist_merge import SyncMode, sync_all_playlists, TEST_PLAYLIST_DIR
|
||||
from fastapi import FastAPI, Request, Form
|
||||
@@ -48,7 +47,7 @@ SYNC_MODE_OPTIONS = [
|
||||
]
|
||||
|
||||
|
||||
def _get_cloud_playlists() -> Tuple[list[dict], str, dict]:
|
||||
def _get_cloud_playlists() -> tuple[list[dict], str, dict, str, list[str]]:
|
||||
"""Fetch playlists and connection state from the remote Plex server."""
|
||||
|
||||
server_config.load()
|
||||
@@ -58,10 +57,12 @@ def _get_cloud_playlists() -> Tuple[list[dict], str, dict]:
|
||||
"name": "未设置",
|
||||
"domain": server_config.url or "未设置",
|
||||
}
|
||||
selected_library = server_config.library_name
|
||||
music_libraries: list[str] = []
|
||||
|
||||
# no server url configured
|
||||
if not server_config.url:
|
||||
return playlists, status, server_info
|
||||
return playlists, status, server_info, selected_library, music_libraries
|
||||
|
||||
status = "failed"
|
||||
try:
|
||||
@@ -79,7 +80,16 @@ def _get_cloud_playlists() -> Tuple[list[dict], str, dict]:
|
||||
}
|
||||
)
|
||||
|
||||
for playlist in plex_client.server.playlists():
|
||||
music_libraries = plex_client.get_libs_name_list()
|
||||
if not music_libraries:
|
||||
server_config.set_and_save_config(library_name="")
|
||||
return playlists, status, server_info, "", music_libraries
|
||||
|
||||
if not selected_library or selected_library not in music_libraries:
|
||||
selected_library = music_libraries[0]
|
||||
server_config.set_and_save_config(library_name=selected_library)
|
||||
|
||||
for playlist in plex_client.get_lib_playlists(selected_library) or []:
|
||||
track_count = getattr(playlist, "itemCount", None)
|
||||
if track_count is None:
|
||||
try:
|
||||
@@ -97,7 +107,8 @@ def _get_cloud_playlists() -> Tuple[list[dict], str, dict]:
|
||||
status = "failed"
|
||||
|
||||
playlists.sort(key=lambda item: item["name"].lower())
|
||||
return playlists, status, server_info
|
||||
return playlists, status, server_info, selected_library, music_libraries
|
||||
|
||||
|
||||
def _build_home_context(
|
||||
request: Request,
|
||||
@@ -109,7 +120,13 @@ def _build_home_context(
|
||||
):
|
||||
server_config.load()
|
||||
local_playlists = scan_local_playlists(local_path)
|
||||
cloud_playlists, connection_status, server_info = _get_cloud_playlists()
|
||||
(
|
||||
cloud_playlists,
|
||||
connection_status,
|
||||
server_info,
|
||||
selected_library,
|
||||
music_libraries,
|
||||
) = _get_cloud_playlists()
|
||||
|
||||
return {
|
||||
"request": request,
|
||||
@@ -120,6 +137,8 @@ def _build_home_context(
|
||||
"cloud_playlists": cloud_playlists,
|
||||
"connection_status": connection_status,
|
||||
"server_info": server_info,
|
||||
"selected_library": selected_library,
|
||||
"music_libraries": music_libraries,
|
||||
"sync_modes": SYNC_MODE_OPTIONS,
|
||||
"selected_mode": selected_mode,
|
||||
"message": message,
|
||||
@@ -225,6 +244,25 @@ async def save_path_rules(
|
||||
@app.get("/login", response_class=HTMLResponse)
|
||||
async def login_page(request: Request):
|
||||
server_config.load()
|
||||
music_libraries = []
|
||||
selected_library = server_config.library_name
|
||||
if server_config.url and server_config.token:
|
||||
try:
|
||||
plex_client.connect(
|
||||
token=server_config.token,
|
||||
scheme=server_config.scheme,
|
||||
url=server_config.url,
|
||||
port=server_config.port,
|
||||
)
|
||||
music_libraries = plex_client.get_libs_name_list()
|
||||
if music_libraries:
|
||||
if selected_library not in music_libraries:
|
||||
selected_library = music_libraries[0]
|
||||
else:
|
||||
selected_library = ""
|
||||
except Exception:
|
||||
selected_library = ""
|
||||
music_libraries = []
|
||||
return templates.TemplateResponse(
|
||||
"login.html",
|
||||
{
|
||||
@@ -235,6 +273,8 @@ async def login_page(request: Request):
|
||||
"scheme": server_config.scheme,
|
||||
"server_url": server_config.url,
|
||||
"port": server_config.port,
|
||||
"music_libraries": music_libraries,
|
||||
"selected_library": selected_library,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -248,6 +288,7 @@ async def login(
|
||||
scheme: str = Form("https"),
|
||||
url: str = Form(...),
|
||||
port: str = Form("32400"),
|
||||
library_name: str = Form(""),
|
||||
):
|
||||
# 尝试连接到 Plex 服务器
|
||||
try:
|
||||
@@ -261,9 +302,30 @@ async def login(
|
||||
port=port,
|
||||
)
|
||||
# 成功连接后保存配置到配置文件
|
||||
music_libraries: list[str] = []
|
||||
selected_library = ""
|
||||
if plex_client.connected:
|
||||
try:
|
||||
music_libraries = plex_client.get_libs_name_list()
|
||||
except Exception as exc:
|
||||
logger.warning(f"Unable to fetch music libraries: {exc}")
|
||||
music_libraries = []
|
||||
if music_libraries:
|
||||
if library_name and library_name in music_libraries:
|
||||
selected_library = library_name
|
||||
else:
|
||||
selected_library = music_libraries[0]
|
||||
server_config.set_and_save_config(
|
||||
token=token_success, scheme=scheme, url=url, port=port
|
||||
token=token_success,
|
||||
scheme=scheme,
|
||||
url=url,
|
||||
port=port,
|
||||
library_name=selected_library,
|
||||
)
|
||||
else:
|
||||
music_libraries = []
|
||||
server_config.set_and_save_config(
|
||||
token=token_success, scheme=scheme, url=url, port=port, library_name=""
|
||||
)
|
||||
return templates.TemplateResponse(
|
||||
"login.html",
|
||||
@@ -277,9 +339,12 @@ async def login(
|
||||
"scheme": server_config.scheme,
|
||||
"server_url": server_config.url,
|
||||
"port": server_config.port,
|
||||
"music_libraries": music_libraries,
|
||||
"selected_library": selected_library,
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
music_libraries = []
|
||||
return templates.TemplateResponse(
|
||||
"login.html",
|
||||
{
|
||||
@@ -291,6 +356,8 @@ async def login(
|
||||
"scheme": scheme,
|
||||
"server_url": url,
|
||||
"port": port,
|
||||
"music_libraries": music_libraries,
|
||||
"selected_library": "",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
+22
-8
@@ -107,6 +107,7 @@
|
||||
<div>
|
||||
<h5 class="card-title mb-0">云端播放列表</h5>
|
||||
<small class="text-body-secondary">来自已连接的 Plex 服务器</small>
|
||||
<div class="text-body-secondary small mt-1">当前音乐库:{{ selected_library or '暂无可用音乐库' }}</div>
|
||||
</div>
|
||||
<form class="ms-lg-auto" method="get" action="/">
|
||||
<input type="hidden" name="local_path" value="{{ local_path }}">
|
||||
@@ -163,7 +164,11 @@
|
||||
</button>
|
||||
<div class="text-body-secondary small">从上到下依次匹配,左侧填写正则表达式,右侧填写替换结果。</div>
|
||||
</div>
|
||||
<div class="d-flex justify-content-end">
|
||||
<div class="d-flex justify-content-end gap-2">
|
||||
<button class="btn btn-outline-secondary" type="button" id="cancelRulesBtn">
|
||||
<i class="bi bi-arrow-counterclockwise"></i>
|
||||
<span class="ms-1">取消变更</span>
|
||||
</button>
|
||||
<button class="btn btn-success" type="submit">
|
||||
<i class="bi bi-save"></i>
|
||||
<span class="ms-1">保存规则</span>
|
||||
@@ -189,8 +194,13 @@
|
||||
const existingRules = {{ path_rules | tojson | safe }};
|
||||
const ruleList = document.getElementById('ruleList');
|
||||
const addRuleBtn = document.getElementById('addRuleBtn');
|
||||
const cancelRulesBtn = document.getElementById('cancelRulesBtn');
|
||||
const template = document.getElementById('ruleRowTemplate');
|
||||
|
||||
const initialRules = existingRules.length
|
||||
? existingRules.map(rule => ({ ...rule }))
|
||||
: [{ pattern: '', replacement: '' }];
|
||||
|
||||
function bindRemoveButton(row) {
|
||||
const removeBtn = row.querySelector('button');
|
||||
removeBtn.addEventListener('click', () => {
|
||||
@@ -208,14 +218,18 @@
|
||||
ruleList.appendChild(clone);
|
||||
}
|
||||
|
||||
function renderRules(rules) {
|
||||
ruleList.innerHTML = '';
|
||||
if (!rules.length) {
|
||||
addRuleRow();
|
||||
return;
|
||||
}
|
||||
rules.forEach(rule => addRuleRow(rule.pattern || '', rule.replacement || ''));
|
||||
}
|
||||
|
||||
addRuleBtn.addEventListener('click', () => addRuleRow());
|
||||
cancelRulesBtn.addEventListener('click', () => renderRules(initialRules));
|
||||
|
||||
if (existingRules.length) {
|
||||
existingRules.forEach(rule => addRuleRow(rule.pattern || '', rule.replacement || ''));
|
||||
}
|
||||
|
||||
if (!ruleList.children.length) {
|
||||
addRuleRow();
|
||||
}
|
||||
renderRules(initialRules);
|
||||
</script>
|
||||
{% endblock %}
|
||||
|
||||
@@ -32,6 +32,19 @@
|
||||
<label for="port" class="form-label">端口</label>
|
||||
<input type="text" class="form-control" id="port" name="port" value="{{ port }}">
|
||||
</div>
|
||||
<div class="mb-3">
|
||||
<label for="library_name" class="form-label">音乐库</label>
|
||||
<select class="form-select" id="library_name" name="library_name" {% if not music_libraries %}disabled{% endif %}>
|
||||
{% if music_libraries %}
|
||||
{% for library in music_libraries %}
|
||||
<option value="{{ library }}" {% if selected_library == library %}selected{% endif %}>{{ library }}</option>
|
||||
{% endfor %}
|
||||
{% else %}
|
||||
<option value="" selected>暂无可用的音乐库</option>
|
||||
{% endif %}
|
||||
</select>
|
||||
<div class="form-text">仅显示音乐类型的库,可在连接成功后自动刷新。</div>
|
||||
</div>
|
||||
<button type="submit" class="btn btn-primary">连接</button>
|
||||
</form>
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ class ServerConfig:
|
||||
self.url = ""
|
||||
self.scheme = "https"
|
||||
self.port = "32400"
|
||||
self.library_name = ""
|
||||
self.path_rules: list[dict[str, str]] = []
|
||||
self.load()
|
||||
|
||||
@@ -38,6 +39,7 @@ class ServerConfig:
|
||||
self.url = config.get("server_url", "")
|
||||
self.scheme = config.get("server_scheme", "https")
|
||||
self.port = config.get("server_port", "32400")
|
||||
self.library_name = config.get("library_name", "")
|
||||
self.path_rules = config.get("path_rules", []) or []
|
||||
logger.info(f"Server config loaded: {self.__dict__}")
|
||||
|
||||
@@ -48,6 +50,7 @@ class ServerConfig:
|
||||
"server_url": self.url,
|
||||
"server_scheme": self.scheme,
|
||||
"server_port": self.port,
|
||||
"library_name": self.library_name,
|
||||
"path_rules": self.path_rules,
|
||||
}
|
||||
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
|
||||
@@ -66,6 +69,9 @@ class ServerConfig:
|
||||
def set_token(self, token: str) -> None:
|
||||
self.token = token
|
||||
|
||||
def set_library(self, library_name: str) -> None:
|
||||
self.library_name = library_name or ""
|
||||
|
||||
def set_theme(self, theme: str) -> None:
|
||||
# check theme is valid
|
||||
if theme not in ["auto", "dark", "light"]:
|
||||
@@ -83,6 +89,7 @@ class ServerConfig:
|
||||
url: str = None,
|
||||
scheme: str = None,
|
||||
port: str = None,
|
||||
library_name: str | None = None,
|
||||
path_rules: list[dict[str, str]] | None = None,
|
||||
) -> None:
|
||||
if theme is not None:
|
||||
@@ -95,6 +102,8 @@ class ServerConfig:
|
||||
self.set_scheme(scheme)
|
||||
if port is not None:
|
||||
self.set_port(port)
|
||||
if library_name is not None:
|
||||
self.set_library(library_name)
|
||||
if path_rules is not None:
|
||||
self.set_path_rules(path_rules)
|
||||
self.save()
|
||||
|
||||
@@ -137,8 +137,11 @@ class PlexClient:
|
||||
self._connect_check()
|
||||
try:
|
||||
libraries = self.server.library.sections()
|
||||
lib_names = [lib.title for lib in libraries]
|
||||
logger.info(f"Fetched {len(lib_names)} library names from the server.")
|
||||
music_libs = [lib for lib in libraries if getattr(lib, "type", None) == MUSIC_LIBRARY_TYPE]
|
||||
lib_names = [lib.title for lib in music_libs]
|
||||
logger.info(
|
||||
f"Fetched {len(lib_names)} music library names from the server (type='{MUSIC_LIBRARY_TYPE}')."
|
||||
)
|
||||
return lib_names
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch library names: {str(e)}")
|
||||
|
||||
-26
@@ -1,26 +0,0 @@
|
||||
[pytest]
|
||||
# Pytest configuration
|
||||
|
||||
# Test discovery patterns
|
||||
python_files = test_*.py
|
||||
python_classes = Test*
|
||||
python_functions = test_*
|
||||
|
||||
# Test paths
|
||||
testpaths = tests
|
||||
|
||||
# Output options
|
||||
addopts =
|
||||
-v
|
||||
--tb=short
|
||||
--strict-markers
|
||||
--disable-warnings
|
||||
|
||||
# Markers for categorizing tests
|
||||
markers =
|
||||
slow: marks tests as slow (deselect with '-m "not slow"')
|
||||
integration: marks tests as integration tests
|
||||
unit: marks tests as unit tests
|
||||
|
||||
# Minimum version
|
||||
minversion = 6.0
|
||||
@@ -1,88 +0,0 @@
|
||||
# 测试指南
|
||||
|
||||
## 运行测试
|
||||
|
||||
### 运行所有测试
|
||||
```bash
|
||||
pytest
|
||||
```
|
||||
|
||||
### 运行特定测试文件
|
||||
```bash
|
||||
pytest tests/test_playlist_merge.py
|
||||
```
|
||||
|
||||
### 运行特定测试类或函数
|
||||
```bash
|
||||
pytest tests/test_playlist_merge.py::TestPlaylistMergeLocalPriority
|
||||
pytest tests/test_playlist_merge.py::TestPlaylistMergeLocalPriority::test_merge_local_priority
|
||||
```
|
||||
|
||||
### 运行特定参数化测试用例
|
||||
```bash
|
||||
# 只运行 case1 的测试
|
||||
pytest tests/test_playlist_merge.py -k "case_num-1"
|
||||
```
|
||||
|
||||
### 查看详细输出
|
||||
```bash
|
||||
pytest -v # verbose
|
||||
pytest -vv # more verbose
|
||||
pytest -s # 显示 print 输出
|
||||
```
|
||||
|
||||
### 查看测试覆盖率
|
||||
```bash
|
||||
pip install pytest-cov
|
||||
pytest --cov=app --cov-report=html
|
||||
```
|
||||
然后在浏览器中打开 `htmlcov/index.html`
|
||||
|
||||
## 测试结构
|
||||
|
||||
- `test_playlist_merge.py` - 播放列表合并功能的单元测试
|
||||
- `conftest.py` - pytest fixtures 和配置
|
||||
- `pytest.ini` - pytest 配置文件
|
||||
|
||||
## 编写新测试
|
||||
|
||||
1. 在 `tests/` 目录下创建 `test_*.py` 文件
|
||||
2. 创建以 `Test` 开头的测试类
|
||||
3. 编写以 `test_` 开头的测试函数
|
||||
4. 使用 `assert` 进行断言
|
||||
5. 使用 `@pytest.mark.parametrize` 进行参数化测试
|
||||
|
||||
示例:
|
||||
```python
|
||||
import pytest
|
||||
|
||||
class TestMyFeature:
|
||||
def test_basic_functionality(self):
|
||||
result = my_function(input_data)
|
||||
assert result == expected_output
|
||||
|
||||
@pytest.mark.parametrize("input,expected", [
|
||||
(1, 2),
|
||||
(2, 4),
|
||||
(3, 6),
|
||||
])
|
||||
def test_with_multiple_inputs(self, input, expected):
|
||||
assert my_function(input) == expected
|
||||
```
|
||||
|
||||
## 调试测试
|
||||
|
||||
### 在测试失败时进入调试器
|
||||
```bash
|
||||
pytest --pdb
|
||||
```
|
||||
|
||||
### 只运行上次失败的测试
|
||||
```bash
|
||||
pytest --lf
|
||||
```
|
||||
|
||||
### 先运行失败的测试
|
||||
```bash
|
||||
pytest --ff
|
||||
```
|
||||
@@ -1 +0,0 @@
|
||||
# Test package initialization
|
||||
@@ -1,45 +0,0 @@
|
||||
"""
|
||||
Pytest configuration and shared fixtures.
|
||||
"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def project_root():
|
||||
"""Return the project root directory."""
|
||||
return Path(__file__).parent.parent
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def test_data_dir(project_root):
|
||||
"""Return the test data directory."""
|
||||
return project_root / "test_case"
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def expected_results_dir(project_root):
|
||||
"""Return the expected results directory."""
|
||||
return project_root / "test_res"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def temp_test_dir(tmp_path):
|
||||
"""Create a temporary directory for test output."""
|
||||
test_dir = tmp_path / "test_output"
|
||||
test_dir.mkdir(exist_ok=True)
|
||||
yield test_dir
|
||||
# Cleanup after test (optional)
|
||||
# shutil.rmtree(test_dir, ignore_errors=True)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def reset_test_state():
|
||||
"""Reset any global state before each test."""
|
||||
# Add any necessary cleanup here
|
||||
yield
|
||||
# Post-test cleanup
|
||||
@@ -1,225 +0,0 @@
|
||||
"""
|
||||
Unit tests for playlist merge functionality.
|
||||
"""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from app.utils.playlist_merge import (
|
||||
merge_playlists,
|
||||
ConflictResolutionStrategy,
|
||||
load_paths,
|
||||
save_paths,
|
||||
preprocess_playlist_text,
|
||||
)
|
||||
|
||||
|
||||
# Test data directory
|
||||
TEST_CASE_DIR = Path(__file__).parent.parent / "test_case"
|
||||
TEST_RES_DIR = Path(__file__).parent.parent / "test_res"
|
||||
DOCKERAPP_TEST_DIR = Path(__file__).parent.parent / "dockerapp" / "test_playlists"
|
||||
|
||||
|
||||
def normalize_playlist_content(file_path: str | Path) -> list[str]:
|
||||
"""
|
||||
Read a playlist file and return normalized content (without comments/blanks).
|
||||
|
||||
Args:
|
||||
file_path: Path to the playlist file
|
||||
|
||||
Returns:
|
||||
List of track paths (non-empty, non-comment lines)
|
||||
"""
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
# Remove blank lines and comments
|
||||
lines = [
|
||||
line.strip()
|
||||
for line in content.splitlines()
|
||||
if line.strip() and not line.startswith('#')
|
||||
]
|
||||
return lines
|
||||
|
||||
|
||||
def assert_playlists_equal(actual_file: str | Path, expected_file: str | Path):
|
||||
"""
|
||||
Assert that two playlist files have identical content (ignoring comments).
|
||||
|
||||
Args:
|
||||
actual_file: Path to the generated playlist
|
||||
expected_file: Path to the expected playlist
|
||||
|
||||
Raises:
|
||||
AssertionError: If playlists differ
|
||||
"""
|
||||
actual_content = normalize_playlist_content(actual_file)
|
||||
expected_content = normalize_playlist_content(expected_file)
|
||||
|
||||
if actual_content != expected_content:
|
||||
# Provide detailed diff on failure
|
||||
import difflib
|
||||
diff = difflib.unified_diff(
|
||||
expected_content,
|
||||
actual_content,
|
||||
fromfile=str(expected_file),
|
||||
tofile=str(actual_file),
|
||||
lineterm=''
|
||||
)
|
||||
diff_text = '\n'.join(diff)
|
||||
pytest.fail(f"Playlist files differ:\n{diff_text}")
|
||||
|
||||
|
||||
class TestPlaylistMergeBasics:
|
||||
"""Test basic playlist merge operations."""
|
||||
|
||||
def test_load_paths_removes_comments(self):
|
||||
"""Test that load_paths correctly filters comments and blanks."""
|
||||
text = "#EXTM3U\n\n/path/to/track1.mp3\n#Comment\n/path/to/track2.mp3\n"
|
||||
paths = load_paths(text)
|
||||
assert paths == ["/path/to/track1.mp3", "/path/to/track2.mp3"]
|
||||
|
||||
def test_save_paths_adds_header(self):
|
||||
"""Test that save_paths adds proper m3u header."""
|
||||
paths = ["/path/to/track1.mp3", "/path/to/track2.mp3"]
|
||||
text = save_paths(paths)
|
||||
assert text.startswith("#EXTM3U\n")
|
||||
assert "/path/to/track1.mp3" in text
|
||||
assert "/path/to/track2.mp3" in text
|
||||
|
||||
def test_empty_playlist_merge(self):
|
||||
"""Test merging empty playlists."""
|
||||
result = merge_playlists("", "", "", test_folder=str(DOCKERAPP_TEST_DIR / "empty_test"))
|
||||
assert result.merged_paths == []
|
||||
assert result.conflicts == []
|
||||
|
||||
def test_preprocess_playlist_text(self):
|
||||
"""Ensure regex replacements run in order."""
|
||||
|
||||
raw = """#EXTM3U
|
||||
\\\\koha9-nas\\koha9-nas\\Music\\Album\\01.flac
|
||||
/music/cache/temp.flac
|
||||
"""
|
||||
rules = [
|
||||
{"pattern": r"\\\\koha9-nas\\koha9-nas\\Music", "replacement": r"N:\\Music"},
|
||||
{"pattern": r"/music/cache/", "replacement": "/data/music/"},
|
||||
]
|
||||
|
||||
processed = preprocess_playlist_text(raw, rules)
|
||||
lines = [line for line in processed.splitlines() if line and not line.startswith("#")]
|
||||
|
||||
assert lines[0] == r"N:\Music\Album\01.flac"
|
||||
assert lines[1] == "/data/music/temp.flac"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("case_num", [1, 2, 3, 4])
|
||||
class TestPlaylistMergeLocalPriority:
|
||||
"""Test playlist merge with local priority strategy."""
|
||||
|
||||
def test_merge_local_priority(self, case_num, tmp_path):
|
||||
"""Test case {case_num} with local priority."""
|
||||
# Load input files
|
||||
base_file = TEST_CASE_DIR / "base" / f"case{case_num}.m3u"
|
||||
local_file = TEST_CASE_DIR / "local_playlist" / f"case{case_num}.m3u"
|
||||
remote_file = TEST_CASE_DIR / "remote_playlist" / f"case{case_num}.m3u"
|
||||
|
||||
if not base_file.exists():
|
||||
pytest.skip("Test playlist fixtures are not available in this environment.")
|
||||
if not (local_file.exists() and remote_file.exists()):
|
||||
pytest.skip("Test playlist fixtures are not available in this environment.")
|
||||
with open(base_file, 'r', encoding='utf-8') as f:
|
||||
base_text = f.read()
|
||||
with open(local_file, 'r', encoding='utf-8') as f:
|
||||
local_text = f.read()
|
||||
with open(remote_file, 'r', encoding='utf-8') as f:
|
||||
remote_text = f.read()
|
||||
|
||||
# Create test output folder
|
||||
test_folder = str(tmp_path / f"case{case_num}_local")
|
||||
|
||||
# Perform merge
|
||||
result = merge_playlists(
|
||||
base_text=base_text,
|
||||
local_text=local_text,
|
||||
remote_text=remote_text,
|
||||
strategy=ConflictResolutionStrategy.LOCAL_PRIORITY,
|
||||
test_folder=test_folder
|
||||
)
|
||||
|
||||
# Compare with expected result
|
||||
expected_file = TEST_RES_DIR / f"case{case_num}_merged_local_primary.m3u"
|
||||
actual_file = Path(test_folder) / "local_result.m3u8"
|
||||
|
||||
assert_playlists_equal(actual_file, expected_file)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("case_num", [1, 2, 3, 4])
|
||||
class TestPlaylistMergeRemotePriority:
|
||||
"""Test playlist merge with remote priority strategy."""
|
||||
|
||||
def test_merge_remote_priority(self, case_num, tmp_path):
|
||||
"""Test case {case_num} with remote priority."""
|
||||
# Load input files
|
||||
base_file = TEST_CASE_DIR / "base" / f"case{case_num}.m3u"
|
||||
local_file = TEST_CASE_DIR / "local_playlist" / f"case{case_num}.m3u"
|
||||
remote_file = TEST_CASE_DIR / "remote_playlist" / f"case{case_num}.m3u"
|
||||
|
||||
if not base_file.exists():
|
||||
pytest.skip("Test playlist fixtures are not available in this environment.")
|
||||
if not (local_file.exists() and remote_file.exists()):
|
||||
pytest.skip("Test playlist fixtures are not available in this environment.")
|
||||
with open(base_file, 'r', encoding='utf-8') as f:
|
||||
base_text = f.read()
|
||||
with open(local_file, 'r', encoding='utf-8') as f:
|
||||
local_text = f.read()
|
||||
with open(remote_file, 'r', encoding='utf-8') as f:
|
||||
remote_text = f.read()
|
||||
|
||||
# Create test output folder
|
||||
test_folder = str(tmp_path / f"case{case_num}_remote")
|
||||
|
||||
# Perform merge
|
||||
result = merge_playlists(
|
||||
base_text=base_text,
|
||||
local_text=local_text,
|
||||
remote_text=remote_text,
|
||||
strategy=ConflictResolutionStrategy.REMOTE_PRIORITY,
|
||||
test_folder=test_folder
|
||||
)
|
||||
|
||||
# Compare with expected result
|
||||
expected_file = TEST_RES_DIR / f"case{case_num}_merged_remote_primary.m3u"
|
||||
actual_file = Path(test_folder) / "remote_result.m3u8"
|
||||
|
||||
assert_playlists_equal(actual_file, expected_file)
|
||||
|
||||
|
||||
class TestConflictDetection:
|
||||
"""Test conflict detection in merges."""
|
||||
|
||||
def test_conflict_reporting(self, tmp_path):
|
||||
"""Test that conflicts are properly reported."""
|
||||
base = "#EXTM3U\n/track1.mp3\n"
|
||||
local = "#EXTM3U\n/track2.mp3\n" # Changed
|
||||
remote = "#EXTM3U\n/track3.mp3\n" # Changed differently
|
||||
|
||||
result = merge_playlists(
|
||||
base_text=base,
|
||||
local_text=local,
|
||||
remote_text=remote,
|
||||
strategy=ConflictResolutionStrategy.LOCAL_PRIORITY,
|
||||
test_folder=str(tmp_path / "conflict_test")
|
||||
)
|
||||
|
||||
# Should detect conflict
|
||||
assert len(result.conflicts) > 0
|
||||
|
||||
# Local priority should prefer local version
|
||||
assert "/track2.mp3" in result.merged_paths
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Allow running tests directly
|
||||
pytest.main([__file__, "-v"])
|
||||
Reference in New Issue
Block a user