from typing import Optional from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.base import BaseTrigger from app.utils.config import server_config from app.utils.logger import logger from app.utils.watcher import watcher_manager from app.utils.sync_manager import sync_manager import os # Initialize the scheduler scheduler = BackgroundScheduler() def validate_cron_expression(expression: str) -> bool: """ Validates a cron expression. Expected format: "minute hour day month day_of_week" """ try: parts = expression.split() if len(parts) != 5: return False # Try to create a trigger to validate CronTrigger( minute=parts[0], hour=parts[1], day=parts[2], month=parts[3], day_of_week=parts[4] ) return True except Exception: return False def job_function(): """ The function to be executed by the scheduler. Triggers the sync process. """ logger.info("Executing scheduled sync job...") try: sync_manager.run_sync(trigger_source="scheduler", wait=False) except Exception as e: logger.error(f"Error during scheduled sync job: {e}", exc_info=True) def start_scheduler(): """ Starts the background scheduler if it's not already running. """ if not scheduler.running: scheduler.start() logger.info("Scheduler started.") update_scheduler_job() def _create_cron_trigger(cron_exp: str) -> Optional[CronTrigger]: """Helper to create a CronTrigger from a cron expression string.""" try: # 5 parts: minute hour day month day_of_week parts = cron_exp.split() if len(parts) == 5: return CronTrigger( minute=parts[0], hour=parts[1], day=parts[2], month=parts[3], day_of_week=parts[4] ) else: logger.error(f"Invalid cron expression format (needs 5 parts): {cron_exp}") except Exception as e: logger.error(f"Invalid cron expression: {cron_exp}, error: {e}") return None def _create_daily_trigger(time_str: str) -> Optional[CronTrigger]: """Helper to create a CronTrigger for daily execution at a specific time.""" try: hour, minute = map(int, time_str.split(':')) return CronTrigger(hour=hour, minute=minute) except ValueError: logger.error(f"Invalid daily time format: {time_str}") return None def _create_weekly_trigger(days: list[int], time_str: str) -> Optional[CronTrigger]: """ Helper to create a CronTrigger for weekly execution. days: List of integers 0-6 where 0 is Sunday, 1 is Monday, ..., 6 is Saturday. APScheduler expects: 0 = Monday, ..., 6 = Sunday. """ # Convert Frontend days (0=Sun...6=Sat) to APScheduler days (0=Mon...6=Sun) aps_days = [] for d in days: if d == 0: aps_days.append(6) # Sunday else: aps_days.append(d - 1) # Mon(1)->0, ..., Sat(6)->5 days_str = ",".join(map(str, aps_days)) try: hour, minute = map(int, time_str.split(':')) return CronTrigger(day_of_week=days_str, hour=hour, minute=minute) except ValueError: logger.error(f"Invalid weekly time format: {time_str}") return None def update_scheduler_job(): """ Updates the scheduler jobs based on the current configuration. Reloads configuration, handles auto-watch, and sets up the sync job trigger. """ scheduler.remove_all_jobs() # Reload config to get latest schedule settings server_config.load() logger.info("Configuration reloaded for scheduler update.") # Handle Auto Watch if server_config.schedule_auto_watch: # Ensure we have an absolute path local_path = os.path.abspath(server_config.local_path) watcher_manager.start(local_path) logger.info(f"Auto-watch started for path: {local_path}") else: watcher_manager.stop() logger.info("Auto-watch stopped.") mode = server_config.schedule_mode logger.info(f"Updating scheduler with mode: {mode}") if mode == "DISABLED": logger.info("Schedule is disabled. No jobs added.") return trigger: Optional[BaseTrigger] = None if mode == "CRON": trigger = _create_cron_trigger(server_config.schedule_cron) elif mode == "DAILY": trigger = _create_daily_trigger(server_config.schedule_daily_time) elif mode == "WEEKLY": trigger = _create_weekly_trigger(server_config.schedule_weekly_days, server_config.schedule_weekly_time) if trigger: scheduler.add_job(job_function, trigger) logger.info(f"Added scheduled job with mode '{mode}' and trigger: {trigger}") else: logger.warning(f"Failed to create trigger for mode '{mode}'. No job added.") def get_next_run_time(): """ Returns the next run time of the scheduled job, if any. """ jobs = scheduler.get_jobs() if not jobs: return None # Assuming only one job is scheduled for sync job = jobs[0] return job.next_run_time