Files
PlexPlaylistSync/app/utils/plex_client.py
T

405 lines
16 KiB
Python

import os
import pickle
from plexapi.myplex import MyPlexAccount
from plexapi.server import PlexServer
from plexapi.library import MusicSection
from urllib.parse import urlparse
from app.utils.common import str_is_empty
from app.utils.logger import logger
MUSIC_LIBRARY_TYPE = "artist"
UNMATCHED_TRACK_RATING_KEY = "unmatched_track"
CACHE_DIR_ID_LENGTH = 6
CACHE_TRACKS_BASE_DIR = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "cache")
)
def build_plex_url(scheme, url, port="32400"):
"""Build a full Plex URL from scheme, url, and port."""
# 如果url不以http://或https://开头,则添加scheme
full_url = url
if not full_url.startswith("http://") and not full_url.startswith("https://"):
full_url = f"{scheme}://{url}"
parsed = urlparse(full_url)
if parsed.scheme in ("http", "https"):
netloc = parsed.netloc or parsed.path
if ":" not in netloc and port:
netloc = f"{netloc}:{port}"
base_url = f"{parsed.scheme}://{netloc}"
else:
base_url = f"http://{url}:{port}"
return base_url
class PlexClient:
"""A client for interacting with a Plex server."""
def __init__(self) -> None:
self.server: PlexServer | None = None
self.token: str | None = None
self.base_url: str | None = None
self.connected = False
def connect(
self,
username: str = "",
password: str = "",
token: str = "",
scheme: str = "https",
url: str = "",
port: str = "32400",
timeout: int | None = None,
) -> tuple[PlexServer, str]:
"""Connect to the Plex server using username/password or token.
Parameters:
username (str): Plex username.
password (str): Plex password.
token (str): Plex authentication token.
scheme (str): URL scheme (http or https).
url (str): Plex server URL.
port (str): Plex server port, default is 32400.
Returns:
PlexServer: A connected PlexServer instance.
str: The authentication token used for the connection.
"""
# Connect Server with token if token is provided, otherwise use username/password
try:
if not str_is_empty(token):
self.server, self.token = self._connect_with_token(
token, scheme, url, port, timeout
)
else:
self.server, self.token = self._connect_with_pw(
username, password, scheme, url, port, timeout
)
# Update the base URL and connection status
self.base_url = build_plex_url(scheme, url, port)
self.connected = True
logger.info(
f"Connected to Plex server at {self.base_url} with token: {self.token}"
)
return self.server, self.token
except Exception as e:
logger.warning(f"Failed to connect to Plex server: {str(e)}")
self.connected = False
raise
def _connect_with_pw(
self,
username: str,
password: str,
scheme: str,
url: str,
port: str = "32400",
timeout: int | None = None,
):
"""Return a connected PlexServer instance and update config with token and server info."""
# url 初始化
self.base_url = build_plex_url(scheme, url, port)
# account 初始化
account = MyPlexAccount(username, password, timeout=timeout)
# token 获取
self.token = account.authenticationToken
self.server = PlexServer(self.base_url, self.token, timeout=timeout)
logger.debug(
f"Connected to Plex server with username: {username}, token: {self.token}"
)
return self.server, self.token
def _connect_with_token(
self,
token: str,
scheme: str,
url: str,
port: str = "32400",
timeout: int | None = None,
):
"""Return a connected PlexServer instance using a token."""
# URL 初始化
self.base_url = build_plex_url(scheme, url, port)
self.server = PlexServer(self.base_url, token, timeout=timeout)
logger.debug(f"Connected to Plex server with token: {token}")
return self.server, token
def _connect_check(self):
"""Check if the Plex server is connected."""
if not self.connected or not self.server:
logger.error("Plex client is not connected.")
raise RuntimeError("Plex client is not connected.")
def get_server(self) -> PlexServer | None:
"""Return the connected Plex server instance.
returns:
PlexServer: A connected PlexServer instance.
"""
self._connect_check()
return self.server
def get_libs_name_list(self) -> list[str]:
"""Return a list of library names from the Plex server.
returns:
list[str]: A list of all library names.
"""
self._connect_check()
try:
libraries = self.server.library.sections()
music_libs = [lib for lib in libraries if getattr(lib, "type", None) == MUSIC_LIBRARY_TYPE]
lib_names = [lib.title for lib in music_libs]
logger.info(
f"Fetched {len(lib_names)} music library names from the server (type='{MUSIC_LIBRARY_TYPE}')."
)
return lib_names
except Exception as e:
logger.warning(f"Failed to fetch library names: {str(e)}")
raise RuntimeError(f"Failed to fetch library names: {str(e)}")
def get_lib(self, library_name: str) -> MusicSection | None:
"""Return a specific library from the Plex server.
Args:
library_name (str): Name of the library to fetch.
returns:
MusicSection: A MusicSection representing the specified library.
"""
self._connect_check()
try:
lib = self.server.library.section(library_name)
if not lib:
logger.error(f"Library '{library_name}' not found.")
raise ValueError(f"Library '{library_name}' not found.")
elif lib.type != MUSIC_LIBRARY_TYPE:
logger.error(
f"Library '{library_name}' is not a music library. Only music libraries are supported."
)
raise ValueError(
f"Library '{library_name}' is not a music library. Only music libraries are supported."
)
logger.debug(f"Fetched library '{library_name}' from the server.")
return lib
except Exception as e:
logger.warning(f"Failed to fetch library '{library_name}': {str(e)}")
raise RuntimeError(f"Failed to fetch library '{library_name}': {str(e)}")
def get_lib_playlists(self, library_name: str) -> list | None:
"""Return all playlists from the Plex server.
Args:
library_name (str): Name of the library to fetch playlists from.
returns:
list: A list of all playlists in the specified library.
"""
self._connect_check()
lib = self.get_lib(library_name)
# Fetch playlists from the library
try:
playlists = lib.playlists()
logger.info(f"Fetched {len(playlists)} playlists from the server.")
return playlists
except Exception as e:
logger.warning(f"Failed to fetch playlists: {str(e)}")
raise RuntimeError(f"Failed to fetch playlists: {str(e)}")
def get_lib_tracks(self, library_name: str) -> list:
"""Cache all tracks from the Plex server.
Args:
library_name (str): Name of the library to fetch tracks from.
returns:
list: A list of all tracks in the specified library.
"""
self._connect_check()
lib = self.get_lib(library_name)
try:
tracks = lib.all(libtype="track")
logger.info(f"Fetched {len(tracks)} tracks from library '{library_name}'.")
return tracks
except Exception as e:
logger.warning(
f"Failed to fetch tracks from library '{library_name}': {str(e)}"
)
raise RuntimeError(
f"Failed to fetch tracks from library '{library_name}': {str(e)}"
)
def cache_lib_tracks(self, library_name: str) -> dict:
"""Cache all tracks from the Plex server.
All tracks are cached as a mapping of file paths to rating keys.
like: {file_path: ratingKey, ...}
Args:
library_name (str): Name of the library to cache tracks from.
Returns:
dict: A mapping of file paths to rating keys.
"""
try:
all_tracks = self.get_lib_tracks(library_name)
dir_2_ratingKey = {}
for track in all_tracks:
dir_2_ratingKey[track.media[0].parts[0].file] = track.ratingKey
# only use top CACHE_DIR_ID_LENGTH characters of server.machineIdentifier and library key
server_id = self.server.machineIdentifier[:CACHE_DIR_ID_LENGTH]
lib_id = self.get_lib(library_name).uuid[:CACHE_DIR_ID_LENGTH]
# cache directory path
cache_dir = os.path.join(CACHE_TRACKS_BASE_DIR, server_id)
cache_name = f"{lib_id}.pkl"
# Ensure the cache directory exists
os.makedirs(cache_dir, exist_ok=True)
with open(os.path.join(cache_dir, cache_name), "wb") as f:
pickle.dump(dir_2_ratingKey, f, protocol=pickle.HIGHEST_PROTOCOL)
logger.info(
f"Cached {len(dir_2_ratingKey)} tracks from library '{library_name}' to {cache_dir}."
)
return dir_2_ratingKey
except Exception as e:
logger.warning(f"Failed to cache tracks from library '{library_name}': {str(e)}")
raise RuntimeError(f"Failed to cache tracks from library '{library_name}': {str(e)}")
def load_cached_tracks(self, library_name: str) -> dict:
"""Load cached tracks from the local cache file.
Cached tracks will be loaded as a mapping of file paths to rating keys.
like: {file_path: ratingKey, ...}
Args:
library_name (str): Name of the library to load cached tracks from.
Returns:
dict: A mapping of file paths to rating keys.
"""
self._connect_check()
server_id = self.server.machineIdentifier[:CACHE_DIR_ID_LENGTH]
lib_id = self.get_lib(library_name).uuid[:CACHE_DIR_ID_LENGTH]
# check if cache directory exists
cache_dir = os.path.join(CACHE_TRACKS_BASE_DIR, server_id)
if not os.path.exists(cache_dir):
logger.warning(f"Cache directory {cache_dir} does not exist.")
raise FileNotFoundError(f"Cache directory {cache_dir} does not exist. Server may not have cached tracks.")
# check if cache file exists
cache_name = f"{lib_id}.pkl"
cache_path = os.path.join(cache_dir, cache_name)
if not os.path.exists(cache_path):
logger.warning(f"Cache file {cache_path} does not exist.")
raise FileNotFoundError(f"Cache file {cache_path} does not exist. Library {library_name} may not have cached tracks.")
# load cache file
with open(cache_path, "rb") as f:
return pickle.load(f)
def match_tracks(self, library_name: str, local_tracks: list) -> dict:
"""Match local tracks with Plex server tracks ratingKey.
Will return a mapping of local file paths to Plex rating keys.
like: {local_file_path: plex_ratingKey, ...}
unmatched tracks will be assigned a special rating key.
like: {local_file_path: 'unmatched_track'}
Args:
library_name (str): Name of the library to match tracks from.
tracks (list): List of local tracks to match.
Returns:
bidict: A bidirectional mapping of local file paths to Plex rating keys.
"""
self._connect_check()
cached_tracks = self.load_cached_tracks(library_name)
local_2_plex = {}
matched_count = 0
for track in local_tracks:
if track in cached_tracks:
local_2_plex[track] = cached_tracks[track]
matched_count += 1
else:
local_2_plex[track] = UNMATCHED_TRACK_RATING_KEY
logger.info(
f"Matched {matched_count}/{len(local_tracks)} local tracks with Plex server tracks in library '{library_name}'."
)
return local_2_plex
def get_playlist(self, title: str):
"""Get a playlist by title."""
self._connect_check()
try:
# Exact match search for playlist
playlists = self.server.playlists(title=title)
if playlists:
return playlists[0]
return None
except Exception as e:
logger.error(f"Error fetching playlist {title}: {e}")
return None
def create_playlist(self, title: str, items: list):
"""Create a new playlist with the given items."""
self._connect_check()
try:
self.server.createPlaylist(title, items=items)
logger.info(f"Created playlist {title} with {len(items)} items.")
return True
except Exception as e:
logger.error(f"Error creating playlist {title}: {e}")
return False
def delete_playlist(self, title: str):
"""Delete a playlist by title."""
self._connect_check()
try:
playlist = self.get_playlist(title)
if playlist:
playlist.delete()
logger.info(f"Deleted playlist {title}.")
return True
else:
logger.warning(f"Playlist {title} not found for deletion.")
return False
except Exception as e:
logger.error(f"Error deleting playlist {title}: {e}")
return False
def update_playlist(self, title: str, items: list):
"""
Update a playlist with a new list of items.
This implementation replaces the existing items with the new ones.
"""
self._connect_check()
try:
playlist = self.get_playlist(title)
if not playlist:
return self.create_playlist(title, items)
# Remove all items and add new ones
playlist.removeItems(playlist.items())
if items:
playlist.addItems(items)
logger.info(f"Updated playlist {title} with {len(items)} items.")
return True
except Exception as e:
logger.error(f"Error updating playlist {title}: {e}")
return False
def get_items_by_paths(self, library_name: str, paths: list[str]) -> list:
"""
Find Plex items (tracks) by their file paths.
"""
self._connect_check()
if not paths:
return []
try:
path_map = self.match_tracks(library_name, paths)
except FileNotFoundError:
logger.info(f"Cache not found for {library_name}, creating it...")
self.cache_lib_tracks(library_name)
path_map = self.match_tracks(library_name, paths)
items = []
for path in paths:
rating_key = path_map.get(path)
if rating_key and rating_key != UNMATCHED_TRACK_RATING_KEY:
try:
item = self.server.fetchItem(rating_key)
items.append(item)
except Exception as e:
logger.warning(f"Failed to fetch item for ratingKey {rating_key}: {e}")
else:
logger.warning(f"Track not found in Plex library (or unmatched): {path}")
return items
plex_client = PlexClient()