b483edae74
Co-authored-by: Koha9 <36852125+Koha9@users.noreply.github.com>
271 lines
10 KiB
Python
271 lines
10 KiB
Python
import os
|
|
import zipfile
|
|
from datetime import datetime
|
|
from typing import List
|
|
from app.utils.logger import logger
|
|
from app.utils.config import server_config
|
|
from app.utils.local_playlist import load_local_playlist
|
|
from app.utils.plex_client import plex_client
|
|
|
|
# Default backup directory: the "backups" folder located two levels above the
# directory that contains this module, resolved to an absolute path.
BACKUP_DIR = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, "backups")
)
|
|
|
|
|
|
def ensure_backup_dir():
    """Create BACKUP_DIR (including missing parents) when the path is absent.

    Nothing happens if the path already exists. An info message is logged
    after the directory has been created.
    """
    if os.path.exists(BACKUP_DIR):
        return

    # exist_ok=True tolerates the directory appearing between the check above
    # and this call.
    os.makedirs(BACKUP_DIR, exist_ok=True)
    logger.info(f"Created backup directory: {BACKUP_DIR}")
|
|
|
|
|
|
def get_timestamp() -> str:
    """Return the current local time as ``YYYYMMDD_HHMMSS`` for backup filenames."""
    now = datetime.now()
    return f"{now:%Y%m%d_%H%M%S}"
|
|
|
|
|
|
def get_sorted_backup_files(prefix: str) -> List[str]:
    """List the backup archives for one prefix, ordered oldest first.

    Only names in BACKUP_DIR that start with *prefix* and end with ``.zip``
    are considered. The backup directory is created first if it is missing.

    Args:
        prefix: The prefix to filter backup files (e.g., 'local_backup_' or 'cloud_backup_')

    Returns:
        List of backup file paths sorted by modification time (oldest first)
    """
    ensure_backup_dir()

    matching = [
        os.path.join(BACKUP_DIR, name)
        for name in os.listdir(BACKUP_DIR)
        if name.startswith(prefix) and name.endswith('.zip')
    ]

    # Ascending modification time places the oldest archive at index 0.
    return sorted(matching, key=os.path.getmtime)
|
|
|
|
|
|
def cleanup_old_backups(prefix: str, retention_count: int):
    """Remove the oldest backups so that at most *retention_count* remain.

    Deletion failures are logged as warnings and do not stop the remaining
    deletions.

    Args:
        prefix: The prefix to filter backup files (e.g., 'local_backup_' or 'cloud_backup_')
        retention_count: Maximum number of backups to keep (0 or less disables
            automatic deletion)
    """
    if retention_count <= 0:
        logger.debug(f"Backup retention count is {retention_count}, skipping cleanup for {prefix}")
        return

    existing = get_sorted_backup_files(prefix)
    surplus = len(existing) - retention_count
    if surplus <= 0:
        return

    # The list is ordered oldest first, so the leading `surplus` entries are
    # the ones that exceed the retention limit.
    for stale_path in existing[:surplus]:
        try:
            os.remove(stale_path)
            logger.info(f"Deleted old backup: {stale_path}")
        except Exception as exc:
            logger.warning(f"Failed to delete backup {stale_path}: {exc}")
|
|
|
|
|
|
def _read_playlist_text(path: str) -> str:
    """Return the text of the playlist at *path* with its line endings intact.

    The file is read as bytes so that no newline translation takes place.
    The bytes are decoded as UTF-8; if that fails they are decoded as Latin-1,
    which maps every byte value and therefore always succeeds.
    """
    with open(path, 'rb') as f:
        raw = f.read()
    try:
        return raw.decode('utf-8')
    except UnicodeDecodeError:
        return raw.decode('latin-1')


def backup_local_playlists(local_path: str) -> str | None:
    """Create a zip backup of the playlists found directly in *local_path*.

    Every regular file whose name ends in ``.m3u`` or ``.m3u8`` (case
    insensitive) is stored in the archive under its base name with an
    ``.m3u8`` extension. The text, including its original line endings, is
    kept as-is; archive members are written as UTF-8, so a file that is not
    valid UTF-8 is read as Latin-1 and re-encoded.

    Args:
        local_path: Path to the local playlist directory

    Returns:
        Path to the created backup file, or None if the directory does not
        exist, no playlists were found, or the backup failed
    """
    ensure_backup_dir()

    if not local_path or not os.path.isdir(local_path):
        logger.warning(f"Local path does not exist or is not a directory: {local_path}")
        return None

    timestamp = get_timestamp()
    backup_filename = f"local_backup_{timestamp}.zip"
    backup_path = os.path.join(BACKUP_DIR, backup_filename)

    try:
        playlist_count = 0
        with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # The context manager closes the directory iterator even when a
            # file in the middle of the scan cannot be read.
            with os.scandir(local_path) as entries:
                for entry in entries:
                    if not entry.is_file():
                        continue
                    if not entry.name.lower().endswith((".m3u", ".m3u8")):
                        continue

                    content = _read_playlist_text(entry.path)

                    # Store every playlist under "<base name>.m3u8".
                    # NOTE(review): "x.m3u" and "x.m3u8" in the same directory
                    # map to the same archive member name - confirm whether
                    # that case needs disambiguation.
                    playlist_name = os.path.splitext(entry.name)[0]
                    archive_name = f"{playlist_name}.m3u8"

                    zipf.writestr(archive_name, content)
                    playlist_count += 1

        if playlist_count == 0:
            # Remove empty zip file
            os.remove(backup_path)
            logger.info("No playlists found for local backup")
            return None

        logger.info(f"Created local backup with {playlist_count} playlists: {backup_path}")
        return backup_path

    except Exception as e:
        logger.error(f"Failed to create local backup: {e}")
        # Clean up partial backup file if it exists
        if os.path.exists(backup_path):
            try:
                os.remove(backup_path)
            except OSError as cleanup_error:
                # Best effort only, but report it: a leftover partial archive
                # stays in the backup directory.
                logger.warning(f"Could not remove partial backup {backup_path}: {cleanup_error}")
        return None
|
|
|
|
|
|
def _first_track_file(item) -> str | None:
    """Return the first non-empty ``part.file`` under ``item.media``, or None."""
    for media in getattr(item, 'media', None) or ():
        for part in getattr(media, 'parts', None) or ():
            file_path = getattr(part, 'file', None)
            if file_path:
                return file_path
    return None


def _format_extinf(item) -> str:
    """Build the ``#EXTINF:<seconds>,<artist> - <title>`` line for a track."""
    duration = getattr(item, 'duration', 0) or 0
    # The value is divided by 1000 to obtain seconds; -1 marks a missing or
    # zero duration.
    duration_seconds = duration // 1000 if duration else -1
    title = getattr(item, 'title', 'Unknown')

    artist = ''
    if hasattr(item, 'grandparentTitle'):
        artist = item.grandparentTitle
    elif hasattr(item, 'artist'):
        artist_attr = getattr(item, 'artist')
        # `artist` may be either a plain attribute or a method.
        artist = str(artist_attr() if callable(artist_attr) else artist_attr)

    return f"#EXTINF:{duration_seconds},{artist} - {title}"


def backup_cloud_playlists(library_name: str) -> str | None:
    """Create a zip backup of the playlists of a Plex library.

    Playlists are fetched with ``plex_client.get_lib_playlists`` and each one
    is written to the archive as ``<playlist title>.m3u8``. The file starts
    with ``#EXTM3U`` and holds, for every track with a known file path, one
    ``#EXTINF`` line followed by that path. A track contributes exactly one
    entry even when it has several media versions. Tracks without a file path
    and playlists without any such track are skipped.

    Args:
        library_name: Name of the Plex library to backup playlists from

    Returns:
        Path to the created backup file, or None if the client is not
        connected, no library name was given, nothing could be backed up, or
        the backup failed
    """
    ensure_backup_dir()

    if not plex_client.connected:
        logger.warning("Plex client not connected, cannot backup cloud playlists")
        return None

    if not library_name:
        logger.warning("No library name specified for cloud backup")
        return None

    timestamp = get_timestamp()
    backup_filename = f"cloud_backup_{timestamp}.zip"
    backup_path = os.path.join(BACKUP_DIR, backup_filename)

    try:
        playlists = plex_client.get_lib_playlists(library_name)
        if not playlists:
            logger.info("No playlists found for cloud backup")
            return None

        playlist_count = 0
        with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for playlist in playlists:
                try:
                    # Build m3u8 content
                    lines = ["#EXTM3U"]
                    for item in playlist.items():
                        try:
                            file_path = _first_track_file(item)
                            if not file_path:
                                continue
                            extinf = _format_extinf(item)
                        except Exception as e:
                            logger.debug(f"Could not get file path for track: {e}")
                            continue
                        lines.append(extinf)
                        lines.append(file_path)

                    if len(lines) > 1:  # More than just #EXTM3U
                        content = "\n".join(lines)
                        # NOTE(review): the title is used verbatim as the
                        # archive member name; titles containing path
                        # separators or duplicate titles are not handled
                        # here - confirm against the restore code.
                        archive_name = f"{playlist.title}.m3u8"
                        zipf.writestr(archive_name, content)
                        playlist_count += 1

                except Exception as e:
                    # One failing playlist must not abort the whole backup.
                    logger.warning(f"Failed to backup playlist '{playlist.title}': {e}")
                    continue

        if playlist_count == 0:
            # Remove empty zip file
            os.remove(backup_path)
            logger.info("No playlists with valid tracks found for cloud backup")
            return None

        logger.info(f"Created cloud backup with {playlist_count} playlists: {backup_path}")
        return backup_path

    except Exception as e:
        logger.error(f"Failed to create cloud backup: {e}")
        # Clean up partial backup file if it exists
        if os.path.exists(backup_path):
            try:
                os.remove(backup_path)
            except OSError as cleanup_error:
                # Best effort only, but report it: a leftover partial archive
                # stays in the backup directory.
                logger.warning(f"Could not remove partial backup {backup_path}: {cleanup_error}")
        return None
|
|
|
|
|
|
def perform_backup_before_sync(local_path: str, library_name: str):
    """Back up local and cloud playlists ahead of a sync run.

    The server configuration is reloaded first. When backups are disabled the
    function returns without doing anything else. Otherwise the local backup
    runs first, then the cloud backup; after each backup that produced an
    archive, old archives with the same prefix are pruned according to the
    configured retention count.

    Args:
        local_path: Path to the local playlist directory
        library_name: Name of the Plex library
    """
    server_config.load()

    if not server_config.backup_enabled:
        logger.debug("Backup is disabled, skipping pre-sync backup")
        return

    logger.info("Starting pre-sync backup...")

    # (filename prefix, callable that creates the backup), in execution order.
    backup_jobs = (
        ("local_backup_", lambda: backup_local_playlists(local_path)),
        ("cloud_backup_", lambda: backup_cloud_playlists(library_name)),
    )
    for prefix, create_backup in backup_jobs:
        # Prune only when a new archive was actually written.
        if create_backup():
            cleanup_old_backups(prefix, server_config.backup_retention_count)

    logger.info("Pre-sync backup completed")
|