import threading
import asyncio
import json
from datetime import datetime
from typing import Any, Dict, Optional

from app.utils.logger import logger
from app.utils.playlist_merge import sync_all_playlists, SyncMode
from app.utils.config import server_config


class SyncManager:
    """Run playlist syncs one at a time and publish status changes.

    A ``threading.Lock`` guards the sync state so that only one sync can be
    in progress at once. Status snapshots are pushed, as JSON strings, to
    ``asyncio.Queue`` subscribers through the event loop registered with
    :meth:`set_event_loop`.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._is_syncing = False
        self._last_sync_time: Optional[datetime] = None
        self._last_status = "idle"  # idle, syncing, success, error
        self._last_error: Optional[BaseException] = None
        self._listeners: list = []  # List of asyncio.Queue
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def set_event_loop(self, loop: asyncio.AbstractEventLoop) -> None:
        """Register the event loop used to deliver updates to subscribers."""
        self._loop = loop

    async def subscribe(self) -> asyncio.Queue:
        """Create, register and return a queue that receives status updates.

        The current status (JSON string) is queued immediately so a new
        subscriber does not have to wait for the next state change.
        """
        queue: asyncio.Queue = asyncio.Queue()
        self._listeners.append(queue)
        # Send current status immediately
        await queue.put(json.dumps(self.status))
        return queue

    def unsubscribe(self, q: asyncio.Queue) -> None:
        """Stop sending updates to *q*; unknown queues are ignored."""
        try:
            self._listeners.remove(q)
        except ValueError:
            # Not (or no longer) registered: nothing to do.
            pass

    def _notify_listeners(self) -> None:
        """Schedule delivery of the current status to every subscriber.

        Does nothing when no event loop is registered or nobody is
        subscribed. Delivery goes through ``call_soon_threadsafe`` because
        this method is also invoked from the background sync thread.
        """
        if not self._loop or not self._listeners:
            return

        status_json = json.dumps(self.status)
        # Iterate over a snapshot: subscribe()/unsubscribe() may mutate the
        # list while a sync thread is in the middle of notifying.
        for queue in tuple(self._listeners):
            try:
                self._loop.call_soon_threadsafe(queue.put_nowait, status_json)
            except Exception as e:
                logger.error(f"Error notifying listener: {e}")

    @property
    def is_syncing(self) -> bool:
        """Whether a sync is currently in progress."""
        with self._lock:
            return self._is_syncing

    @property
    def status(self) -> Dict[str, Any]:
        """JSON-serialisable snapshot of the sync state.

        Keys: ``is_syncing``, ``last_sync_time`` (ISO-8601 string or None),
        ``status`` and ``error`` (string or None).
        """
        with self._lock:
            last_time = self._last_sync_time
            last_error = self._last_error
            return {
                "is_syncing": self._is_syncing,
                "last_sync_time": last_time.isoformat() if last_time else None,
                "status": self._last_status,
                "error": str(last_error) if last_error else None,
            }

    def run_sync(
        self,
        trigger_source: str = "manual",
        wait: bool = False,
        sync_kwargs: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """
        Thread-safe sync execution.
        If wait=True, blocks until sync completes and returns result.
        If wait=False, runs in background and returns True if started.

        Args:
            trigger_source: Label identifying what requested the sync; only
                used in log messages.
            wait: Run the sync in the calling thread instead of a new thread.
            sync_kwargs: Extra keyword arguments that override the ones
                built from the server configuration.

        Returns:
            The sync result when ``wait`` is true; otherwise ``True`` if a
            background sync was started or ``False`` if one was already
            running.

        Raises:
            Exception: If ``wait`` is true and a sync is already in progress,
                or if the blocking sync itself fails. A failure to start the
                background thread is also propagated.
        """
        with self._lock:
            if self._is_syncing:
                logger.warning(f"Sync requested ({trigger_source}) but already in progress.")
                if wait:
                    raise Exception("Sync already in progress")
                return False

            self._is_syncing = True
            self._last_status = "syncing"
            self._last_error = None

        # Must run after the lock is released: building the status snapshot
        # acquires the same (non-reentrant) lock.
        self._notify_listeners()
        logger.info(f"Starting sync (Source: {trigger_source})...")

        if wait:
            try:
                result = self._perform_sync(sync_kwargs)
                self._complete_sync("success")
                return result
            except Exception as e:
                self._complete_sync("error", e)
                raise

        worker = threading.Thread(
            target=self._sync_worker,
            args=(trigger_source, sync_kwargs),
        )
        try:
            worker.start()
        except Exception as e:
            # The worker never ran, so nothing else would clear the
            # "syncing" flag; without this every later request would be
            # rejected as "already in progress".
            self._complete_sync("error", e)
            raise
        return True

    def _sync_worker(self, trigger_source: str, sync_kwargs: Optional[Dict[str, Any]] = None) -> None:
        """Thread target: run one sync and record its outcome.

        Exceptions are logged and stored in the status instead of being
        propagated.
        """
        try:
            self._perform_sync(sync_kwargs)
            self._complete_sync("success")
            logger.info(f"Sync completed successfully (Source: {trigger_source}).")
        except Exception as e:
            logger.error(f"Sync failed (Source: {trigger_source}): {e}")
            self._complete_sync("error", e)

    def _perform_sync(self, sync_kwargs: Optional[Dict[str, Any]] = None) -> Any:
        """Reload the configuration and run ``sync_all_playlists``.

        ``local_dir`` and ``mode`` come from ``server_config``; entries in
        *sync_kwargs* are applied on top and therefore take precedence.
        """
        # Reload config to ensure latest values
        server_config.load()

        kwargs = {
            "local_dir": server_config.local_path,
            "mode": SyncMode(server_config.sync_mode),
        }
        if sync_kwargs:
            kwargs.update(sync_kwargs)

        # Execute sync
        return sync_all_playlists(**kwargs)

    def _complete_sync(self, status: str, error: Optional[BaseException] = None) -> None:
        """Record the outcome of a sync, clear the in-progress flag, notify."""
        with self._lock:
            self._last_status = status
            self._last_error = error
            self._last_sync_time = datetime.now()
            self._is_syncing = False
        # Outside the lock: the status snapshot re-acquires it.
        self._notify_listeners()


sync_manager = SyncManager()