feat: Implement playlist synchronization result writeback functionality.

This commit is contained in:
2025-12-05 23:08:50 +09:00
parent df4f5dde17
commit 588c84c2c8
3 changed files with 185 additions and 2 deletions
+45 -1
View File
@@ -65,4 +65,48 @@ def scan_local_playlists(base_path: str) -> list[dict]:
playlists.sort(key=lambda item: item["name"].lower())
logger.info(f"Found {len(playlists)} playlists under {absolute_path}.")
return playlists
return playlists
def write_local_playlist(playlist_path: str, tracks: List[str]) -> bool:
"""
Write a list of tracks to a local playlist file.
Args:
playlist_path (str): The path to the playlist file.
tracks (list): A list of songs to write to the playlist.
Returns:
bool: True if successful, False otherwise.
"""
try:
with open(playlist_path, 'w', encoding="utf-8") as file:
file.write("#EXTM3U\n")
for track in tracks:
file.write(f"{track}\n")
logger.info(f"Written {len(tracks)} songs to the playlist: {playlist_path}")
return True
except Exception as e:
logger.error(f"An error occurred while writing the playlist {playlist_path}: {e}")
return False
def delete_local_playlist(playlist_path: str) -> bool:
"""
Delete a local playlist file.
Args:
playlist_path (str): The path to the playlist file.
Returns:
bool: True if successful, False otherwise.
"""
try:
if os.path.exists(playlist_path):
os.remove(playlist_path)
logger.info(f"Deleted playlist: {playlist_path}")
return True
else:
logger.warning(f"Playlist not found for deletion: {playlist_path}")
return False
except Exception as e:
logger.error(f"An error occurred while deleting the playlist {playlist_path}: {e}")
return False
+90
View File
@@ -311,4 +311,94 @@ class PlexClient:
)
return local_2_plex
def get_playlist(self, title: str):
"""Get a playlist by title."""
self._connect_check()
try:
# Exact match search for playlist
playlists = self.server.playlists(title=title)
if playlists:
return playlists[0]
return None
except Exception as e:
logger.error(f"Error fetching playlist {title}: {e}")
return None
def create_playlist(self, title: str, items: list):
"""Create a new playlist with the given items."""
self._connect_check()
try:
self.server.createPlaylist(title, items=items)
logger.info(f"Created playlist {title} with {len(items)} items.")
return True
except Exception as e:
logger.error(f"Error creating playlist {title}: {e}")
return False
def delete_playlist(self, title: str):
"""Delete a playlist by title."""
self._connect_check()
try:
playlist = self.get_playlist(title)
if playlist:
playlist.delete()
logger.info(f"Deleted playlist {title}.")
return True
else:
logger.warning(f"Playlist {title} not found for deletion.")
return False
except Exception as e:
logger.error(f"Error deleting playlist {title}: {e}")
return False
def update_playlist(self, title: str, items: list):
"""
Update a playlist with a new list of items.
This implementation replaces the existing items with the new ones.
"""
self._connect_check()
try:
playlist = self.get_playlist(title)
if not playlist:
return self.create_playlist(title, items)
# Remove all items and add new ones
playlist.removeItems(playlist.items())
if items:
playlist.addItems(items)
logger.info(f"Updated playlist {title} with {len(items)} items.")
return True
except Exception as e:
logger.error(f"Error updating playlist {title}: {e}")
return False
def get_items_by_paths(self, library_name: str, paths: list[str]) -> list:
"""
Find Plex items (tracks) by their file paths.
"""
self._connect_check()
if not paths:
return []
try:
path_map = self.match_tracks(library_name, paths)
except FileNotFoundError:
logger.info(f"Cache not found for {library_name}, creating it...")
self.cache_lib_tracks(library_name)
path_map = self.match_tracks(library_name, paths)
items = []
for path in paths:
rating_key = path_map.get(path)
if rating_key and rating_key != UNMATCHED_TRACK_RATING_KEY:
try:
item = self.server.fetchItem(rating_key)
items.append(item)
except Exception as e:
logger.warning(f"Failed to fetch item for ratingKey {rating_key}: {e}")
else:
logger.warning(f"Track not found in Plex library (or unmatched): {path}")
return items
plex_client = PlexClient()
+50 -1
View File
@@ -1,10 +1,13 @@
import threading
import asyncio
import json
import os
from datetime import datetime
from app.utils.logger import logger
from app.utils.playlist_merge import sync_all_playlists, SyncMode
from app.utils.config import server_config
from app.utils.local_playlist import load_local_playlist, write_local_playlist, delete_local_playlist
from app.utils.plex_client import plex_client
class SyncManager:
def __init__(self):
@@ -111,7 +114,53 @@ class SyncManager:
kwargs.update(sync_kwargs)
# Execute sync
return sync_all_playlists(**kwargs)
results = sync_all_playlists(**kwargs)
# Apply results (write to local and remote)
self._apply_sync_results(results)
return results
def _apply_sync_results(self, results):
logger.info("Applying sync results to local and remote...")
for result in results:
playlist_name = result.name
action = result.action
output_dir = result.output_dir
try:
if action == "synced":
# 1. Write Local
local_result_path = os.path.join(output_dir, "local_result.m3u8")
if os.path.exists(local_result_path):
tracks = load_local_playlist(local_result_path)
dest_path = os.path.join(server_config.local_path, f"{playlist_name}.m3u8")
# Ensure directory exists
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
write_local_playlist(dest_path, tracks)
# 2. Write Remote (Plex)
remote_result_path = os.path.join(output_dir, "remote_result.m3u8")
if os.path.exists(remote_result_path):
tracks = load_local_playlist(remote_result_path)
if server_config.library_name:
items = plex_client.get_items_by_paths(server_config.library_name, tracks)
plex_client.update_playlist(playlist_name, items)
else:
logger.warning("Library name not configured, skipping Plex update.")
elif action == "deleted":
# Delete Local
dest_path = os.path.join(server_config.local_path, f"{playlist_name}.m3u8")
delete_local_playlist(dest_path)
# Also check for .m3u
dest_path_m3u = os.path.join(server_config.local_path, f"{playlist_name}.m3u")
delete_local_playlist(dest_path_m3u)
# Delete Remote
plex_client.delete_playlist(playlist_name)
except Exception as e:
logger.error(f"Error applying sync result for playlist {playlist_name}: {e}")
def _complete_sync(self, status, error=None):
with self._lock: