import os import zipfile from datetime import datetime from typing import List from app.utils.logger import logger from app.utils.config import server_config from app.utils.local_playlist import load_local_playlist from app.utils.plex_client import plex_client # Default backup directory BACKUP_DIR = os.path.abspath( os.path.join(os.path.dirname(__file__), "..", "..", "backups") ) def ensure_backup_dir(): """Ensure the backup directory exists.""" if not os.path.exists(BACKUP_DIR): os.makedirs(BACKUP_DIR, exist_ok=True) logger.info(f"Created backup directory: {BACKUP_DIR}") def get_timestamp() -> str: """Generate a timestamp string for backup filenames.""" return datetime.now().strftime("%Y%m%d_%H%M%S") def get_sorted_backup_files(prefix: str) -> List[str]: """Get backup files sorted by modification time (oldest first). Args: prefix: The prefix to filter backup files (e.g., 'local_backup_' or 'cloud_backup_') Returns: List of backup file paths sorted by modification time (oldest first) """ ensure_backup_dir() backup_files = [] for filename in os.listdir(BACKUP_DIR): if filename.startswith(prefix) and filename.endswith('.zip'): filepath = os.path.join(BACKUP_DIR, filename) backup_files.append(filepath) # Sort by modification time (oldest first) backup_files.sort(key=lambda x: os.path.getmtime(x)) return backup_files def cleanup_old_backups(prefix: str, retention_count: int): """Delete old backup files exceeding the retention count. Args: prefix: The prefix to filter backup files (e.g., 'local_backup_' or 'cloud_backup_') retention_count: Maximum number of backups to keep (0 means no auto-delete) """ if retention_count <= 0: logger.debug(f"Backup retention count is {retention_count}, skipping cleanup for {prefix}") return backup_files = get_sorted_backup_files(prefix) # Delete oldest files if we exceed retention count files_to_delete = len(backup_files) - retention_count if files_to_delete > 0: for filepath in backup_files[:files_to_delete]: try: os.remove(filepath) logger.info(f"Deleted old backup: {filepath}") except Exception as e: logger.warning(f"Failed to delete backup {filepath}: {e}") def backup_local_playlists(local_path: str) -> str | None: """Create a backup of local playlists. Reads all playlist files from the local path and writes them to a zip file without any modifications. Args: local_path: Path to the local playlist directory Returns: Path to the created backup file, or None if backup failed """ ensure_backup_dir() if not local_path or not os.path.isdir(local_path): logger.warning(f"Local path does not exist or is not a directory: {local_path}") return None timestamp = get_timestamp() backup_filename = f"local_backup_{timestamp}.zip" backup_path = os.path.join(BACKUP_DIR, backup_filename) try: playlist_count = 0 with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for entry in os.scandir(local_path): if not entry.is_file(): continue if not entry.name.lower().endswith((".m3u", ".m3u8")): continue # Read the original file content try: with open(entry.path, 'r', encoding='utf-8') as f: content = f.read() except UnicodeDecodeError: # Try with different encoding with open(entry.path, 'r', encoding='latin-1') as f: content = f.read() # Get the playlist name without extension and add .m3u8 extension playlist_name = os.path.splitext(entry.name)[0] archive_name = f"{playlist_name}.m3u8" # Write to zip zipf.writestr(archive_name, content) playlist_count += 1 if playlist_count == 0: # Remove empty zip file os.remove(backup_path) logger.info("No playlists found for local backup") return None logger.info(f"Created local backup with {playlist_count} playlists: {backup_path}") return backup_path except Exception as e: logger.error(f"Failed to create local backup: {e}") # Clean up partial backup file if it exists if os.path.exists(backup_path): try: os.remove(backup_path) except OSError: pass return None def backup_cloud_playlists(library_name: str) -> str | None: """Create a backup of cloud playlists. Fetches all playlists from the Plex server and writes them to a zip file without any modifications. Args: library_name: Name of the Plex library to backup playlists from Returns: Path to the created backup file, or None if backup failed """ ensure_backup_dir() if not plex_client.connected: logger.warning("Plex client not connected, cannot backup cloud playlists") return None if not library_name: logger.warning("No library name specified for cloud backup") return None timestamp = get_timestamp() backup_filename = f"cloud_backup_{timestamp}.zip" backup_path = os.path.join(BACKUP_DIR, backup_filename) try: playlists = plex_client.get_lib_playlists(library_name) if not playlists: logger.info("No playlists found for cloud backup") return None playlist_count = 0 with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for playlist in playlists: try: # Get playlist items items = playlist.items() # Build m3u8 content lines = ["#EXTM3U"] for item in items: # Try to get file path from the track try: if hasattr(item, 'media') and item.media: for media in item.media: if hasattr(media, 'parts') and media.parts: for part in media.parts: if hasattr(part, 'file') and part.file: # Add extended info if available duration = getattr(item, 'duration', 0) or 0 duration_seconds = duration // 1000 if duration else -1 title = getattr(item, 'title', 'Unknown') artist = '' if hasattr(item, 'grandparentTitle'): artist = item.grandparentTitle elif hasattr(item, 'artist'): artist_attr = getattr(item, 'artist') if callable(artist_attr): artist = str(artist_attr()) else: artist = str(artist_attr) extinf = f"#EXTINF:{duration_seconds},{artist} - {title}" lines.append(extinf) lines.append(part.file) break except Exception as e: logger.debug(f"Could not get file path for track: {e}") continue if len(lines) > 1: # More than just #EXTM3U content = "\n".join(lines) archive_name = f"{playlist.title}.m3u8" zipf.writestr(archive_name, content) playlist_count += 1 except Exception as e: logger.warning(f"Failed to backup playlist '{playlist.title}': {e}") continue if playlist_count == 0: # Remove empty zip file os.remove(backup_path) logger.info("No playlists with valid tracks found for cloud backup") return None logger.info(f"Created cloud backup with {playlist_count} playlists: {backup_path}") return backup_path except Exception as e: logger.error(f"Failed to create cloud backup: {e}") # Clean up partial backup file if it exists if os.path.exists(backup_path): try: os.remove(backup_path) except OSError: pass return None def perform_backup_before_sync(local_path: str, library_name: str): """Perform backup of both local and cloud playlists before sync. This function should be called before sync if backup is enabled. It also handles cleanup of old backups based on retention settings. Args: local_path: Path to the local playlist directory library_name: Name of the Plex library """ server_config.load() if not server_config.backup_enabled: logger.debug("Backup is disabled, skipping pre-sync backup") return logger.info("Starting pre-sync backup...") # Backup local playlists local_backup = backup_local_playlists(local_path) if local_backup: cleanup_old_backups("local_backup_", server_config.backup_retention_count) # Backup cloud playlists cloud_backup = backup_cloud_playlists(library_name) if cloud_backup: cleanup_old_backups("cloud_backup_", server_config.backup_retention_count) logger.info("Pre-sync backup completed")