// DailyS3Scheduler — BullMQ-based daily task scheduler (S3 sync + analytics).
import { Queue, Worker, QueueEvents } from "bullmq";
import { logger } from "../server.js";
import { S3Sync, createS3SyncFromEnv, analyzeJobsDirectory } from "./s3Sync.js";
const SCHEDULER_QUEUE_NAME = "scheduledTasksQueue";
|
|
|
|
const connectionOpts = {
|
|
host: "localhost",
|
|
port: 6379,
|
|
enableReadyCheck: true,
|
|
reconnectOnError: (err: Error) => err.message.includes("READONLY")
|
|
};
|
|
|
|
/**
 * Describes one repeatable (cron-style) task registered on the scheduler queue.
 */
export interface ScheduledTaskInfo {
  /** Job name as registered with the queue (e.g. "daily-s3-sync"). */
  name: string;
  /** Timestamp of the next scheduled run, or null when it cannot be derived. */
  nextRun: Date | null;
  /** Human-readable state; the scheduler currently always reports "scheduled". */
  status: string;
}
export class DailyS3Scheduler {
|
|
private s3Sync: S3Sync | null = null;
|
|
private schedulerQueue: Queue | null = null;
|
|
private schedulerWorker: Worker | null = null;
|
|
private queueEvents: QueueEvents | null = null;
|
|
private isInitialized: boolean = false;
|
|
|
|
constructor() {
|
|
this.s3Sync = createS3SyncFromEnv();
|
|
}
|
|
|
|
/**
|
|
* Start the daily S3 sync scheduler using BullMQ
|
|
* This ensures only one worker executes the job across all cluster instances
|
|
*/
|
|
async start(): Promise<void> {
|
|
if (this.isInitialized) {
|
|
logger.warn("Scheduler already initialized");
|
|
return;
|
|
}
|
|
|
|
try {
|
|
// Initialize the queue
|
|
this.schedulerQueue = new Queue(SCHEDULER_QUEUE_NAME, {
|
|
connection: connectionOpts,
|
|
defaultJobOptions: {
|
|
removeOnComplete: 10, // Keep last 10 completed jobs
|
|
removeOnFail: 50, // Keep last 50 failed jobs
|
|
attempts: 3,
|
|
backoff: { type: "exponential", delay: 2000 }
|
|
}
|
|
});
|
|
|
|
// Initialize queue events for monitoring
|
|
this.queueEvents = new QueueEvents(SCHEDULER_QUEUE_NAME, {
|
|
connection: connectionOpts
|
|
});
|
|
|
|
// Create worker to process scheduled jobs
|
|
this.schedulerWorker = new Worker(
|
|
SCHEDULER_QUEUE_NAME,
|
|
async (job) => {
|
|
logger.info(`[Scheduler] Processing job: ${job.name} (ID: ${job.id})`);
|
|
|
|
try {
|
|
if (job.name === "daily-s3-sync") {
|
|
//await this.performDailySync();
|
|
logger.info("[Scheduler] Daily sync job completed");
|
|
} else if (job.name === "daily-analytics") {
|
|
// Add random delay of 0-90 minutes to prevent API flooding
|
|
const randomDelayMinutes = Math.floor(Math.random() * 91); // 0-90 minutes
|
|
const randomDelayMs = randomDelayMinutes * 60 * 1000;
|
|
|
|
logger.info(
|
|
`[Scheduler] Analytics job will execute in ${randomDelayMinutes} minutes (${new Date(Date.now() + randomDelayMs).toISOString()})`
|
|
);
|
|
|
|
await new Promise((resolve) => setTimeout(resolve, randomDelayMs));
|
|
await this.triggerJobAnalysis();
|
|
}
|
|
} catch (error) {
|
|
logger.error(`[Scheduler] Job ${job.name} failed:`, error);
|
|
throw error; // Re-throw to trigger retry
|
|
}
|
|
},
|
|
{
|
|
connection: connectionOpts,
|
|
concurrency: 1, // Process one job at a time
|
|
autorun: true
|
|
}
|
|
);
|
|
|
|
// Set up worker event handlers
|
|
this.schedulerWorker.on("completed", (job) => {
|
|
logger.info(`[Scheduler] Job ${job.name} completed successfully (ID: ${job.id})`);
|
|
});
|
|
|
|
this.schedulerWorker.on("failed", (job, err) => {
|
|
logger.error(`[Scheduler] Job ${job?.name} failed (ID: ${job?.id}):`, err);
|
|
});
|
|
|
|
this.schedulerWorker.on("error", (err) => {
|
|
logger.error("[Scheduler] Worker error:", err);
|
|
});
|
|
|
|
// Add repeatable jobs (cron-like scheduling)
|
|
// Only one instance across all workers will execute these at the scheduled time
|
|
|
|
// Daily S3 sync at 1 AM PST
|
|
await this.schedulerQueue.add(
|
|
"daily-s3-sync",
|
|
{},
|
|
{
|
|
repeat: {
|
|
pattern: "0 1 * * *", // Every day at 1 AM
|
|
tz: "America/Los_Angeles" // PST/PDT timezone
|
|
},
|
|
jobId: "daily-s3-sync" // Use consistent ID to prevent duplicates
|
|
}
|
|
);
|
|
|
|
// Analytics job - every minute (adjust as needed)
|
|
await this.schedulerQueue.add(
|
|
"daily-analytics",
|
|
{},
|
|
{
|
|
repeat: {
|
|
pattern: "0 21 * * *", //9PM
|
|
tz: "America/Los_Angeles"
|
|
},
|
|
jobId: "daily-analytics"
|
|
}
|
|
);
|
|
|
|
this.isInitialized = true;
|
|
logger.info(
|
|
`[Scheduler] Daily scheduler started using BullMQ (Worker ${process.env.NODE_APP_INSTANCE || "N/A"})`
|
|
);
|
|
|
|
// Log next scheduled runs
|
|
const status = await this.getStatus();
|
|
status.nextRun.forEach((task) => {
|
|
logger.info(`[Scheduler] ${task.name} - Next run: ${task.nextRun}`);
|
|
});
|
|
} catch (error) {
|
|
logger.error("[Scheduler] Failed to start scheduler:", error);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Stop the scheduler and clean up resources
|
|
*/
|
|
async stop(): Promise<void> {
|
|
try {
|
|
if (this.schedulerWorker) {
|
|
await this.schedulerWorker.close();
|
|
this.schedulerWorker = null;
|
|
logger.info("[Scheduler] Worker stopped");
|
|
}
|
|
|
|
if (this.queueEvents) {
|
|
await this.queueEvents.close();
|
|
this.queueEvents = null;
|
|
}
|
|
|
|
if (this.schedulerQueue) {
|
|
// Remove all repeatable jobs
|
|
const repeatableJobs = await this.schedulerQueue.getRepeatableJobs();
|
|
for (const job of repeatableJobs) {
|
|
await this.schedulerQueue.removeRepeatableByKey(job.key);
|
|
}
|
|
await this.schedulerQueue.close();
|
|
this.schedulerQueue = null;
|
|
logger.info("[Scheduler] Queue stopped and repeatable jobs removed");
|
|
}
|
|
|
|
this.isInitialized = false;
|
|
} catch (error) {
|
|
logger.error("[Scheduler] Error stopping scheduler:", error);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Perform the daily sync operation
|
|
*/
|
|
private async performDailySync(): Promise<void> {
|
|
if (!this.s3Sync) {
|
|
logger.error("S3 sync not available for daily sync");
|
|
return;
|
|
}
|
|
|
|
const startTime = new Date();
|
|
logger.info(`Starting daily S3 sync at ${startTime.toISOString()}`);
|
|
|
|
try {
|
|
await this.s3Sync.syncJobsToS3();
|
|
const endTime = new Date();
|
|
const duration = endTime.getTime() - startTime.getTime();
|
|
logger.info(`Daily S3 sync completed successfully in ${duration}ms`);
|
|
} catch (error) {
|
|
logger.error("Daily S3 sync failed:", error);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Manually trigger a sync (useful for testing)
|
|
*/
|
|
async triggerManualSync(): Promise<void> {
|
|
if (!this.s3Sync) {
|
|
logger.error("S3 sync not configured");
|
|
return;
|
|
}
|
|
|
|
logger.info("Triggering manual S3 sync...");
|
|
await this.performDailySync();
|
|
}
|
|
|
|
async triggerJobAnalysis(): Promise<void> {
|
|
logger.info("Triggering jobs directory analysis...");
|
|
try {
|
|
const analysis = await analyzeJobsDirectory();
|
|
logger.info("Jobs directory analysis completed");
|
|
} catch (error) {
|
|
logger.error("Jobs directory analysis failed:", error);
|
|
|
|
//TODO: implement a logger here to send an email or something.
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get the next scheduled run time for all repeatable jobs
|
|
*/
|
|
private async getNextRunTime(): Promise<ScheduledTaskInfo[]> {
|
|
if (!this.schedulerQueue) {
|
|
return [];
|
|
}
|
|
|
|
try {
|
|
const repeatableJobs = await this.schedulerQueue.getRepeatableJobs();
|
|
|
|
return repeatableJobs.map((job) => ({
|
|
name: job.name || job.id || "unknown",
|
|
nextRun: job.next ? new Date(job.next) : null,
|
|
status: "scheduled"
|
|
}));
|
|
} catch (error) {
|
|
logger.error("[Scheduler] Failed to get next run times:", error);
|
|
return [];
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get scheduler status
|
|
*/
|
|
async getStatus(): Promise<{
|
|
isConfigured: boolean;
|
|
isRunning: boolean;
|
|
nextRun: ScheduledTaskInfo[];
|
|
syncStats?: { bucketName: string; region: string; keyPrefix: string; available: boolean };
|
|
}> {
|
|
let syncStats;
|
|
if (this.s3Sync) {
|
|
try {
|
|
syncStats = await this.s3Sync.getSyncStats();
|
|
} catch (error) {
|
|
logger.error("Failed to get sync stats:", error);
|
|
}
|
|
}
|
|
|
|
const nextRun = await this.getNextRunTime();
|
|
|
|
return {
|
|
isConfigured: this.s3Sync !== null,
|
|
isRunning: this.isInitialized && this.schedulerWorker !== null,
|
|
nextRun,
|
|
syncStats
|
|
};
|
|
}
|
|
}
|
|
|
|
// Export a singleton instance so the whole app shares one scheduler
// (and therefore at most one queue/worker pair per process).
export const dailyS3Scheduler = new DailyS3Scheduler();