Merge branch 'copilot/adjust-ui-and-sync-strategy'
This commit is contained in:
@@ -0,0 +1,270 @@
|
||||
import os
|
||||
import zipfile
|
||||
from datetime import datetime
|
||||
from typing import List
|
||||
from app.utils.logger import logger
|
||||
from app.utils.config import server_config
|
||||
from app.utils.local_playlist import load_local_playlist
|
||||
from app.utils.plex_client import plex_client
|
||||
|
||||
# Default backup directory
|
||||
BACKUP_DIR = os.path.abspath(
|
||||
os.path.join(os.path.dirname(__file__), "..", "..", "backups")
|
||||
)
|
||||
|
||||
|
||||
def ensure_backup_dir():
|
||||
"""Ensure the backup directory exists."""
|
||||
if not os.path.exists(BACKUP_DIR):
|
||||
os.makedirs(BACKUP_DIR, exist_ok=True)
|
||||
logger.info(f"Created backup directory: {BACKUP_DIR}")
|
||||
|
||||
|
||||
def get_timestamp() -> str:
|
||||
"""Generate a timestamp string for backup filenames."""
|
||||
return datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
|
||||
|
||||
def get_sorted_backup_files(prefix: str) -> List[str]:
|
||||
"""Get backup files sorted by modification time (oldest first).
|
||||
|
||||
Args:
|
||||
prefix: The prefix to filter backup files (e.g., 'local_backup_' or 'cloud_backup_')
|
||||
|
||||
Returns:
|
||||
List of backup file paths sorted by modification time (oldest first)
|
||||
"""
|
||||
ensure_backup_dir()
|
||||
backup_files = []
|
||||
for filename in os.listdir(BACKUP_DIR):
|
||||
if filename.startswith(prefix) and filename.endswith('.zip'):
|
||||
filepath = os.path.join(BACKUP_DIR, filename)
|
||||
backup_files.append(filepath)
|
||||
|
||||
# Sort by modification time (oldest first)
|
||||
backup_files.sort(key=lambda x: os.path.getmtime(x))
|
||||
return backup_files
|
||||
|
||||
|
||||
def cleanup_old_backups(prefix: str, retention_count: int):
|
||||
"""Delete old backup files exceeding the retention count.
|
||||
|
||||
Args:
|
||||
prefix: The prefix to filter backup files (e.g., 'local_backup_' or 'cloud_backup_')
|
||||
retention_count: Maximum number of backups to keep (0 means no auto-delete)
|
||||
"""
|
||||
if retention_count <= 0:
|
||||
logger.debug(f"Backup retention count is {retention_count}, skipping cleanup for {prefix}")
|
||||
return
|
||||
|
||||
backup_files = get_sorted_backup_files(prefix)
|
||||
|
||||
# Delete oldest files if we exceed retention count
|
||||
files_to_delete = len(backup_files) - retention_count
|
||||
if files_to_delete > 0:
|
||||
for filepath in backup_files[:files_to_delete]:
|
||||
try:
|
||||
os.remove(filepath)
|
||||
logger.info(f"Deleted old backup: {filepath}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to delete backup {filepath}: {e}")
|
||||
|
||||
|
||||
def backup_local_playlists(local_path: str) -> str | None:
|
||||
"""Create a backup of local playlists.
|
||||
|
||||
Reads all playlist files from the local path and writes them to a zip file
|
||||
without any modifications.
|
||||
|
||||
Args:
|
||||
local_path: Path to the local playlist directory
|
||||
|
||||
Returns:
|
||||
Path to the created backup file, or None if backup failed
|
||||
"""
|
||||
ensure_backup_dir()
|
||||
|
||||
if not local_path or not os.path.isdir(local_path):
|
||||
logger.warning(f"Local path does not exist or is not a directory: {local_path}")
|
||||
return None
|
||||
|
||||
timestamp = get_timestamp()
|
||||
backup_filename = f"local_backup_{timestamp}.zip"
|
||||
backup_path = os.path.join(BACKUP_DIR, backup_filename)
|
||||
|
||||
try:
|
||||
playlist_count = 0
|
||||
with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
||||
for entry in os.scandir(local_path):
|
||||
if not entry.is_file():
|
||||
continue
|
||||
if not entry.name.lower().endswith((".m3u", ".m3u8")):
|
||||
continue
|
||||
|
||||
# Read the original file content
|
||||
try:
|
||||
with open(entry.path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
except UnicodeDecodeError:
|
||||
# Try with different encoding
|
||||
with open(entry.path, 'r', encoding='latin-1') as f:
|
||||
content = f.read()
|
||||
|
||||
# Get the playlist name without extension and add .m3u8 extension
|
||||
playlist_name = os.path.splitext(entry.name)[0]
|
||||
archive_name = f"{playlist_name}.m3u8"
|
||||
|
||||
# Write to zip
|
||||
zipf.writestr(archive_name, content)
|
||||
playlist_count += 1
|
||||
|
||||
if playlist_count == 0:
|
||||
# Remove empty zip file
|
||||
os.remove(backup_path)
|
||||
logger.info("No playlists found for local backup")
|
||||
return None
|
||||
|
||||
logger.info(f"Created local backup with {playlist_count} playlists: {backup_path}")
|
||||
return backup_path
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create local backup: {e}")
|
||||
# Clean up partial backup file if it exists
|
||||
if os.path.exists(backup_path):
|
||||
try:
|
||||
os.remove(backup_path)
|
||||
except OSError:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def backup_cloud_playlists(library_name: str) -> str | None:
|
||||
"""Create a backup of cloud playlists.
|
||||
|
||||
Fetches all playlists from the Plex server and writes them to a zip file
|
||||
without any modifications.
|
||||
|
||||
Args:
|
||||
library_name: Name of the Plex library to backup playlists from
|
||||
|
||||
Returns:
|
||||
Path to the created backup file, or None if backup failed
|
||||
"""
|
||||
ensure_backup_dir()
|
||||
|
||||
if not plex_client.connected:
|
||||
logger.warning("Plex client not connected, cannot backup cloud playlists")
|
||||
return None
|
||||
|
||||
if not library_name:
|
||||
logger.warning("No library name specified for cloud backup")
|
||||
return None
|
||||
|
||||
timestamp = get_timestamp()
|
||||
backup_filename = f"cloud_backup_{timestamp}.zip"
|
||||
backup_path = os.path.join(BACKUP_DIR, backup_filename)
|
||||
|
||||
try:
|
||||
playlists = plex_client.get_lib_playlists(library_name)
|
||||
if not playlists:
|
||||
logger.info("No playlists found for cloud backup")
|
||||
return None
|
||||
|
||||
playlist_count = 0
|
||||
with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
||||
for playlist in playlists:
|
||||
try:
|
||||
# Get playlist items
|
||||
items = playlist.items()
|
||||
|
||||
# Build m3u8 content
|
||||
lines = ["#EXTM3U"]
|
||||
for item in items:
|
||||
# Try to get file path from the track
|
||||
try:
|
||||
if hasattr(item, 'media') and item.media:
|
||||
for media in item.media:
|
||||
if hasattr(media, 'parts') and media.parts:
|
||||
for part in media.parts:
|
||||
if hasattr(part, 'file') and part.file:
|
||||
# Add extended info if available
|
||||
duration = getattr(item, 'duration', 0) or 0
|
||||
duration_seconds = duration // 1000 if duration else -1
|
||||
title = getattr(item, 'title', 'Unknown')
|
||||
artist = ''
|
||||
if hasattr(item, 'grandparentTitle'):
|
||||
artist = item.grandparentTitle
|
||||
elif hasattr(item, 'artist'):
|
||||
artist_attr = getattr(item, 'artist')
|
||||
if callable(artist_attr):
|
||||
artist = str(artist_attr())
|
||||
else:
|
||||
artist = str(artist_attr)
|
||||
|
||||
extinf = f"#EXTINF:{duration_seconds},{artist} - {title}"
|
||||
lines.append(extinf)
|
||||
lines.append(part.file)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not get file path for track: {e}")
|
||||
continue
|
||||
|
||||
if len(lines) > 1: # More than just #EXTM3U
|
||||
content = "\n".join(lines)
|
||||
archive_name = f"{playlist.title}.m3u8"
|
||||
zipf.writestr(archive_name, content)
|
||||
playlist_count += 1
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to backup playlist '{playlist.title}': {e}")
|
||||
continue
|
||||
|
||||
if playlist_count == 0:
|
||||
# Remove empty zip file
|
||||
os.remove(backup_path)
|
||||
logger.info("No playlists with valid tracks found for cloud backup")
|
||||
return None
|
||||
|
||||
logger.info(f"Created cloud backup with {playlist_count} playlists: {backup_path}")
|
||||
return backup_path
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create cloud backup: {e}")
|
||||
# Clean up partial backup file if it exists
|
||||
if os.path.exists(backup_path):
|
||||
try:
|
||||
os.remove(backup_path)
|
||||
except OSError:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def perform_backup_before_sync(local_path: str, library_name: str):
|
||||
"""Perform backup of both local and cloud playlists before sync.
|
||||
|
||||
This function should be called before sync if backup is enabled.
|
||||
It also handles cleanup of old backups based on retention settings.
|
||||
|
||||
Args:
|
||||
local_path: Path to the local playlist directory
|
||||
library_name: Name of the Plex library
|
||||
"""
|
||||
server_config.load()
|
||||
|
||||
if not server_config.backup_enabled:
|
||||
logger.debug("Backup is disabled, skipping pre-sync backup")
|
||||
return
|
||||
|
||||
logger.info("Starting pre-sync backup...")
|
||||
|
||||
# Backup local playlists
|
||||
local_backup = backup_local_playlists(local_path)
|
||||
if local_backup:
|
||||
cleanup_old_backups("local_backup_", server_config.backup_retention_count)
|
||||
|
||||
# Backup cloud playlists
|
||||
cloud_backup = backup_cloud_playlists(library_name)
|
||||
if cloud_backup:
|
||||
cleanup_old_backups("cloud_backup_", server_config.backup_retention_count)
|
||||
|
||||
logger.info("Pre-sync backup completed")
|
||||
@@ -39,6 +39,8 @@ class ServerConfig:
|
||||
self.schedule_weekly_days = [0]
|
||||
self.schedule_weekly_time = "03:00"
|
||||
self.schedule_auto_watch = False
|
||||
self.backup_enabled = False
|
||||
self.backup_retention_count = 5
|
||||
self.load()
|
||||
|
||||
def load(self) -> None:
|
||||
@@ -89,6 +91,8 @@ class ServerConfig:
|
||||
self.schedule_weekly_days = config.get("schedule_weekly_days", [0])
|
||||
self.schedule_weekly_time = config.get("schedule_weekly_time", "03:00")
|
||||
self.schedule_auto_watch = config.get("schedule_auto_watch", False)
|
||||
self.backup_enabled = config.get("backup_enabled", False)
|
||||
self.backup_retention_count = config.get("backup_retention_count", 5)
|
||||
logger.info(f"Server config loaded.")
|
||||
logger.debug(f"Current server config: {self.__dict__}")
|
||||
|
||||
@@ -111,6 +115,8 @@ class ServerConfig:
|
||||
"schedule_weekly_days": self.schedule_weekly_days,
|
||||
"schedule_weekly_time": self.schedule_weekly_time,
|
||||
"schedule_auto_watch": self.schedule_auto_watch,
|
||||
"backup_enabled": self.backup_enabled,
|
||||
"backup_retention_count": self.backup_retention_count,
|
||||
}
|
||||
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
|
||||
json.dump(config, f, indent=4, ensure_ascii=False)
|
||||
@@ -182,6 +188,15 @@ class ServerConfig:
|
||||
self.schedule_auto_watch = auto_watch
|
||||
self.save()
|
||||
|
||||
def set_backup(
|
||||
self,
|
||||
enabled: bool,
|
||||
retention_count: int,
|
||||
) -> None:
|
||||
self.backup_enabled = enabled
|
||||
self.backup_retention_count = retention_count
|
||||
self.save()
|
||||
|
||||
def set_and_save_config(
|
||||
self,
|
||||
theme: str = None,
|
||||
|
||||
@@ -6,6 +6,7 @@ from datetime import datetime
|
||||
from app.utils.logger import logger
|
||||
from app.utils.playlist_merge import sync_all_playlists, SyncMode
|
||||
from app.utils.config import server_config
|
||||
from app.utils.backup import perform_backup_before_sync
|
||||
from app.utils.local_playlist import load_local_playlist, write_local_playlist, delete_local_playlist
|
||||
from app.utils.plex_client import plex_client
|
||||
|
||||
@@ -113,6 +114,10 @@ class SyncManager:
|
||||
if sync_kwargs:
|
||||
kwargs.update(sync_kwargs)
|
||||
|
||||
# Perform backup before sync if enabled
|
||||
local_dir = kwargs.get("local_dir", server_config.local_path)
|
||||
perform_backup_before_sync(local_dir, server_config.library_name)
|
||||
|
||||
# Execute sync
|
||||
results = sync_all_playlists(**kwargs)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user