93cc72d612
cache plex tracks.
300 lines
12 KiB
Python
300 lines
12 KiB
Python
import os
|
|
import pickle
|
|
from plexapi.myplex import MyPlexAccount
|
|
from plexapi.server import PlexServer
|
|
from plexapi.library import MusicSection
|
|
from urllib.parse import urlparse
|
|
from app.utils.common import str_is_empty
|
|
from app.utils.logger import logger
|
|
|
|
# Value compared against a library section's ``type`` to accept only music libraries.
MUSIC_LIBRARY_TYPE = "artist"

# Placeholder stored for local tracks that have no entry in the track cache.
UNMATCHED_TRACK_RATING_KEY = "unmatched_track"

# Number of leading characters of the server / library identifiers used
# when naming cache directories and cache files.
CACHE_DIR_ID_LENGTH = 6

# Absolute path of the "cache" directory that sits one level above this
# module's own directory.
CACHE_TRACKS_BASE_DIR = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, "cache")
)
|
|
|
|
|
|
def build_plex_url(scheme, url, port="32400"):
    """Build the base URL (``scheme://host[:port]``) of a Plex server.

    Only the scheme and network location of *url* are kept; any path,
    query, or fragment is discarded.

    Args:
        scheme: Scheme prepended when *url* does not already start with
            ``http://`` or ``https://`` (matched case-insensitively).
        url: Host name, IP address, or full URL of the server. Surrounding
            whitespace is ignored.
        port: Port appended when *url* carries no explicit port. A falsy
            value (e.g. ``""``) leaves the port off.

    Returns:
        str: The base URL.
    """
    full_url = url.strip()
    # Prepend the scheme only when the URL does not already carry one.
    # The comparison is case-insensitive so "HTTP://host" is not prefixed twice.
    if not full_url.lower().startswith(("http://", "https://")):
        full_url = f"{scheme}://{full_url}"
    parsed = urlparse(full_url)

    if parsed.scheme not in ("http", "https"):
        # Unsupported scheme supplied by the caller: fall back to plain
        # HTTP on the raw input, as before.
        return f"http://{url}:{port}"

    netloc = parsed.netloc or parsed.path
    # Look for an explicit port only after any "user:password@" prefix and
    # after the closing bracket of an IPv6 literal; the colons inside those
    # parts are not port separators.
    host_part = netloc.rsplit("@", 1)[-1]
    has_explicit_port = ":" in host_part.rsplit("]", 1)[-1]
    if port and not has_explicit_port:
        netloc = f"{netloc}:{port}"
    return f"{parsed.scheme}://{netloc}"
|
|
|
|
|
|
class PlexClient:
    """A client for interacting with a Plex server.

    Holds one server connection together with the token and base URL used
    to open it, and offers helpers to list libraries, fetch music
    libraries, playlists and tracks, and to cache / look up track rating
    keys on disk.
    """

    def __init__(self) -> None:
        # All connection state starts empty; connect() fills it in.
        self.server: PlexServer | None = None
        self.token: str | None = None
        self.base_url: str | None = None
        self.connected = False

    def connect(
        self,
        username: str = "",
        password: str = "",
        token: str = "",
        scheme: str = "https",
        url: str = "",
        port: str = "32400",
    ) -> tuple[PlexServer, str]:
        """Connect to the Plex server using a token or username/password.

        The token is used when it is non-empty; otherwise the
        username/password pair is used.

        Parameters:
            username (str): Plex username.
            password (str): Plex password.
            token (str): Plex authentication token.
            scheme (str): URL scheme (http or https).
            url (str): Plex server URL.
            port (str): Plex server port, default is 32400.

        Returns:
            tuple[PlexServer, str]: The connected server instance and the
            authentication token used for the connection.

        Raises:
            Exception: Whatever the underlying connection attempt raised;
                it is logged, ``connected`` is set to False, and the
                exception is re-raised unchanged.
        """
        try:
            if not str_is_empty(token):
                self.server, self.token = self._connect_with_token(
                    token, scheme, url, port
                )
            else:
                self.server, self.token = self._connect_with_pw(
                    username, password, scheme, url, port
                )
            # Update the base URL and connection status
            self.base_url = build_plex_url(scheme, url, port)
            self.connected = True
            # The token is a credential, so it is deliberately kept out of the log.
            logger.info(f"Connected to Plex server at {self.base_url}")
            return self.server, self.token
        except Exception as e:
            logger.warning(f"Failed to connect to Plex server: {str(e)}")
            self.connected = False
            raise

    def _connect_with_pw(
        self, username: str, password: str, scheme: str, url: str, port: str = "32400"
    ):
        """Sign in with username/password and return ``(server, token)``.

        Also stores the base URL, token and server on the instance.
        """
        # Build the server URL
        self.base_url = build_plex_url(scheme, url, port)
        # Sign in to the Plex account
        account = MyPlexAccount(username, password)
        # Obtain the authentication token from the account
        self.token = account.authenticationToken

        self.server = PlexServer(self.base_url, self.token)
        # Log the user name only; the token is a credential.
        logger.debug(f"Connected to Plex server with username: {username}")
        return self.server, self.token

    def _connect_with_token(
        self, token: str, scheme: str, url: str, port: str = "32400"
    ):
        """Connect with an existing token and return ``(server, token)``.

        Also stores the base URL and server on the instance.
        """
        # Build the server URL
        self.base_url = build_plex_url(scheme, url, port)

        self.server = PlexServer(self.base_url, token)
        # The token value itself is not logged because it is a credential.
        logger.debug("Connected to Plex server with token authentication.")
        return self.server, token

    def _connect_check(self):
        """Raise ``RuntimeError`` unless the client is connected to a server."""
        if not self.connected or not self.server:
            logger.error("Plex client is not connected.")
            raise RuntimeError("Plex client is not connected.")

    def _cache_location(self, library_name: str) -> tuple[str, str]:
        """Return ``(cache_dir, cache_path)`` for a library's track cache.

        The directory is named after the first CACHE_DIR_ID_LENGTH
        characters of the server's machine identifier and the file after
        the first CACHE_DIR_ID_LENGTH characters of the library uuid.
        """
        server_id = self.server.machineIdentifier[:CACHE_DIR_ID_LENGTH]
        lib_id = self.get_lib(library_name).uuid[:CACHE_DIR_ID_LENGTH]
        cache_dir = os.path.join(CACHE_TRACKS_BASE_DIR, server_id)
        return cache_dir, os.path.join(cache_dir, f"{lib_id}.pkl")

    def get_server(self) -> PlexServer | None:
        """Return the connected Plex server instance.

        Returns:
            PlexServer: A connected PlexServer instance.

        Raises:
            RuntimeError: If the client is not connected.
        """
        self._connect_check()
        return self.server

    def get_libs_name_list(self) -> list[str]:
        """Return a list of library names from the Plex server.

        Returns:
            list[str]: A list of all library names.

        Raises:
            RuntimeError: If the client is not connected or the names
                cannot be fetched.
        """
        self._connect_check()
        try:
            libraries = self.server.library.sections()
            lib_names = [lib.title for lib in libraries]
            logger.info(f"Fetched {len(lib_names)} library names from the server.")
            return lib_names
        except Exception as e:
            logger.warning(f"Failed to fetch library names: {str(e)}")
            raise RuntimeError(f"Failed to fetch library names: {str(e)}") from e

    def get_lib(self, library_name: str) -> MusicSection | None:
        """Return a specific music library from the Plex server.

        Args:
            library_name (str): Name of the library to fetch.

        Returns:
            MusicSection: A MusicSection representing the specified library.

        Raises:
            RuntimeError: If the client is not connected, the library
                cannot be fetched, or it is not a music library.
        """
        self._connect_check()
        try:
            lib = self.server.library.section(library_name)
            if not lib:
                logger.error(f"Library '{library_name}' not found.")
                raise ValueError(f"Library '{library_name}' not found.")
            elif lib.type != MUSIC_LIBRARY_TYPE:
                logger.error(
                    f"Library '{library_name}' is not a music library. Only music libraries are supported."
                )
                raise ValueError(
                    f"Library '{library_name}' is not a music library. Only music libraries are supported."
                )
            logger.debug(f"Fetched library '{library_name}' from the server.")
            return lib
        except Exception as e:
            # Validation errors raised above are wrapped here as well, so
            # callers only ever see RuntimeError from this method.
            logger.warning(f"Failed to fetch library '{library_name}': {str(e)}")
            raise RuntimeError(
                f"Failed to fetch library '{library_name}': {str(e)}"
            ) from e

    def get_lib_playlists(self, library_name: str) -> list | None:
        """Return all playlists of a library.

        Args:
            library_name (str): Name of the library to fetch playlists from.

        Returns:
            list: A list of all playlists in the specified library.

        Raises:
            RuntimeError: If the client is not connected, the library
                cannot be fetched, or the playlists cannot be fetched.
        """
        self._connect_check()
        lib = self.get_lib(library_name)
        # Fetch playlists from the library
        try:
            playlists = lib.playlists()
            logger.info(f"Fetched {len(playlists)} playlists from the server.")
            return playlists
        except Exception as e:
            logger.warning(f"Failed to fetch playlists: {str(e)}")
            raise RuntimeError(f"Failed to fetch playlists: {str(e)}") from e

    def get_lib_tracks(self, library_name: str) -> list:
        """Fetch all tracks of a library from the Plex server.

        Args:
            library_name (str): Name of the library to fetch tracks from.

        Returns:
            list: A list of all tracks in the specified library.

        Raises:
            RuntimeError: If the client is not connected, the library
                cannot be fetched, or the tracks cannot be fetched.
        """
        self._connect_check()
        lib = self.get_lib(library_name)
        try:
            tracks = lib.all(libtype="track")
            logger.info(f"Fetched {len(tracks)} tracks from library '{library_name}'.")
            return tracks
        except Exception as e:
            logger.warning(
                f"Failed to fetch tracks from library '{library_name}': {str(e)}"
            )
            raise RuntimeError(
                f"Failed to fetch tracks from library '{library_name}': {str(e)}"
            ) from e

    def cache_lib_tracks(self, library_name: str) -> dict:
        """Cache all tracks of a library to a local pickle file.

        All tracks are cached as a mapping of file paths to rating keys,
        like: {file_path: ratingKey, ...}. The path is taken from the first
        part of the first media entry of each track; tracks that have no
        media or no parts are skipped instead of aborting the whole run.

        Args:
            library_name (str): Name of the library to cache tracks from.

        Returns:
            dict: A mapping of file paths to rating keys.

        Raises:
            RuntimeError: If fetching the tracks or writing the cache fails.
        """
        try:
            all_tracks = self.get_lib_tracks(library_name)
            dir_2_ratingKey = {}
            skipped = 0
            for track in all_tracks:
                if not track.media or not track.media[0].parts:
                    skipped += 1
                    continue
                dir_2_ratingKey[track.media[0].parts[0].file] = track.ratingKey
            if skipped:
                logger.warning(
                    f"Skipped {skipped} tracks without media parts in library '{library_name}'."
                )
            cache_dir, cache_path = self._cache_location(library_name)
            # Ensure the cache directory exists
            os.makedirs(cache_dir, exist_ok=True)
            with open(cache_path, "wb") as f:
                pickle.dump(dir_2_ratingKey, f, protocol=pickle.HIGHEST_PROTOCOL)
            logger.info(
                f"Cached {len(dir_2_ratingKey)} tracks from library '{library_name}' to {cache_dir}."
            )
            return dir_2_ratingKey
        except Exception as e:
            logger.warning(f"Failed to cache tracks from library '{library_name}': {str(e)}")
            raise RuntimeError(
                f"Failed to cache tracks from library '{library_name}': {str(e)}"
            ) from e

    def load_cached_tracks(self, library_name: str) -> dict:
        """Load cached tracks from the local cache file.

        Cached tracks will be loaded as a mapping of file paths to rating
        keys, like: {file_path: ratingKey, ...}

        Args:
            library_name (str): Name of the library to load cached tracks from.

        Returns:
            dict: A mapping of file paths to rating keys.

        Raises:
            RuntimeError: If the client is not connected or the library
                cannot be fetched.
            FileNotFoundError: If the cache directory or cache file is missing.
        """
        self._connect_check()
        cache_dir, cache_path = self._cache_location(library_name)
        # check if cache directory exists
        if not os.path.exists(cache_dir):
            logger.warning(f"Cache directory {cache_dir} does not exist.")
            raise FileNotFoundError(f"Cache directory {cache_dir} does not exist. Server may not have cached tracks.")
        # check if cache file exists
        if not os.path.exists(cache_path):
            logger.warning(f"Cache file {cache_path} does not exist.")
            raise FileNotFoundError(f"Cache file {cache_path} does not exist. Library {library_name} may not have cached tracks.")
        # NOTE(security): pickle.load can execute arbitrary code from the
        # file. This is only acceptable while the cache directory is
        # writable solely by this application — confirm for the deployment.
        with open(cache_path, "rb") as f:
            return pickle.load(f)

    def match_tracks(self, library_name: str, local_tracks: list) -> dict:
        """Match local tracks with Plex server tracks ratingKey.

        Will return a mapping of local file paths to Plex rating keys,
        like: {local_file_path: plex_ratingKey, ...}
        Unmatched tracks are assigned a special rating key,
        like: {local_file_path: 'unmatched_track'}

        Args:
            library_name (str): Name of the library to match tracks from.
            local_tracks (list): List of local track file paths to match.

        Returns:
            dict: A mapping of local file paths to Plex rating keys.

        Raises:
            RuntimeError: If the client is not connected or the library
                cannot be fetched.
            FileNotFoundError: If the library has no track cache on disk.
        """
        self._connect_check()
        cached_tracks = self.load_cached_tracks(library_name)
        local_2_plex = {
            track: cached_tracks.get(track, UNMATCHED_TRACK_RATING_KEY)
            for track in local_tracks
        }
        # Counted per input entry (duplicates included) so the figure stays
        # comparable with len(local_tracks) in the log line below.
        matched_count = sum(1 for track in local_tracks if track in cached_tracks)
        logger.info(
            f"Matched {matched_count}/{len(local_tracks)} local tracks with Plex server tracks in library '{library_name}'."
        )
        return local_2_plex
|
|
|
|
# Module-level client instance created at import time. It starts
# disconnected (PlexClient.__init__ sets connected to False) and must be
# connected via connect() before any server call.
plex_client = PlexClient()
|