179 lines
6.7 KiB
Python
179 lines
6.7 KiB
Python
import threading
|
|
import asyncio
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from app.utils.logger import logger
|
|
from app.utils.playlist_merge import sync_all_playlists, SyncMode
|
|
from app.utils.config import server_config
|
|
from app.utils.backup import perform_backup_before_sync
|
|
from app.utils.local_playlist import load_local_playlist, write_local_playlist, delete_local_playlist
|
|
from app.utils.plex_client import plex_client
|
|
|
|
class SyncManager:
|
|
def __init__(self):
|
|
self._lock = threading.Lock()
|
|
self._is_syncing = False
|
|
self._last_sync_time = None
|
|
self._last_status = "idle" # idle, syncing, success, error
|
|
self._last_error = None
|
|
self._listeners = [] # List of asyncio.Queue
|
|
self._loop = None
|
|
|
|
def set_event_loop(self, loop):
|
|
self._loop = loop
|
|
|
|
async def subscribe(self):
|
|
q = asyncio.Queue()
|
|
self._listeners.append(q)
|
|
# Send current status immediately
|
|
await q.put(json.dumps(self.status))
|
|
return q
|
|
|
|
def unsubscribe(self, q):
|
|
if q in self._listeners:
|
|
self._listeners.remove(q)
|
|
|
|
def _notify_listeners(self):
|
|
if not self._loop or not self._listeners:
|
|
return
|
|
|
|
status_json = json.dumps(self.status)
|
|
|
|
for q in self._listeners:
|
|
try:
|
|
self._loop.call_soon_threadsafe(q.put_nowait, status_json)
|
|
except Exception as e:
|
|
logger.error(f"Error notifying listener: {e}")
|
|
|
|
@property
|
|
def is_syncing(self):
|
|
with self._lock:
|
|
return self._is_syncing
|
|
|
|
@property
|
|
def status(self):
|
|
with self._lock:
|
|
return {
|
|
"is_syncing": self._is_syncing,
|
|
"last_sync_time": self._last_sync_time.isoformat() if self._last_sync_time else None,
|
|
"status": self._last_status,
|
|
"error": str(self._last_error) if self._last_error else None
|
|
}
|
|
|
|
def run_sync(self, trigger_source="manual", wait=False, sync_kwargs=None):
|
|
"""
|
|
Thread-safe sync execution.
|
|
If wait=True, blocks until sync completes and returns result.
|
|
If wait=False, runs in background and returns True if started.
|
|
"""
|
|
with self._lock:
|
|
if self._is_syncing:
|
|
logger.warning(f"Sync requested ({trigger_source}) but already in progress.")
|
|
if wait:
|
|
raise Exception("Sync already in progress")
|
|
return False
|
|
self._is_syncing = True
|
|
self._last_status = "syncing"
|
|
self._last_error = None
|
|
|
|
self._notify_listeners()
|
|
logger.info(f"Starting sync (Source: {trigger_source})...")
|
|
|
|
if wait:
|
|
try:
|
|
result = self._perform_sync(sync_kwargs)
|
|
self._complete_sync("success")
|
|
return result
|
|
except Exception as e:
|
|
self._complete_sync("error", e)
|
|
raise e
|
|
else:
|
|
thread = threading.Thread(target=self._sync_worker, args=(trigger_source, sync_kwargs))
|
|
thread.start()
|
|
return True
|
|
|
|
def _sync_worker(self, trigger_source, sync_kwargs=None):
|
|
try:
|
|
self._perform_sync(sync_kwargs)
|
|
self._complete_sync("success")
|
|
logger.info(f"Sync completed successfully (Source: {trigger_source}).")
|
|
except Exception as e:
|
|
logger.error(f"Sync failed (Source: {trigger_source}): {e}")
|
|
self._complete_sync("error", e)
|
|
|
|
def _perform_sync(self, sync_kwargs=None):
|
|
# Reload config to ensure latest values
|
|
server_config.load()
|
|
|
|
kwargs = {
|
|
"local_dir": server_config.local_path,
|
|
"mode": SyncMode(server_config.sync_mode)
|
|
}
|
|
|
|
if sync_kwargs:
|
|
kwargs.update(sync_kwargs)
|
|
|
|
# Perform backup before sync if enabled
|
|
local_dir = kwargs.get("local_dir", server_config.local_path)
|
|
perform_backup_before_sync(local_dir, server_config.library_name)
|
|
|
|
# Execute sync
|
|
results = sync_all_playlists(**kwargs)
|
|
|
|
# Apply results (write to local and remote)
|
|
self._apply_sync_results(results)
|
|
|
|
return results
|
|
|
|
def _apply_sync_results(self, results):
|
|
logger.info("Applying sync results to local and remote...")
|
|
for result in results:
|
|
playlist_name = result.name
|
|
action = result.action
|
|
output_dir = result.output_dir
|
|
|
|
try:
|
|
if action == "synced":
|
|
# 1. Write Local
|
|
local_result_path = os.path.join(output_dir, "local_result.m3u8")
|
|
if os.path.exists(local_result_path):
|
|
tracks = load_local_playlist(local_result_path)
|
|
dest_path = os.path.join(server_config.local_path, f"{playlist_name}.m3u8")
|
|
# Ensure directory exists
|
|
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
|
|
write_local_playlist(dest_path, tracks)
|
|
|
|
# 2. Write Remote (Plex)
|
|
remote_result_path = os.path.join(output_dir, "remote_result.m3u8")
|
|
if os.path.exists(remote_result_path):
|
|
tracks = load_local_playlist(remote_result_path)
|
|
if server_config.library_name:
|
|
items = plex_client.get_items_by_paths(server_config.library_name, tracks)
|
|
plex_client.update_playlist(playlist_name, items)
|
|
else:
|
|
logger.warning("Library name not configured, skipping Plex update.")
|
|
|
|
elif action == "deleted":
|
|
# Delete Local
|
|
dest_path = os.path.join(server_config.local_path, f"{playlist_name}.m3u8")
|
|
delete_local_playlist(dest_path)
|
|
# Also check for .m3u
|
|
dest_path_m3u = os.path.join(server_config.local_path, f"{playlist_name}.m3u")
|
|
delete_local_playlist(dest_path_m3u)
|
|
|
|
# Delete Remote
|
|
plex_client.delete_playlist(playlist_name)
|
|
except Exception as e:
|
|
logger.error(f"Error applying sync result for playlist {playlist_name}: {e}")
|
|
|
|
def _complete_sync(self, status, error=None):
|
|
with self._lock:
|
|
self._last_status = status
|
|
self._last_error = error
|
|
self._last_sync_time = datetime.now()
|
|
self._is_syncing = False
|
|
self._notify_listeners()
|
|
|
|
sync_manager = SyncManager()
|