feat: Enhance scheduler and watcher with improved logging and cron trigger helpers
This commit is contained in:
+88
-45
@@ -1,14 +1,21 @@
|
||||
from typing import Optional
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
from apscheduler.triggers.base import BaseTrigger
|
||||
from app.utils.config import server_config
|
||||
from app.utils.logger import logger
|
||||
from app.utils.watcher import watcher_manager
|
||||
from app.utils.sync_manager import sync_manager
|
||||
import os
|
||||
|
||||
# Initialize the scheduler
|
||||
scheduler = BackgroundScheduler()
|
||||
|
||||
def validate_cron_expression(expression: str) -> bool:
|
||||
"""
|
||||
Validates a cron expression.
|
||||
Expected format: "minute hour day month day_of_week"
|
||||
"""
|
||||
try:
|
||||
parts = expression.split()
|
||||
if len(parts) != 5:
|
||||
@@ -26,92 +33,128 @@ def validate_cron_expression(expression: str) -> bool:
|
||||
return False
|
||||
|
||||
def job_function():
|
||||
"""
|
||||
The function to be executed by the scheduler.
|
||||
Triggers the sync process.
|
||||
"""
|
||||
logger.info("Executing scheduled sync job...")
|
||||
sync_manager.run_sync(trigger_source="scheduler", wait=False)
|
||||
try:
|
||||
sync_manager.run_sync(trigger_source="scheduler", wait=False)
|
||||
except Exception as e:
|
||||
logger.error(f"Error during scheduled sync job: {e}", exc_info=True)
|
||||
|
||||
def start_scheduler():
|
||||
"""
|
||||
Starts the background scheduler if it's not already running.
|
||||
"""
|
||||
if not scheduler.running:
|
||||
scheduler.start()
|
||||
logger.info("Scheduler started.")
|
||||
update_scheduler_job()
|
||||
|
||||
def _create_cron_trigger(cron_exp: str) -> Optional[CronTrigger]:
|
||||
"""Helper to create a CronTrigger from a cron expression string."""
|
||||
try:
|
||||
# 5 parts: minute hour day month day_of_week
|
||||
parts = cron_exp.split()
|
||||
if len(parts) == 5:
|
||||
return CronTrigger(
|
||||
minute=parts[0],
|
||||
hour=parts[1],
|
||||
day=parts[2],
|
||||
month=parts[3],
|
||||
day_of_week=parts[4]
|
||||
)
|
||||
else:
|
||||
logger.error(f"Invalid cron expression format (needs 5 parts): {cron_exp}")
|
||||
except Exception as e:
|
||||
logger.error(f"Invalid cron expression: {cron_exp}, error: {e}")
|
||||
return None
|
||||
|
||||
def _create_daily_trigger(time_str: str) -> Optional[CronTrigger]:
|
||||
"""Helper to create a CronTrigger for daily execution at a specific time."""
|
||||
try:
|
||||
hour, minute = map(int, time_str.split(':'))
|
||||
return CronTrigger(hour=hour, minute=minute)
|
||||
except ValueError:
|
||||
logger.error(f"Invalid daily time format: {time_str}")
|
||||
return None
|
||||
|
||||
def _create_weekly_trigger(days: list[int], time_str: str) -> Optional[CronTrigger]:
|
||||
"""
|
||||
Helper to create a CronTrigger for weekly execution.
|
||||
days: List of integers 0-6 where 0 is Sunday, 1 is Monday, ..., 6 is Saturday.
|
||||
APScheduler expects: 0 = Monday, ..., 6 = Sunday.
|
||||
"""
|
||||
# Convert Frontend days (0=Sun...6=Sat) to APScheduler days (0=Mon...6=Sun)
|
||||
aps_days = []
|
||||
for d in days:
|
||||
if d == 0:
|
||||
aps_days.append(6) # Sunday
|
||||
else:
|
||||
aps_days.append(d - 1) # Mon(1)->0, ..., Sat(6)->5
|
||||
|
||||
days_str = ",".join(map(str, aps_days))
|
||||
|
||||
try:
|
||||
hour, minute = map(int, time_str.split(':'))
|
||||
return CronTrigger(day_of_week=days_str, hour=hour, minute=minute)
|
||||
except ValueError:
|
||||
logger.error(f"Invalid weekly time format: {time_str}")
|
||||
return None
|
||||
|
||||
def update_scheduler_job():
|
||||
"""
|
||||
Updates the scheduler jobs based on the current configuration.
|
||||
Reloads configuration, handles auto-watch, and sets up the sync job trigger.
|
||||
"""
|
||||
scheduler.remove_all_jobs()
|
||||
|
||||
# Reload config to get latest schedule settings
|
||||
server_config.load()
|
||||
logger.info("Configuration reloaded for scheduler update.")
|
||||
|
||||
# Handle Auto Watch
|
||||
if server_config.schedule_auto_watch:
|
||||
# Ensure we have an absolute path
|
||||
local_path = os.path.abspath(server_config.local_path)
|
||||
watcher_manager.start(local_path)
|
||||
logger.info(f"Auto-watch started for path: {local_path}")
|
||||
else:
|
||||
watcher_manager.stop()
|
||||
logger.info("Auto-watch stopped.")
|
||||
|
||||
mode = server_config.schedule_mode
|
||||
logger.info(f"Updating scheduler with mode: {mode}")
|
||||
|
||||
if mode == "DISABLED":
|
||||
logger.info("Schedule is disabled.")
|
||||
logger.info("Schedule is disabled. No jobs added.")
|
||||
return
|
||||
|
||||
trigger = None
|
||||
trigger: Optional[BaseTrigger] = None
|
||||
|
||||
if mode == "CRON":
|
||||
cron_exp = server_config.schedule_cron
|
||||
if cron_exp:
|
||||
try:
|
||||
# 5 parts: minute hour day month day_of_week
|
||||
parts = cron_exp.split()
|
||||
if len(parts) == 5:
|
||||
trigger = CronTrigger(
|
||||
minute=parts[0],
|
||||
hour=parts[1],
|
||||
day=parts[2],
|
||||
month=parts[3],
|
||||
day_of_week=parts[4]
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Invalid cron expression: {cron_exp}, error: {e}")
|
||||
trigger = _create_cron_trigger(server_config.schedule_cron)
|
||||
|
||||
elif mode == "DAILY":
|
||||
time_str = server_config.schedule_daily_time
|
||||
try:
|
||||
hour, minute = map(int, time_str.split(':'))
|
||||
trigger = CronTrigger(hour=hour, minute=minute)
|
||||
except ValueError:
|
||||
logger.error(f"Invalid daily time: {time_str}")
|
||||
trigger = _create_daily_trigger(server_config.schedule_daily_time)
|
||||
|
||||
elif mode == "WEEKLY":
|
||||
days = server_config.schedule_weekly_days # list of ints 0-6 (Sun-Sat)
|
||||
time_str = server_config.schedule_weekly_time
|
||||
|
||||
# Frontend: 0(Sun), 1(Mon)... 6(Sat)
|
||||
# APScheduler: 0(Mon)... 6(Sun)
|
||||
|
||||
aps_days = []
|
||||
for d in days:
|
||||
if d == 0: aps_days.append(6)
|
||||
else: aps_days.append(d - 1)
|
||||
|
||||
days_str = ",".join(map(str, aps_days))
|
||||
|
||||
try:
|
||||
hour, minute = map(int, time_str.split(':'))
|
||||
trigger = CronTrigger(day_of_week=days_str, hour=hour, minute=minute)
|
||||
except ValueError:
|
||||
logger.error(f"Invalid weekly time: {time_str}")
|
||||
trigger = _create_weekly_trigger(server_config.schedule_weekly_days, server_config.schedule_weekly_time)
|
||||
|
||||
if trigger:
|
||||
scheduler.add_job(job_function, trigger)
|
||||
logger.info(f"Added scheduled job with mode {mode} and trigger {trigger}")
|
||||
logger.info(f"Added scheduled job with mode '{mode}' and trigger: {trigger}")
|
||||
else:
|
||||
logger.warning(f"Failed to create trigger for mode {mode}")
|
||||
logger.warning(f"Failed to create trigger for mode '{mode}'. No job added.")
|
||||
|
||||
def get_next_run_time():
|
||||
"""
|
||||
Returns the next run time of the scheduled job, if any.
|
||||
"""
|
||||
jobs = scheduler.get_jobs()
|
||||
if not jobs:
|
||||
return None
|
||||
# Assuming only one job
|
||||
# Assuming only one job is scheduled for sync
|
||||
job = jobs[0]
|
||||
return job.next_run_time
|
||||
|
||||
+49
-25
@@ -1,19 +1,23 @@
|
||||
import os
|
||||
import threading
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
from watchdog.observers.polling import PollingObserver as Observer
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
from watchdog.events import FileSystemEventHandler, FileSystemEvent
|
||||
from app.utils.logger import logger
|
||||
from app.utils.config import server_config
|
||||
from app.utils.sync_manager import sync_manager
|
||||
|
||||
class PlaylistEventHandler(FileSystemEventHandler):
|
||||
"""
|
||||
Handles file system events for the playlist directory.
|
||||
Triggers a sync operation when changes are detected, with debouncing.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.debounce_timer = None
|
||||
self.debounce_timer: Optional[threading.Timer] = None
|
||||
self.debounce_interval = 5.0 # Seconds
|
||||
|
||||
def on_any_event(self, event):
|
||||
# Log all events for debugging (using INFO temporarily to ensure visibility)
|
||||
logger.info(f"[WATCHER-DEBUG] Event detected: {event.event_type} {event.src_path}")
|
||||
def on_any_event(self, event: FileSystemEvent):
|
||||
# Log all events at DEBUG level to avoid cluttering INFO logs
|
||||
logger.debug(f"[Watcher] Event detected: {event.event_type} {event.src_path}")
|
||||
|
||||
if event.is_directory:
|
||||
return
|
||||
@@ -23,55 +27,70 @@ class PlaylistEventHandler(FileSystemEventHandler):
|
||||
if event.event_type not in ['created', 'modified', 'deleted', 'moved']:
|
||||
return
|
||||
|
||||
# Ignore temporary files or hidden files if necessary
|
||||
# Ignore temporary files or hidden files
|
||||
filename = os.path.basename(event.src_path)
|
||||
if filename.startswith('.'):
|
||||
return
|
||||
|
||||
# Prevent feedback loops: if sync is in progress, ignore events (likely caused by the sync itself)
|
||||
# Prevent feedback loops: if sync is in progress, ignore events
|
||||
if sync_manager.is_syncing:
|
||||
logger.info(f"[WATCHER-DEBUG] Ignoring event {event.event_type} on {event.src_path} because sync is in progress.")
|
||||
logger.debug(f"[Watcher] Ignoring event {event.event_type} on {event.src_path} because sync is in progress.")
|
||||
return
|
||||
|
||||
logger.info(f"File system event detected and accepted: {event.event_type} {event.src_path}")
|
||||
logger.info(f"[Watcher] Accepted file change: {event.event_type} {event.src_path}")
|
||||
self.trigger_sync()
|
||||
|
||||
def trigger_sync(self):
|
||||
"""
|
||||
Triggers the sync process after a debounce interval.
|
||||
"""
|
||||
if self.debounce_timer:
|
||||
self.debounce_timer.cancel()
|
||||
|
||||
# Debounce for 5 seconds to allow multiple file operations to complete
|
||||
self.debounce_timer = threading.Timer(5.0, self.run_sync)
|
||||
logger.debug(f"[Watcher] Debouncing sync for {self.debounce_interval} seconds...")
|
||||
self.debounce_timer = threading.Timer(self.debounce_interval, self.run_sync)
|
||||
self.debounce_timer.start()
|
||||
|
||||
def run_sync(self):
|
||||
logger.info("Triggering sync due to file change...")
|
||||
sync_manager.run_sync(trigger_source="watcher", wait=False)
|
||||
"""
|
||||
Executes the sync via SyncManager.
|
||||
"""
|
||||
logger.info("[Watcher] Debounce timer expired. Triggering sync due to file changes.")
|
||||
try:
|
||||
sync_manager.run_sync(trigger_source="watcher", wait=False)
|
||||
except Exception as e:
|
||||
logger.error(f"[Watcher] Failed to trigger sync: {e}", exc_info=True)
|
||||
|
||||
class WatcherManager:
|
||||
"""
|
||||
Manages the lifecycle of the file watcher.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.observer = None
|
||||
self.handler = None
|
||||
self.current_path = None
|
||||
self.observer: Optional[Observer] = None
|
||||
self.handler: Optional[PlaylistEventHandler] = None
|
||||
self.current_path: Optional[str] = None
|
||||
|
||||
def start(self, path):
|
||||
def start(self, path: str):
|
||||
"""
|
||||
Starts watching the specified directory.
|
||||
"""
|
||||
# If already watching the same path, do nothing
|
||||
if self.observer and self.observer.is_alive() and self.current_path == path:
|
||||
logger.info(f"Watcher already running on {path}")
|
||||
logger.info(f"[Watcher] Already running on {path}")
|
||||
return
|
||||
|
||||
self.stop()
|
||||
|
||||
if not os.path.exists(path):
|
||||
logger.warning(f"Cannot watch path {path}: Directory does not exist.")
|
||||
logger.warning(f"[Watcher] Cannot watch path {path}: Directory does not exist.")
|
||||
return
|
||||
|
||||
logger.info(f"Starting file watcher on: {path}")
|
||||
logger.info(f"[Watcher] Starting file watcher on: {path}")
|
||||
try:
|
||||
files = os.listdir(path)
|
||||
logger.info(f"Files currently in watch directory: {files}")
|
||||
logger.debug(f"[Watcher] Initial files in watch directory: {files}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to list files in watch directory: {e}")
|
||||
logger.error(f"[Watcher] Failed to list files in watch directory: {e}")
|
||||
|
||||
self.handler = PlaylistEventHandler()
|
||||
# Explicitly set timeout for PollingObserver
|
||||
@@ -79,13 +98,18 @@ class WatcherManager:
|
||||
self.observer.schedule(self.handler, path, recursive=True)
|
||||
self.observer.start()
|
||||
self.current_path = path
|
||||
logger.info("[Watcher] Watcher started successfully.")
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Stops the file watcher.
|
||||
"""
|
||||
if self.observer:
|
||||
logger.info("Stopping file watcher...")
|
||||
logger.info("[Watcher] Stopping file watcher...")
|
||||
self.observer.stop()
|
||||
self.observer.join()
|
||||
self.observer = None
|
||||
self.current_path = None
|
||||
logger.info("[Watcher] Watcher stopped.")
|
||||
|
||||
watcher_manager = WatcherManager()
|
||||
|
||||
Reference in New Issue
Block a user