6 Commits

11 changed files with 123 additions and 647 deletions
-245
View File
@@ -1,245 +0,0 @@
# 测试框架使用指南
## 概述
项目已配置 **pytest** 测试框架,相比你之前的手动测试方法,有以下优势:
### ✅ 改进点
| 旧方法 (Jupyter Notebook) | 新方法 (pytest) |
|-------------------------|----------------|
| 手动运行每个测试用例 | 自动发现和运行所有测试 |
| 手动读取和对比文件 | 自动化断言和差异报告 |
| 难以定位失败原因 | 清晰的错误堆栈和 diff |
| 无法批量运行 | 支持参数化和并行测试 |
| 无覆盖率统计 | 自动生成覆盖率报告 |
## 快速开始
### 1. 运行所有测试
```bash
python -m pytest
```
### 2. 运行特定测试文件
```bash
python -m pytest tests/test_playlist_merge.py -v
```
### 3. 在 VS Code 中运行
-`Ctrl+Shift+P`
- 输入 "Run Task"
- 选择 "运行所有测试" 或其他测试任务
## 测试文件说明
### 已创建的测试文件
```
tests/
├── __init__.py # 包初始化
├── conftest.py # pytest fixtures 和配置
├── test_playlist_merge.py # 播放列表合并测试
└── README.md # 详细测试指南
```
### 测试用例覆盖
`test_playlist_merge.py` 包含:
1. **基础功能测试** (`TestPlaylistMergeBasics`)
- `test_load_paths_removes_comments` - 测试路径加载
- `test_save_paths_adds_header` - 测试文件保存
- `test_empty_playlist_merge` - 测试空播放列表合并
2. **本地优先测试** (`TestPlaylistMergeLocalPriority`)
- 参数化测试 case1-4 的本地优先合并
3. **远程优先测试** (`TestPlaylistMergeRemotePriority`)
- 参数化测试 case1-4 的远程优先合并
4. **冲突检测测试** (`TestConflictDetection`)
- 测试冲突识别和解决
## 常用命令
### 基础运行
```bash
# 运行所有测试
pytest
# 详细输出
pytest -v
# 显示 print 输出
pytest -s
# 只运行特定测试
pytest tests/test_playlist_merge.py::TestPlaylistMergeBasics::test_load_paths_removes_comments
```
### 测试筛选
```bash
# 运行特定 case
pytest -k "case_num-1"
# 运行包含特定关键字的测试
pytest -k "local_priority"
# 运行标记的测试
pytest -m unit
```
### 调试和重试
```bash
# 失败时进入调试器
pytest --pdb
# 只运行上次失败的测试
pytest --lf
# 先运行失败的测试
pytest --ff
# 遇到第一个失败就停止
pytest -x
```
### 覆盖率报告
```bash
# 生成覆盖率报告
pytest --cov=app --cov-report=html
# 在浏览器中查看
# 打开 htmlcov/index.html
# 终端中显示覆盖率
pytest --cov=app --cov-report=term
```
## 编写新测试
### 1. 创建测试文件
`tests/` 目录下创建 `test_<feature>.py`
### 2. 编写测试类和函数
```python
import pytest
class TestMyFeature:
"""测试我的功能"""
def test_basic_case(self):
"""测试基本用例"""
result = my_function()
assert result == expected_value
@pytest.mark.parametrize("input,expected", [
(1, 2),
(2, 4),
(3, 6),
])
def test_multiple_cases(self, input, expected):
"""参数化测试多个用例"""
assert my_function(input) == expected
```
### 3. 使用 fixtures
```python
@pytest.fixture
def sample_data():
"""提供测试数据"""
return {"key": "value"}
def test_with_fixture(sample_data):
assert sample_data["key"] == "value"
```
## 与旧测试方法的对比
### 旧方法 (test.ipynb)
```python
# 手动循环和对比
for i in range(len(test_playlists[1])):
with open(test_playlists[1][i], 'r', encoding='utf-8') as f1, \
open(target_playlists[i], 'r', encoding='utf-8') as f2:
content1 = f1.read()
content2 = f2.read()
# 手动处理和对比...
if content1 == content2:
print(f"Test case {i+1} passed.")
else:
print(f"Test case {i+1} failed.")
```
### 新方法 (pytest)
```python
@pytest.mark.parametrize("case_num", [1, 2, 3, 4])
def test_merge_local_priority(case_num, tmp_path):
"""自动运行所有用例,失败时显示详细 diff"""
# 加载输入
base_text = load_file(f"case{case_num}.m3u")
# ... 执行测试
result = merge_playlists(base_text, local_text, remote_text)
# 自动断言和差异报告
assert_playlists_equal(actual_file, expected_file)
```
## 持续集成
可以将测试集成到 CI/CD 流程:
```yaml
# .github/workflows/test.yml (示例)
name: Tests
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run tests
run: pytest --cov=app
```
## 最佳实践
1. **测试命名** - 使用清晰描述性的名称
2. **独立性** - 每个测试应该独立运行
3. **快速** - 单元测试应该快速执行
4. **明确** - 一个测试只测一个功能点
5. **维护** - 定期更新测试用例
## 故障排除
### pytest 找不到
```bash
python -m pip install pytest pytest-cov
```
### 导入错误
确保项目根目录在 Python 路径中:
```python
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
```
### 测试发现失败
检查 `pytest.ini` 配置和文件命名是否符合规范。
## 下一步
- [ ] 为其他模块添加测试 (如 `plex_client.py`, `local_playlist.py`)
- [ ] 添加集成测试
- [ ] 配置 CI/CD 自动运行测试
- [ ] 设置测试覆盖率目标 (如 >80%)
## 参考资料
- [pytest 官方文档](https://docs.pytest.org/)
- [pytest-cov 文档](https://pytest-cov.readthedocs.io/)
- 项目内测试: `tests/README.md`
+74 -7
View File
@@ -1,5 +1,4 @@
import os
from typing import Tuple
from app.utils.config import server_config
from app.utils.playlist_merge import SyncMode, sync_all_playlists, TEST_PLAYLIST_DIR
from fastapi import FastAPI, Request, Form
@@ -48,7 +47,7 @@ SYNC_MODE_OPTIONS = [
]
def _get_cloud_playlists() -> Tuple[list[dict], str, dict]:
def _get_cloud_playlists() -> tuple[list[dict], str, dict, str, list[str]]:
"""Fetch playlists and connection state from the remote Plex server."""
server_config.load()
@@ -58,10 +57,12 @@ def _get_cloud_playlists() -> Tuple[list[dict], str, dict]:
"name": "未设置",
"domain": server_config.url or "未设置",
}
selected_library = server_config.library_name
music_libraries: list[str] = []
# no server url configured
if not server_config.url:
return playlists, status, server_info
return playlists, status, server_info, selected_library, music_libraries
status = "failed"
try:
@@ -79,7 +80,16 @@ def _get_cloud_playlists() -> Tuple[list[dict], str, dict]:
}
)
for playlist in plex_client.server.playlists():
music_libraries = plex_client.get_libs_name_list()
if not music_libraries:
server_config.set_and_save_config(library_name="")
return playlists, status, server_info, "", music_libraries
if not selected_library or selected_library not in music_libraries:
selected_library = music_libraries[0]
server_config.set_and_save_config(library_name=selected_library)
for playlist in plex_client.get_lib_playlists(selected_library) or []:
track_count = getattr(playlist, "itemCount", None)
if track_count is None:
try:
@@ -97,7 +107,8 @@ def _get_cloud_playlists() -> Tuple[list[dict], str, dict]:
status = "failed"
playlists.sort(key=lambda item: item["name"].lower())
return playlists, status, server_info
return playlists, status, server_info, selected_library, music_libraries
def _build_home_context(
request: Request,
@@ -109,7 +120,13 @@ def _build_home_context(
):
server_config.load()
local_playlists = scan_local_playlists(local_path)
cloud_playlists, connection_status, server_info = _get_cloud_playlists()
(
cloud_playlists,
connection_status,
server_info,
selected_library,
music_libraries,
) = _get_cloud_playlists()
return {
"request": request,
@@ -120,6 +137,8 @@ def _build_home_context(
"cloud_playlists": cloud_playlists,
"connection_status": connection_status,
"server_info": server_info,
"selected_library": selected_library,
"music_libraries": music_libraries,
"sync_modes": SYNC_MODE_OPTIONS,
"selected_mode": selected_mode,
"message": message,
@@ -225,6 +244,25 @@ async def save_path_rules(
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
server_config.load()
music_libraries = []
selected_library = server_config.library_name
if server_config.url and server_config.token:
try:
plex_client.connect(
token=server_config.token,
scheme=server_config.scheme,
url=server_config.url,
port=server_config.port,
)
music_libraries = plex_client.get_libs_name_list()
if music_libraries:
if selected_library not in music_libraries:
selected_library = music_libraries[0]
else:
selected_library = ""
except Exception:
selected_library = ""
music_libraries = []
return templates.TemplateResponse(
"login.html",
{
@@ -235,6 +273,8 @@ async def login_page(request: Request):
"scheme": server_config.scheme,
"server_url": server_config.url,
"port": server_config.port,
"music_libraries": music_libraries,
"selected_library": selected_library,
},
)
@@ -248,6 +288,7 @@ async def login(
scheme: str = Form("https"),
url: str = Form(...),
port: str = Form("32400"),
library_name: str = Form(""),
):
# 尝试连接到 Plex 服务器
try:
@@ -261,9 +302,30 @@ async def login(
port=port,
)
# 成功连接后保存配置到配置文件
music_libraries: list[str] = []
selected_library = ""
if plex_client.connected:
try:
music_libraries = plex_client.get_libs_name_list()
except Exception as exc:
logger.warning(f"Unable to fetch music libraries: {exc}")
music_libraries = []
if music_libraries:
if library_name and library_name in music_libraries:
selected_library = library_name
else:
selected_library = music_libraries[0]
server_config.set_and_save_config(
token=token_success, scheme=scheme, url=url, port=port
token=token_success,
scheme=scheme,
url=url,
port=port,
library_name=selected_library,
)
else:
music_libraries = []
server_config.set_and_save_config(
token=token_success, scheme=scheme, url=url, port=port, library_name=""
)
return templates.TemplateResponse(
"login.html",
@@ -277,9 +339,12 @@ async def login(
"scheme": server_config.scheme,
"server_url": server_config.url,
"port": server_config.port,
"music_libraries": music_libraries,
"selected_library": selected_library,
},
)
except Exception as e:
music_libraries = []
return templates.TemplateResponse(
"login.html",
{
@@ -291,6 +356,8 @@ async def login(
"scheme": scheme,
"server_url": url,
"port": port,
"music_libraries": music_libraries,
"selected_library": "",
},
)
+22 -8
View File
@@ -107,6 +107,7 @@
<div>
<h5 class="card-title mb-0">云端播放列表</h5>
<small class="text-body-secondary">来自已连接的 Plex 服务器</small>
<div class="text-body-secondary small mt-1">当前音乐库:{{ selected_library or '暂无可用音乐库' }}</div>
</div>
<form class="ms-lg-auto" method="get" action="/">
<input type="hidden" name="local_path" value="{{ local_path }}">
@@ -163,7 +164,11 @@
</button>
<div class="text-body-secondary small">从上到下依次匹配,左侧填写正则表达式,右侧填写替换结果。</div>
</div>
<div class="d-flex justify-content-end">
<div class="d-flex justify-content-end gap-2">
<button class="btn btn-outline-secondary" type="button" id="cancelRulesBtn">
<i class="bi bi-arrow-counterclockwise"></i>
<span class="ms-1">取消变更</span>
</button>
<button class="btn btn-success" type="submit">
<i class="bi bi-save"></i>
<span class="ms-1">保存规则</span>
@@ -189,8 +194,13 @@
const existingRules = {{ path_rules | tojson | safe }};
const ruleList = document.getElementById('ruleList');
const addRuleBtn = document.getElementById('addRuleBtn');
const cancelRulesBtn = document.getElementById('cancelRulesBtn');
const template = document.getElementById('ruleRowTemplate');
const initialRules = existingRules.length
? existingRules.map(rule => ({ ...rule }))
: [{ pattern: '', replacement: '' }];
function bindRemoveButton(row) {
const removeBtn = row.querySelector('button');
removeBtn.addEventListener('click', () => {
@@ -208,14 +218,18 @@
ruleList.appendChild(clone);
}
addRuleBtn.addEventListener('click', () => addRuleRow());
if (existingRules.length) {
existingRules.forEach(rule => addRuleRow(rule.pattern || '', rule.replacement || ''));
}
if (!ruleList.children.length) {
function renderRules(rules) {
ruleList.innerHTML = '';
if (!rules.length) {
addRuleRow();
return;
}
rules.forEach(rule => addRuleRow(rule.pattern || '', rule.replacement || ''));
}
addRuleBtn.addEventListener('click', () => addRuleRow());
cancelRulesBtn.addEventListener('click', () => renderRules(initialRules));
renderRules(initialRules);
</script>
{% endblock %}
+13
View File
@@ -32,6 +32,19 @@
<label for="port" class="form-label">端口</label>
<input type="text" class="form-control" id="port" name="port" value="{{ port }}">
</div>
<div class="mb-3">
<label for="library_name" class="form-label">音乐库</label>
<select class="form-select" id="library_name" name="library_name" {% if not music_libraries %}disabled{% endif %}>
{% if music_libraries %}
{% for library in music_libraries %}
<option value="{{ library }}" {% if selected_library == library %}selected{% endif %}>{{ library }}</option>
{% endfor %}
{% else %}
<option value="" selected>暂无可用的音乐库</option>
{% endif %}
</select>
<div class="form-text">仅显示音乐类型的库,可在连接成功后自动刷新。</div>
</div>
<button type="submit" class="btn btn-primary">连接</button>
</form>
+9
View File
@@ -15,6 +15,7 @@ class ServerConfig:
self.url = ""
self.scheme = "https"
self.port = "32400"
self.library_name = ""
self.path_rules: list[dict[str, str]] = []
self.load()
@@ -38,6 +39,7 @@ class ServerConfig:
self.url = config.get("server_url", "")
self.scheme = config.get("server_scheme", "https")
self.port = config.get("server_port", "32400")
self.library_name = config.get("library_name", "")
self.path_rules = config.get("path_rules", []) or []
logger.info(f"Server config loaded: {self.__dict__}")
@@ -48,6 +50,7 @@ class ServerConfig:
"server_url": self.url,
"server_scheme": self.scheme,
"server_port": self.port,
"library_name": self.library_name,
"path_rules": self.path_rules,
}
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
@@ -66,6 +69,9 @@ class ServerConfig:
def set_token(self, token: str) -> None:
self.token = token
def set_library(self, library_name: str) -> None:
self.library_name = library_name or ""
def set_theme(self, theme: str) -> None:
# check theme is valid
if theme not in ["auto", "dark", "light"]:
@@ -83,6 +89,7 @@ class ServerConfig:
url: str = None,
scheme: str = None,
port: str = None,
library_name: str | None = None,
path_rules: list[dict[str, str]] | None = None,
) -> None:
if theme is not None:
@@ -95,6 +102,8 @@ class ServerConfig:
self.set_scheme(scheme)
if port is not None:
self.set_port(port)
if library_name is not None:
self.set_library(library_name)
if path_rules is not None:
self.set_path_rules(path_rules)
self.save()
+5 -2
View File
@@ -137,8 +137,11 @@ class PlexClient:
self._connect_check()
try:
libraries = self.server.library.sections()
lib_names = [lib.title for lib in libraries]
logger.info(f"Fetched {len(lib_names)} library names from the server.")
music_libs = [lib for lib in libraries if getattr(lib, "type", None) == MUSIC_LIBRARY_TYPE]
lib_names = [lib.title for lib in music_libs]
logger.info(
f"Fetched {len(lib_names)} music library names from the server (type='{MUSIC_LIBRARY_TYPE}')."
)
return lib_names
except Exception as e:
logger.warning(f"Failed to fetch library names: {str(e)}")
-26
View File
@@ -1,26 +0,0 @@
[pytest]
# Pytest configuration
# Test discovery patterns
python_files = test_*.py
python_classes = Test*
python_functions = test_*
# Test paths
testpaths = tests
# Output options
addopts =
-v
--tb=short
--strict-markers
--disable-warnings
# Markers for categorizing tests
markers =
slow: marks tests as slow (deselect with '-m "not slow"')
integration: marks tests as integration tests
unit: marks tests as unit tests
# Minimum version
minversion = 6.0
-88
View File
@@ -1,88 +0,0 @@
# 测试指南
## 运行测试
### 运行所有测试
```bash
pytest
```
### 运行特定测试文件
```bash
pytest tests/test_playlist_merge.py
```
### 运行特定测试类或函数
```bash
pytest tests/test_playlist_merge.py::TestPlaylistMergeLocalPriority
pytest tests/test_playlist_merge.py::TestPlaylistMergeLocalPriority::test_merge_local_priority
```
### 运行特定参数化测试用例
```bash
# 只运行 case1 的测试
pytest tests/test_playlist_merge.py -k "case_num-1"
```
### 查看详细输出
```bash
pytest -v # verbose
pytest -vv # more verbose
pytest -s # 显示 print 输出
```
### 查看测试覆盖率
```bash
pip install pytest-cov
pytest --cov=app --cov-report=html
```
然后在浏览器中打开 `htmlcov/index.html`
## 测试结构
- `test_playlist_merge.py` - 播放列表合并功能的单元测试
- `conftest.py` - pytest fixtures 和配置
- `pytest.ini` - pytest 配置文件
## 编写新测试
1.`tests/` 目录下创建 `test_*.py` 文件
2. 创建以 `Test` 开头的测试类
3. 编写以 `test_` 开头的测试函数
4. 使用 `assert` 进行断言
5. 使用 `@pytest.mark.parametrize` 进行参数化测试
示例:
```python
import pytest
class TestMyFeature:
def test_basic_functionality(self):
result = my_function(input_data)
assert result == expected_output
@pytest.mark.parametrize("input,expected", [
(1, 2),
(2, 4),
(3, 6),
])
def test_with_multiple_inputs(self, input, expected):
assert my_function(input) == expected
```
## 调试测试
### 在测试失败时进入调试器
```bash
pytest --pdb
```
### 只运行上次失败的测试
```bash
pytest --lf
```
### 先运行失败的测试
```bash
pytest --ff
```
-1
View File
@@ -1 +0,0 @@
# Test package initialization
-45
View File
@@ -1,45 +0,0 @@
"""
Pytest configuration and shared fixtures.
"""
import os
import shutil
from pathlib import Path
import pytest
@pytest.fixture(scope="session")
def project_root():
"""Return the project root directory."""
return Path(__file__).parent.parent
@pytest.fixture(scope="session")
def test_data_dir(project_root):
"""Return the test data directory."""
return project_root / "test_case"
@pytest.fixture(scope="session")
def expected_results_dir(project_root):
"""Return the expected results directory."""
return project_root / "test_res"
@pytest.fixture
def temp_test_dir(tmp_path):
"""Create a temporary directory for test output."""
test_dir = tmp_path / "test_output"
test_dir.mkdir(exist_ok=True)
yield test_dir
# Cleanup after test (optional)
# shutil.rmtree(test_dir, ignore_errors=True)
@pytest.fixture(autouse=True)
def reset_test_state():
"""Reset any global state before each test."""
# Add any necessary cleanup here
yield
# Post-test cleanup
-225
View File
@@ -1,225 +0,0 @@
"""
Unit tests for playlist merge functionality.
"""
import os
from pathlib import Path
import pytest
from app.utils.playlist_merge import (
merge_playlists,
ConflictResolutionStrategy,
load_paths,
save_paths,
preprocess_playlist_text,
)
# Test data directory
TEST_CASE_DIR = Path(__file__).parent.parent / "test_case"
TEST_RES_DIR = Path(__file__).parent.parent / "test_res"
DOCKERAPP_TEST_DIR = Path(__file__).parent.parent / "dockerapp" / "test_playlists"
def normalize_playlist_content(file_path: str | Path) -> list[str]:
"""
Read a playlist file and return normalized content (without comments/blanks).
Args:
file_path: Path to the playlist file
Returns:
List of track paths (non-empty, non-comment lines)
"""
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Remove blank lines and comments
lines = [
line.strip()
for line in content.splitlines()
if line.strip() and not line.startswith('#')
]
return lines
def assert_playlists_equal(actual_file: str | Path, expected_file: str | Path):
"""
Assert that two playlist files have identical content (ignoring comments).
Args:
actual_file: Path to the generated playlist
expected_file: Path to the expected playlist
Raises:
AssertionError: If playlists differ
"""
actual_content = normalize_playlist_content(actual_file)
expected_content = normalize_playlist_content(expected_file)
if actual_content != expected_content:
# Provide detailed diff on failure
import difflib
diff = difflib.unified_diff(
expected_content,
actual_content,
fromfile=str(expected_file),
tofile=str(actual_file),
lineterm=''
)
diff_text = '\n'.join(diff)
pytest.fail(f"Playlist files differ:\n{diff_text}")
class TestPlaylistMergeBasics:
"""Test basic playlist merge operations."""
def test_load_paths_removes_comments(self):
"""Test that load_paths correctly filters comments and blanks."""
text = "#EXTM3U\n\n/path/to/track1.mp3\n#Comment\n/path/to/track2.mp3\n"
paths = load_paths(text)
assert paths == ["/path/to/track1.mp3", "/path/to/track2.mp3"]
def test_save_paths_adds_header(self):
"""Test that save_paths adds proper m3u header."""
paths = ["/path/to/track1.mp3", "/path/to/track2.mp3"]
text = save_paths(paths)
assert text.startswith("#EXTM3U\n")
assert "/path/to/track1.mp3" in text
assert "/path/to/track2.mp3" in text
def test_empty_playlist_merge(self):
"""Test merging empty playlists."""
result = merge_playlists("", "", "", test_folder=str(DOCKERAPP_TEST_DIR / "empty_test"))
assert result.merged_paths == []
assert result.conflicts == []
def test_preprocess_playlist_text(self):
"""Ensure regex replacements run in order."""
raw = """#EXTM3U
\\\\koha9-nas\\koha9-nas\\Music\\Album\\01.flac
/music/cache/temp.flac
"""
rules = [
{"pattern": r"\\\\koha9-nas\\koha9-nas\\Music", "replacement": r"N:\\Music"},
{"pattern": r"/music/cache/", "replacement": "/data/music/"},
]
processed = preprocess_playlist_text(raw, rules)
lines = [line for line in processed.splitlines() if line and not line.startswith("#")]
assert lines[0] == r"N:\Music\Album\01.flac"
assert lines[1] == "/data/music/temp.flac"
@pytest.mark.parametrize("case_num", [1, 2, 3, 4])
class TestPlaylistMergeLocalPriority:
"""Test playlist merge with local priority strategy."""
def test_merge_local_priority(self, case_num, tmp_path):
"""Test case {case_num} with local priority."""
# Load input files
base_file = TEST_CASE_DIR / "base" / f"case{case_num}.m3u"
local_file = TEST_CASE_DIR / "local_playlist" / f"case{case_num}.m3u"
remote_file = TEST_CASE_DIR / "remote_playlist" / f"case{case_num}.m3u"
if not base_file.exists():
pytest.skip("Test playlist fixtures are not available in this environment.")
if not (local_file.exists() and remote_file.exists()):
pytest.skip("Test playlist fixtures are not available in this environment.")
with open(base_file, 'r', encoding='utf-8') as f:
base_text = f.read()
with open(local_file, 'r', encoding='utf-8') as f:
local_text = f.read()
with open(remote_file, 'r', encoding='utf-8') as f:
remote_text = f.read()
# Create test output folder
test_folder = str(tmp_path / f"case{case_num}_local")
# Perform merge
result = merge_playlists(
base_text=base_text,
local_text=local_text,
remote_text=remote_text,
strategy=ConflictResolutionStrategy.LOCAL_PRIORITY,
test_folder=test_folder
)
# Compare with expected result
expected_file = TEST_RES_DIR / f"case{case_num}_merged_local_primary.m3u"
actual_file = Path(test_folder) / "local_result.m3u8"
assert_playlists_equal(actual_file, expected_file)
@pytest.mark.parametrize("case_num", [1, 2, 3, 4])
class TestPlaylistMergeRemotePriority:
"""Test playlist merge with remote priority strategy."""
def test_merge_remote_priority(self, case_num, tmp_path):
"""Test case {case_num} with remote priority."""
# Load input files
base_file = TEST_CASE_DIR / "base" / f"case{case_num}.m3u"
local_file = TEST_CASE_DIR / "local_playlist" / f"case{case_num}.m3u"
remote_file = TEST_CASE_DIR / "remote_playlist" / f"case{case_num}.m3u"
if not base_file.exists():
pytest.skip("Test playlist fixtures are not available in this environment.")
if not (local_file.exists() and remote_file.exists()):
pytest.skip("Test playlist fixtures are not available in this environment.")
with open(base_file, 'r', encoding='utf-8') as f:
base_text = f.read()
with open(local_file, 'r', encoding='utf-8') as f:
local_text = f.read()
with open(remote_file, 'r', encoding='utf-8') as f:
remote_text = f.read()
# Create test output folder
test_folder = str(tmp_path / f"case{case_num}_remote")
# Perform merge
result = merge_playlists(
base_text=base_text,
local_text=local_text,
remote_text=remote_text,
strategy=ConflictResolutionStrategy.REMOTE_PRIORITY,
test_folder=test_folder
)
# Compare with expected result
expected_file = TEST_RES_DIR / f"case{case_num}_merged_remote_primary.m3u"
actual_file = Path(test_folder) / "remote_result.m3u8"
assert_playlists_equal(actual_file, expected_file)
class TestConflictDetection:
"""Test conflict detection in merges."""
def test_conflict_reporting(self, tmp_path):
"""Test that conflicts are properly reported."""
base = "#EXTM3U\n/track1.mp3\n"
local = "#EXTM3U\n/track2.mp3\n" # Changed
remote = "#EXTM3U\n/track3.mp3\n" # Changed differently
result = merge_playlists(
base_text=base,
local_text=local,
remote_text=remote,
strategy=ConflictResolutionStrategy.LOCAL_PRIORITY,
test_folder=str(tmp_path / "conflict_test")
)
# Should detect conflict
assert len(result.conflicts) > 0
# Local priority should prefer local version
assert "/track2.mp3" in result.merged_paths
if __name__ == "__main__":
# Allow running tests directly
pytest.main([__file__, "-v"])