import { Queue, Worker, QueueEvents } from "bullmq";
import { logger } from "../server.js";
import { S3Sync, createS3SyncFromEnv, analyzeJobsDirectory } from "./s3Sync.js";

const SCHEDULER_QUEUE_NAME = "scheduledTasksQueue";

/** Upper bound (inclusive) of the random start delay applied to the analytics job, in minutes. */
const ANALYTICS_MAX_JITTER_MINUTES = 90;

// NOTE(review): Redis host/port are hard-coded; consider reading them from configuration.
const connectionOpts = {
  host: "localhost",
  port: 6379,
  enableReadyCheck: true,
  // Ask the Redis client to reconnect when a command fails with READONLY
  // (typically seen when the node we talk to was demoted during a failover).
  reconnectOnError: (err: Error) => err.message.includes("READONLY")
};

/** One repeatable job as reported by `DailyS3Scheduler.getStatus()`. */
export interface ScheduledTaskInfo {
  name: string;
  nextRun: Date | null;
  status: string;
}

/**
 * Schedules recurring tasks (daily S3 sync and jobs-directory analytics) through a
 * BullMQ queue backed by Redis.
 *
 * The repeatable job definitions live in Redis, so when several app instances call
 * `start()`, each scheduled occurrence is processed by only one of their workers.
 */
export class DailyS3Scheduler {
  private s3Sync: S3Sync | null = null;
  private schedulerQueue: Queue | null = null;
  private schedulerWorker: Worker | null = null;
  private queueEvents: QueueEvents | null = null;
  private isInitialized = false;

  constructor() {
    // Presumably null when S3 is not configured in the environment; every user of s3Sync checks for null.
    this.s3Sync = createS3SyncFromEnv();
  }

  /**
   * Start the scheduler: create the queue, queue-events listener and worker, then register
   * the repeatable jobs. Calling it again while already running only logs a warning.
   *
   * @throws Re-throws any error raised while connecting or registering jobs, after closing
   *   whatever resources were already created so a later `start()` begins from a clean state.
   */
  async start(): Promise<void> {
    if (this.isInitialized) {
      logger.warn("Scheduler already initialized");
      return;
    }

    try {
      this.schedulerQueue = new Queue(SCHEDULER_QUEUE_NAME, {
        connection: connectionOpts,
        defaultJobOptions: {
          removeOnComplete: 10, // Keep last 10 completed jobs
          removeOnFail: 50, // Keep last 50 failed jobs
          attempts: 3,
          backoff: {
            type: "exponential",
            delay: 2000
          }
        }
      });

      // NOTE(review): no listeners are attached to queueEvents in this file; it currently only
      // holds an extra Redis connection open.
      this.queueEvents = new QueueEvents(SCHEDULER_QUEUE_NAME, {
        connection: connectionOpts
      });

      this.schedulerWorker = new Worker(
        SCHEDULER_QUEUE_NAME,
        async (job) => {
          logger.info(`[Scheduler] Processing job: ${job.name} (ID: ${job.id})`);

          try {
            if (job.name === "daily-s3-sync") {
              // NOTE(review): the sync call is commented out, so this job currently does no work
              // even though it logs completion. Re-enable once the S3 sync should run again.
              //await this.performDailySync();
              logger.info("[Scheduler] Daily sync job completed");
            } else if (job.name === "daily-analytics") {
              // Random 0-90 minute delay to prevent API flooding at the exact cron time.
              const randomDelayMinutes = Math.floor(Math.random() * (ANALYTICS_MAX_JITTER_MINUTES + 1));
              const randomDelayMs = randomDelayMinutes * 60 * 1000;

              logger.info(
                `[Scheduler] Analytics job will execute in ${randomDelayMinutes} minutes (${new Date(Date.now() + randomDelayMs).toISOString()})`
              );

              // The job stays active during this wait; Worker.close() (used by stop()) waits for
              // active jobs, so a stop issued now can take up to the remaining delay.
              await new Promise<void>((resolve) => setTimeout(resolve, randomDelayMs));
              await this.triggerJobAnalysis();
            } else {
              logger.warn(`[Scheduler] Unknown job name: ${job.name} (ID: ${job.id})`);
            }
          } catch (error) {
            logger.error(`[Scheduler] Job ${job.name} failed:`, error);
            throw error; // Re-throw so BullMQ applies the retry policy (attempts/backoff)
          }
        },
        {
          connection: connectionOpts,
          concurrency: 1, // Process one job at a time
          autorun: true
        }
      );

      this.schedulerWorker.on("completed", (job) => {
        logger.info(`[Scheduler] Job ${job.name} completed successfully (ID: ${job.id})`);
      });

      this.schedulerWorker.on("failed", (job, err) => {
        logger.error(`[Scheduler] Job ${job?.name} failed (ID: ${job?.id}):`, err);
      });

      this.schedulerWorker.on("error", (err) => {
        logger.error("[Scheduler] Worker error:", err);
      });

      // Repeatable (cron-like) jobs. Only one instance across all workers executes
      // each occurrence at the scheduled time.

      // Daily S3 sync at 1 AM Pacific time (PST/PDT).
      await this.schedulerQueue.add(
        "daily-s3-sync",
        {},
        {
          repeat: {
            pattern: "0 1 * * *", // Every day at 1 AM
            tz: "America/Los_Angeles"
          },
          jobId: "daily-s3-sync" // Use consistent ID to prevent duplicates
        }
      );

      // Analytics every day at 9 PM Pacific time (the processor adds a random 0-90 min delay).
      await this.schedulerQueue.add(
        "daily-analytics",
        {},
        {
          repeat: {
            pattern: "0 21 * * *", // Every day at 9 PM
            tz: "America/Los_Angeles"
          },
          jobId: "daily-analytics"
        }
      );

      this.isInitialized = true;
      logger.info(
        `[Scheduler] Daily scheduler started using BullMQ (Worker ${process.env.NODE_APP_INSTANCE || "N/A"})`
      );

      // Log next scheduled runs.
      const status = await this.getStatus();
      for (const task of status.nextRun) {
        logger.info(`[Scheduler] ${task.name} - Next run: ${task.nextRun}`);
      }
    } catch (error) {
      logger.error("[Scheduler] Failed to start scheduler:", error);

      // Release connections opened before the failure so a retry does not leak them.
      // Repeatable jobs are left in place: other instances may be relying on them.
      if (!this.isInitialized) {
        try {
          await this.closeResources(false);
        } catch (cleanupError) {
          logger.error("[Scheduler] Failed to clean up after start failure:", cleanupError);
        }
      }
      throw error;
    }
  }

  /**
   * Stop the scheduler and clean up resources.
   *
   * By default the repeatable job definitions are removed from Redis as well. Because they
   * are shared by every instance using this queue, that also stops scheduling on all other
   * instances; pass `{ removeRepeatableJobs: false }` when shutting down a single instance
   * of a cluster (e.g. during a rolling restart).
   *
   * @param options.removeRepeatableJobs - delete the repeatable job definitions (default `true`).
   * @throws The first error raised while closing resources; every resource is still attempted
   *   and `stop()` may be called again to retry the ones that failed.
   */
  async stop(options: { removeRepeatableJobs?: boolean } = {}): Promise<void> {
    const { removeRepeatableJobs = true } = options;

    try {
      await this.closeResources(removeRepeatableJobs);
    } catch (error) {
      logger.error("[Scheduler] Error stopping scheduler:", error);
      throw error;
    }
  }

  /**
   * Close the worker, the queue-events listener and the queue, in that order, optionally
   * removing the repeatable job definitions before closing the queue.
   *
   * Every step is attempted even if an earlier one fails. A reference is cleared only once
   * its close succeeded, so a failed call can be retried. `isInitialized` is reset only when
   * everything closed cleanly.
   *
   * @throws The first error encountered (further errors are logged).
   */
  private async closeResources(removeRepeatableJobs: boolean): Promise<void> {
    const errors: unknown[] = [];

    const worker = this.schedulerWorker;
    if (worker) {
      try {
        await worker.close();
        this.schedulerWorker = null;
        logger.info("[Scheduler] Worker stopped");
      } catch (error) {
        errors.push(error);
      }
    }

    const events = this.queueEvents;
    if (events) {
      try {
        await events.close();
        this.queueEvents = null;
      } catch (error) {
        errors.push(error);
      }
    }

    const queue = this.schedulerQueue;
    if (queue) {
      let repeatablesRemoved = false;
      if (removeRepeatableJobs) {
        try {
          const repeatableJobs = await queue.getRepeatableJobs();
          for (const job of repeatableJobs) {
            await queue.removeRepeatableByKey(job.key);
          }
          repeatablesRemoved = true;
        } catch (error) {
          errors.push(error);
        }
      }

      // Close the queue even if removing the repeatable jobs failed, so its connection is not leaked.
      try {
        await queue.close();
        this.schedulerQueue = null;
        logger.info(
          repeatablesRemoved ? "[Scheduler] Queue stopped and repeatable jobs removed" : "[Scheduler] Queue stopped"
        );
      } catch (error) {
        errors.push(error);
      }
    }

    if (errors.length > 0) {
      for (const extra of errors.slice(1)) {
        logger.error("[Scheduler] Additional error while stopping scheduler:", extra);
      }
      throw errors[0];
    }

    this.isInitialized = false;
  }

  /**
   * Upload the jobs data to S3 via `S3Sync.syncJobsToS3()` and log the elapsed time.
   * Failures are logged and swallowed (never re-thrown), so they do not trigger a BullMQ retry.
   */
  private async performDailySync(): Promise<void> {
    if (!this.s3Sync) {
      logger.error("S3 sync not available for daily sync");
      return;
    }

    const startTime = new Date();
    logger.info(`Starting daily S3 sync at ${startTime.toISOString()}`);

    try {
      await this.s3Sync.syncJobsToS3();
      const duration = Date.now() - startTime.getTime();
      logger.info(`Daily S3 sync completed successfully in ${duration}ms`);
    } catch (error) {
      logger.error("Daily S3 sync failed:", error);
    }
  }

  /**
   * Manually trigger a sync (useful for testing). Does nothing beyond logging an error
   * when S3 is not configured.
   */
  async triggerManualSync(): Promise<void> {
    if (!this.s3Sync) {
      logger.error("S3 sync not configured");
      return;
    }

    logger.info("Triggering manual S3 sync...");
    await this.performDailySync();
  }

  /**
   * Run `analyzeJobsDirectory()` and log the outcome. Errors are logged and swallowed,
   * so callers (including the scheduled job) never see a rejection from this method.
   */
  async triggerJobAnalysis(): Promise<void> {
    logger.info("Triggering jobs directory analysis...");

    try {
      await analyzeJobsDirectory();
      logger.info("Jobs directory analysis completed");
    } catch (error) {
      logger.error("Jobs directory analysis failed:", error);
      // TODO: alert someone (e.g. send an email) when the analysis fails.
    }
  }

  /**
   * Get the next scheduled run time for all repeatable jobs.
   * Returns an empty list when the queue is not started or Redis cannot be queried.
   */
  private async getNextRunTime(): Promise<ScheduledTaskInfo[]> {
    if (!this.schedulerQueue) {
      return [];
    }

    try {
      const repeatableJobs = await this.schedulerQueue.getRepeatableJobs();
      return repeatableJobs.map((job) => ({
        name: job.name || job.id || "unknown",
        nextRun: job.next ? new Date(job.next) : null,
        status: "scheduled"
      }));
    } catch (error) {
      logger.error("[Scheduler] Failed to get next run times:", error);
      return [];
    }
  }

  /**
   * Get scheduler status: whether S3 is configured, whether the worker is running, the
   * upcoming repeatable-job runs and (when available) the S3 sync statistics.
   * Failures to read sync stats are logged and reported as `syncStats: undefined`.
   */
  async getStatus(): Promise<{
    isConfigured: boolean;
    isRunning: boolean;
    nextRun: ScheduledTaskInfo[];
    syncStats?: {
      bucketName: string;
      region: string;
      keyPrefix: string;
      available: boolean;
    };
  }> {
    let syncStats;
    if (this.s3Sync) {
      try {
        syncStats = await this.s3Sync.getSyncStats();
      } catch (error) {
        logger.error("Failed to get sync stats:", error);
      }
    }

    const nextRun = await this.getNextRunTime();

    return {
      isConfigured: this.s3Sync !== null,
      isRunning: this.isInitialized && this.schedulerWorker !== null,
      nextRun,
      syncStats
    };
  }
}

// Export a singleton instance
export const dailyS3Scheduler = new DailyS3Scheduler();