Implement backup functionality with UI and backend support

Co-authored-by: Koha9 <36852125+Koha9@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2025-12-04 23:21:26 +00:00
parent 7b14445387
commit b483edae74
7 changed files with 489 additions and 7 deletions
+270
View File
@@ -0,0 +1,270 @@
import os
import zipfile
from datetime import datetime
from typing import List
from app.utils.logger import logger
from app.utils.config import server_config
from app.utils.local_playlist import load_local_playlist
from app.utils.plex_client import plex_client
# Default backup directory
BACKUP_DIR = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "backups")
)
def ensure_backup_dir():
"""Ensure the backup directory exists."""
if not os.path.exists(BACKUP_DIR):
os.makedirs(BACKUP_DIR, exist_ok=True)
logger.info(f"Created backup directory: {BACKUP_DIR}")
def get_timestamp() -> str:
"""Generate a timestamp string for backup filenames."""
return datetime.now().strftime("%Y%m%d_%H%M%S")
def get_sorted_backup_files(prefix: str) -> List[str]:
"""Get backup files sorted by modification time (oldest first).
Args:
prefix: The prefix to filter backup files (e.g., 'local_backup_' or 'cloud_backup_')
Returns:
List of backup file paths sorted by modification time (oldest first)
"""
ensure_backup_dir()
backup_files = []
for filename in os.listdir(BACKUP_DIR):
if filename.startswith(prefix) and filename.endswith('.zip'):
filepath = os.path.join(BACKUP_DIR, filename)
backup_files.append(filepath)
# Sort by modification time (oldest first)
backup_files.sort(key=lambda x: os.path.getmtime(x))
return backup_files
def cleanup_old_backups(prefix: str, retention_count: int):
"""Delete old backup files exceeding the retention count.
Args:
prefix: The prefix to filter backup files (e.g., 'local_backup_' or 'cloud_backup_')
retention_count: Maximum number of backups to keep (0 means no auto-delete)
"""
if retention_count <= 0:
logger.debug(f"Backup retention count is {retention_count}, skipping cleanup for {prefix}")
return
backup_files = get_sorted_backup_files(prefix)
# Delete oldest files if we exceed retention count
files_to_delete = len(backup_files) - retention_count
if files_to_delete > 0:
for filepath in backup_files[:files_to_delete]:
try:
os.remove(filepath)
logger.info(f"Deleted old backup: {filepath}")
except Exception as e:
logger.warning(f"Failed to delete backup {filepath}: {e}")
def backup_local_playlists(local_path: str) -> str | None:
"""Create a backup of local playlists.
Reads all playlist files from the local path and writes them to a zip file
without any modifications.
Args:
local_path: Path to the local playlist directory
Returns:
Path to the created backup file, or None if backup failed
"""
ensure_backup_dir()
if not local_path or not os.path.isdir(local_path):
logger.warning(f"Local path does not exist or is not a directory: {local_path}")
return None
timestamp = get_timestamp()
backup_filename = f"local_backup_{timestamp}.zip"
backup_path = os.path.join(BACKUP_DIR, backup_filename)
try:
playlist_count = 0
with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for entry in os.scandir(local_path):
if not entry.is_file():
continue
if not entry.name.lower().endswith((".m3u", ".m3u8")):
continue
# Read the original file content
try:
with open(entry.path, 'r', encoding='utf-8') as f:
content = f.read()
except UnicodeDecodeError:
# Try with different encoding
with open(entry.path, 'r', encoding='latin-1') as f:
content = f.read()
# Get the playlist name without extension and add .m3u8 extension
playlist_name = os.path.splitext(entry.name)[0]
archive_name = f"{playlist_name}.m3u8"
# Write to zip
zipf.writestr(archive_name, content)
playlist_count += 1
if playlist_count == 0:
# Remove empty zip file
os.remove(backup_path)
logger.info("No playlists found for local backup")
return None
logger.info(f"Created local backup with {playlist_count} playlists: {backup_path}")
return backup_path
except Exception as e:
logger.error(f"Failed to create local backup: {e}")
# Clean up partial backup file if it exists
if os.path.exists(backup_path):
try:
os.remove(backup_path)
except OSError:
pass
return None
def backup_cloud_playlists(library_name: str) -> str | None:
"""Create a backup of cloud playlists.
Fetches all playlists from the Plex server and writes them to a zip file
without any modifications.
Args:
library_name: Name of the Plex library to backup playlists from
Returns:
Path to the created backup file, or None if backup failed
"""
ensure_backup_dir()
if not plex_client.connected:
logger.warning("Plex client not connected, cannot backup cloud playlists")
return None
if not library_name:
logger.warning("No library name specified for cloud backup")
return None
timestamp = get_timestamp()
backup_filename = f"cloud_backup_{timestamp}.zip"
backup_path = os.path.join(BACKUP_DIR, backup_filename)
try:
playlists = plex_client.get_lib_playlists(library_name)
if not playlists:
logger.info("No playlists found for cloud backup")
return None
playlist_count = 0
with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for playlist in playlists:
try:
# Get playlist items
items = playlist.items()
# Build m3u8 content
lines = ["#EXTM3U"]
for item in items:
# Try to get file path from the track
try:
if hasattr(item, 'media') and item.media:
for media in item.media:
if hasattr(media, 'parts') and media.parts:
for part in media.parts:
if hasattr(part, 'file') and part.file:
# Add extended info if available
duration = getattr(item, 'duration', 0) or 0
duration_seconds = duration // 1000 if duration else -1
title = getattr(item, 'title', 'Unknown')
artist = ''
if hasattr(item, 'grandparentTitle'):
artist = item.grandparentTitle
elif hasattr(item, 'artist'):
artist_attr = getattr(item, 'artist')
if callable(artist_attr):
artist = str(artist_attr())
else:
artist = str(artist_attr)
extinf = f"#EXTINF:{duration_seconds},{artist} - {title}"
lines.append(extinf)
lines.append(part.file)
break
except Exception as e:
logger.debug(f"Could not get file path for track: {e}")
continue
if len(lines) > 1: # More than just #EXTM3U
content = "\n".join(lines)
archive_name = f"{playlist.title}.m3u8"
zipf.writestr(archive_name, content)
playlist_count += 1
except Exception as e:
logger.warning(f"Failed to backup playlist '{playlist.title}': {e}")
continue
if playlist_count == 0:
# Remove empty zip file
os.remove(backup_path)
logger.info("No playlists with valid tracks found for cloud backup")
return None
logger.info(f"Created cloud backup with {playlist_count} playlists: {backup_path}")
return backup_path
except Exception as e:
logger.error(f"Failed to create cloud backup: {e}")
# Clean up partial backup file if it exists
if os.path.exists(backup_path):
try:
os.remove(backup_path)
except OSError:
pass
return None
def perform_backup_before_sync(local_path: str, library_name: str):
"""Perform backup of both local and cloud playlists before sync.
This function should be called before sync if backup is enabled.
It also handles cleanup of old backups based on retention settings.
Args:
local_path: Path to the local playlist directory
library_name: Name of the Plex library
"""
server_config.load()
if not server_config.backup_enabled:
logger.debug("Backup is disabled, skipping pre-sync backup")
return
logger.info("Starting pre-sync backup...")
# Backup local playlists
local_backup = backup_local_playlists(local_path)
if local_backup:
cleanup_old_backups("local_backup_", server_config.backup_retention_count)
# Backup cloud playlists
cloud_backup = backup_cloud_playlists(library_name)
if cloud_backup:
cleanup_old_backups("cloud_backup_", server_config.backup_retention_count)
logger.info("Pre-sync backup completed")
+15
View File
@@ -39,6 +39,8 @@ class ServerConfig:
self.schedule_weekly_days = [0]
self.schedule_weekly_time = "03:00"
self.schedule_auto_watch = False
self.backup_enabled = False
self.backup_retention_count = 5
self.load()
def load(self) -> None:
@@ -89,6 +91,8 @@ class ServerConfig:
self.schedule_weekly_days = config.get("schedule_weekly_days", [0])
self.schedule_weekly_time = config.get("schedule_weekly_time", "03:00")
self.schedule_auto_watch = config.get("schedule_auto_watch", False)
self.backup_enabled = config.get("backup_enabled", False)
self.backup_retention_count = config.get("backup_retention_count", 5)
logger.info(f"Server config loaded.")
logger.debug(f"Current server config: {self.__dict__}")
@@ -111,6 +115,8 @@ class ServerConfig:
"schedule_weekly_days": self.schedule_weekly_days,
"schedule_weekly_time": self.schedule_weekly_time,
"schedule_auto_watch": self.schedule_auto_watch,
"backup_enabled": self.backup_enabled,
"backup_retention_count": self.backup_retention_count,
}
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(config, f, indent=4, ensure_ascii=False)
@@ -182,6 +188,15 @@ class ServerConfig:
self.schedule_auto_watch = auto_watch
self.save()
def set_backup(
self,
enabled: bool,
retention_count: int,
) -> None:
self.backup_enabled = enabled
self.backup_retention_count = retention_count
self.save()
def set_and_save_config(
self,
theme: str = None,
+5
View File
@@ -5,6 +5,7 @@ from datetime import datetime
from app.utils.logger import logger
from app.utils.playlist_merge import sync_all_playlists, SyncMode
from app.utils.config import server_config
from app.utils.backup import perform_backup_before_sync
class SyncManager:
def __init__(self):
@@ -110,6 +111,10 @@ class SyncManager:
if sync_kwargs:
kwargs.update(sync_kwargs)
# Perform backup before sync if enabled
local_dir = kwargs.get("local_dir", server_config.local_path)
perform_backup_before_sync(local_dir, server_config.library_name)
# Execute sync
return sync_all_playlists(**kwargs)