diff --git a/server.ts b/server.ts index 7fcd360..b5d6c39 100644 --- a/server.ts +++ b/server.ts @@ -191,16 +191,16 @@ app.get("/sync/status", ValidateImsToken, async (req, res) => { }); // Manual S3 sync trigger endpoint (for testing) -app.post("/sync/trigger", ValidateImsToken, async (req, res) => { - try { - await dailyS3Scheduler.triggerManualSync(); - res.json({ success: true, message: "Manual sync triggered successfully" }); - } catch (error) { - logger.error("Manua--l sync failed:", error); - const errorMessage = error instanceof Error ? error.message : "Unknown error"; - res.status(500).json({ success: false, message: "Manual sync failed", error: errorMessage }); - } -}); +// app.post("/sync/trigger", ValidateImsToken, async (req, res) => { +// try { +// await dailyS3Scheduler.triggerManualSync(); +// res.json({ success: true, message: "Manual sync triggered successfully" }); +// } catch (error) { +// logger.error("Manua--l sync failed:", error); +// const errorMessage = error instanceof Error ? error.message : "Unknown error"; +// res.status(500).json({ success: false, message: "Manual sync failed", error: errorMessage }); +// } +// }); // Jobs directory analysis endpoint app.get("/jobs/analysis", ValidateImsToken, async (req, res) => { @@ -220,9 +220,14 @@ app.use(FolderPaths.StaticPath, express.static(FolderPaths.Root, { etag: false, app.use("/assets", express.static("/assets", { etag: false, maxAge: 30 * 1000 })); // Start the daily S3 sync scheduler -dailyS3Scheduler.start().catch((error) => { - logger.error("Failed to start sync scheduler:", error); -}); +dailyS3Scheduler + .start() + .then(() => { + logger.info("Sync scheduler started successfully."); + }) + .catch((error) => { + logger.error("Failed to start sync scheduler:", error); + }); app.listen(port, () => { logger.info(`ImEX Media Server is running at http://localhost:${port}`); diff --git a/util/dailyS3Scheduler.ts b/util/dailyS3Scheduler.ts index 206e1d0..33fbe20 100644 --- a/util/dailyS3Scheduler.ts +++ b/util/dailyS3Scheduler.ts @@ -1,60 +1,172 @@ -import * as cron from "node-cron"; +import { Queue, Worker, QueueEvents } from "bullmq"; import { logger } from "../server.js"; -import { S3Sync, createS3SyncFromEnv } from "./s3Sync.js"; +import { S3Sync, createS3SyncFromEnv, analyzeJobsDirectory } from "./s3Sync.js"; + +const SCHEDULER_QUEUE_NAME = "scheduledTasksQueue"; + +const connectionOpts = { + host: "localhost", + port: 6379, + enableReadyCheck: true, + reconnectOnError: (err: Error) => err.message.includes("READONLY") +}; + +export interface ScheduledTaskInfo { + name: string; + nextRun: Date | null; + status: string; +} export class DailyS3Scheduler { private s3Sync: S3Sync | null = null; - private cronJob: cron.ScheduledTask | null = null; + private schedulerQueue: Queue | null = null; + private schedulerWorker: Worker | null = null; + private queueEvents: QueueEvents | null = null; + private isInitialized: boolean = false; constructor() { this.s3Sync = createS3SyncFromEnv(); } /** - * Start the daily S3 sync scheduler - * Runs at midnight PST (00:00 PST = 08:00 UTC during standard time, 07:00 UTC during daylight time) + * Start the daily S3 sync scheduler using BullMQ + * This ensures only one worker executes the job across all cluster instances */ async start(): Promise { - if (!this.s3Sync) { - logger.warn("S3 sync not configured. Skipping scheduler setup."); + if (this.isInitialized) { + logger.warn("Scheduler already initialized"); return; } - // Test S3 connection before starting scheduler - // const connectionTest = await this.s3Sync.testConnection(); - // if (!connectionTest) { - // logger.error("S3 connection test failed. S3 sync scheduler will not be started."); - // return; - // } + try { + // Initialize the queue + this.schedulerQueue = new Queue(SCHEDULER_QUEUE_NAME, { + connection: connectionOpts, + defaultJobOptions: { + removeOnComplete: 10, // Keep last 10 completed jobs + removeOnFail: 50, // Keep last 50 failed jobs + attempts: 3, + backoff: { type: "exponential", delay: 2000 } + } + }); - // Cron expression for midnight PST - // Note: This uses PST timezone. During PDT (daylight time), it will still run at midnight local time - const cronExpression = "0 6 * * *"; // Every day at midnight - const timezone = "America/Los_Angeles"; // PST/PDT timezone + // Initialize queue events for monitoring + this.queueEvents = new QueueEvents(SCHEDULER_QUEUE_NAME, { + connection: connectionOpts + }); - this.cronJob = cron.schedule( - cronExpression, - async () => { - //await this.performDailySync(); - await this.triggerJobAnalysis(); - }, - { - timezone: timezone - } - ); + // Create worker to process scheduled jobs + this.schedulerWorker = new Worker( + SCHEDULER_QUEUE_NAME, + async (job) => { + logger.info(`[Scheduler] Processing job: ${job.name} (ID: ${job.id})`); + + try { + if (job.name === "daily-s3-sync") { + await this.performDailySync(); + } else if (job.name === "daily-analytics") { + await this.triggerJobAnalysis(); + } + } catch (error) { + logger.error(`[Scheduler] Job ${job.name} failed:`, error); + throw error; // Re-throw to trigger retry + } + }, + { + connection: connectionOpts, + concurrency: 1, // Process one job at a time + autorun: true + } + ); - logger.info(`Daily scheduler started. Will run at midnight PST/PDT.`); - logger.info(`Next sync scheduled for: ${this.getNextRunTime()}`); + // Set up worker event handlers + this.schedulerWorker.on("completed", (job) => { + logger.info(`[Scheduler] Job ${job.name} completed successfully (ID: ${job.id})`); + }); + + this.schedulerWorker.on("failed", (job, err) => { + logger.error(`[Scheduler] Job ${job?.name} failed (ID: ${job?.id}):`, err); + }); + + this.schedulerWorker.on("error", (err) => { + logger.error("[Scheduler] Worker error:", err); + }); + + // Add repeatable jobs (cron-like scheduling) + // Only one instance across all workers will execute these at the scheduled time + + // Daily S3 sync at 1 AM PST + await this.schedulerQueue.add( + "daily-s3-sync", + {}, + { + repeat: { + pattern: "0 1 * * *", // Every day at 1 AM + tz: "America/Los_Angeles" // PST/PDT timezone + }, + jobId: "daily-s3-sync" // Use consistent ID to prevent duplicates + } + ); + + // Analytics job - every minute (adjust as needed) + await this.schedulerQueue.add( + "daily-analytics", + {}, + { + repeat: { + pattern: "21 * * * *", //9PM + tz: "America/Los_Angeles" + }, + jobId: "daily-analytics" + } + ); + + this.isInitialized = true; + logger.info(`[Scheduler] Daily scheduler started using BullMQ (Worker ${process.env.NODE_APP_INSTANCE || "N/A"})`); + + // Log next scheduled runs + const status = await this.getStatus(); + status.nextRun.forEach(task => { + logger.info(`[Scheduler] ${task.name} - Next run: ${task.nextRun}`); + }); + + } catch (error) { + logger.error("[Scheduler] Failed to start scheduler:", error); + throw error; + } } /** - * Stop the scheduler + * Stop the scheduler and clean up resources */ - stop(): void { - if (this.cronJob) { - this.cronJob.stop(); - this.cronJob = null; - logger.info("Daily S3 sync scheduler stopped."); + async stop(): Promise { + try { + if (this.schedulerWorker) { + await this.schedulerWorker.close(); + this.schedulerWorker = null; + logger.info("[Scheduler] Worker stopped"); + } + + if (this.queueEvents) { + await this.queueEvents.close(); + this.queueEvents = null; + } + + if (this.schedulerQueue) { + // Remove all repeatable jobs + const repeatableJobs = await this.schedulerQueue.getRepeatableJobs(); + for (const job of repeatableJobs) { + await this.schedulerQueue.removeRepeatableByKey(job.key); + } + await this.schedulerQueue.close(); + this.schedulerQueue = null; + logger.info("[Scheduler] Queue stopped and repeatable jobs removed"); + } + + this.isInitialized = false; + } catch (error) { + logger.error("[Scheduler] Error stopping scheduler:", error); + throw error; } } @@ -94,49 +206,37 @@ export class DailyS3Scheduler { } async triggerJobAnalysis(): Promise { - if (!this.s3Sync) { - logger.error("S3 sync not configured"); - return; - } - logger.info("Triggering jobs directory analysis..."); try { - const analysis = await this.s3Sync.analyzeJobsDirectory(); + const analysis = await analyzeJobsDirectory(); logger.info("Jobs directory analysis completed:", analysis); } catch (error) { logger.error("Jobs directory analysis failed:", error); + + //TODO: implement a logger here to send an email or something. } } /** - * Get the next scheduled run time + * Get the next scheduled run time for all repeatable jobs */ - private getNextRunTime(): string { - if (!this.cronJob) { - return "Not scheduled"; + private async getNextRunTime(): Promise { + if (!this.schedulerQueue) { + return []; } - // Create a date object for midnight PST today - const now = new Date(); - const pstNow = new Date(now.toLocaleString("en-US", { timeZone: "America/Los_Angeles" })); - - // If it's past midnight today, next run is tomorrow at midnight - const nextRun = new Date(pstNow); - if (pstNow.getHours() > 0 || pstNow.getMinutes() > 0 || pstNow.getSeconds() > 0) { - nextRun.setDate(nextRun.getDate() + 1); + try { + const repeatableJobs = await this.schedulerQueue.getRepeatableJobs(); + + return repeatableJobs.map(job => ({ + name: job.name || job.id || "unknown", + nextRun: job.next ? new Date(job.next) : null, + status: "scheduled" + })); + } catch (error) { + logger.error("[Scheduler] Failed to get next run times:", error); + return []; } - nextRun.setHours(0, 0, 0, 0); - - return nextRun.toLocaleString("en-US", { - timeZone: "America/Los_Angeles", - weekday: "long", - year: "numeric", - month: "long", - day: "numeric", - hour: "2-digit", - minute: "2-digit", - timeZoneName: "short" - }); } /** @@ -145,7 +245,7 @@ export class DailyS3Scheduler { async getStatus(): Promise<{ isConfigured: boolean; isRunning: boolean; - nextRun: string; + nextRun: ScheduledTaskInfo[]; syncStats?: { bucketName: string; region: string; keyPrefix: string; available: boolean }; }> { let syncStats; @@ -157,10 +257,12 @@ export class DailyS3Scheduler { } } + const nextRun = await this.getNextRunTime(); + return { isConfigured: this.s3Sync !== null, - isRunning: this.cronJob !== null, - nextRun: this.getNextRunTime(), + isRunning: this.isInitialized && this.schedulerWorker !== null, + nextRun, syncStats }; } diff --git a/util/s3Sync.ts b/util/s3Sync.ts index d41e6a6..881b0f3 100644 --- a/util/s3Sync.ts +++ b/util/s3Sync.ts @@ -60,7 +60,7 @@ export class S3Sync { // Check if Jobs directory exists if (!(await fs.pathExists(jobsPath))) { - logger.warn(`Jobs directory does not exist: ${jobsPath}`); + logger.error(`Jobs directory does not exist: ${jobsPath}`); return; } @@ -145,7 +145,7 @@ export class S3Sync { * Get sync statistics using AWS CLI */ async getSyncStats(): Promise<{ bucketName: string; region: string; keyPrefix: string; available: boolean }> { - const available = await this.testConnection(); + const available = false; //await this.testConnection(); return { bucketName: this.config.bucketName, region: this.config.region, @@ -153,157 +153,6 @@ export class S3Sync { available }; } - - /** - * Analyze all job folders in the Jobs directory - * Returns detailed statistics for each job folder - */ - async analyzeJobsDirectory(): Promise { - try { - logger.info("Starting Jobs directory analysis..."); - - const jobsPath = FolderPaths.Jobs; - - // Check if Jobs directory exists - if (!(await fs.pathExists(jobsPath))) { - logger.warn(`Jobs directory does not exist: ${jobsPath}`); - return { - bodyshopid, - total_jobs: 0, - total_documents: 0, - total_size_bytes: 0, - total_size_mb: 0, - file_type_stats: {}, - media_analytics_details: { data: [] } - }; - } - - const jobFolders = await readdir(jobsPath); - const jobStats: JobFolderStats[] = []; - let total_documents = 0; - let total_size_bytes = 0; - const aggregated_file_type_stats: { [extension: string]: number } = {}; - - const uuidRegex = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/; - for (const jobFolder of jobFolders) { - if (jobFolder !== "temporary" && !uuidRegex.test(jobFolder)) { - logger.warn(`Skipping invalid jobid directory: ${jobFolder}`); - continue; - } - const jobFolderPath = path.join(jobsPath, jobFolder); - const stat = await fsStat(jobFolderPath); - // Only process directories - if (stat.isDirectory()) { - const folderStats = await this.analyzeJobFolder(jobsPath, jobFolder); - jobStats.push(folderStats); - total_documents += folderStats.document_count; - total_size_bytes += folderStats.total_size_bytes; - // Aggregate file type stats - for (const [ext, count] of Object.entries(folderStats.file_type_stats)) { - aggregated_file_type_stats[ext] = (aggregated_file_type_stats[ext] || 0) + count; - } - } - } - - const analysis: JobsDirectoryAnalysis = { - bodyshopid, - total_jobs: jobStats.length, - total_documents, - total_size_bytes, - total_size_mb: Math.round((total_size_bytes / (1024 * 1024)) * 100) / 100, - file_type_stats: aggregated_file_type_stats, - media_analytics_details: { data: jobStats.sort((a, b) => a.jobid?.localeCompare(b.jobid!) || 0) } - }; - - logger.info( - `Jobs directory analysis complete: ${analysis.total_jobs} jobs, ${analysis.total_documents} documents, ${analysis.total_size_mb} MB` - ); - return analysis; - } catch (error) { - logger.error("Failed to analyze Jobs directory:", error); - throw error; - } - } - - /** - * Analyze a single job folder - */ - private async analyzeJobFolder(jobsPath: string, jobid: string): Promise { - const jobFolderPath = path.join(jobsPath, jobid); - // const relativePath = path.relative(FolderPaths.Root, jobFolderPath); - - const { document_count, total_size_bytes, file_type_stats } = await this.getDirectoryStats(jobFolderPath); - - const uuidRegex = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/; - let validJobid: UUID | string | null = null; - if (jobid === "temporary") { - validJobid = null; - } else if (uuidRegex.test(jobid)) { - validJobid = jobid as UUID; - } else { - logger.warn(`Invalid jobid encountered in analyzeJobFolder: ${jobid}`); - validJobid = null; - } - return { - jobid: validJobid, - bodyshopid, - //relativePath, - document_count, - total_size_bytes, - total_size_mb: Math.round((total_size_bytes / (1024 * 1024)) * 100) / 100, - file_type_stats - }; - } - - /** - * Recursively get document count and total size for a directory - */ - private async getDirectoryStats( - dirPath: string - ): Promise<{ document_count: number; total_size_bytes: number; file_type_stats: { [extension: string]: number } }> { - let document_count = 0; - let total_size_bytes = 0; - const file_type_stats: { [extension: string]: number } = {}; - - try { - const items = await readdir(dirPath); - - for (const item of items) { - const itemPath = path.join(dirPath, item); - const stat = await fsStat(itemPath); - - if (stat.isDirectory()) { - // Skip thumbs and ConvertedOriginals folders (case-insensitive) - const itemLower = item.toLowerCase(); - if (itemLower === "thumbs" || itemLower === "convertedoriginals") { - continue; - } - - // Recursively analyze subdirectories - const subStats = await this.getDirectoryStats(itemPath); - document_count += subStats.document_count; - total_size_bytes += subStats.total_size_bytes; - - // Merge file type stats - for (const [ext, count] of Object.entries(subStats.file_type_stats)) { - file_type_stats[ext] = (file_type_stats[ext] || 0) + count; - } - } else { - // Count files as documents - document_count++; - total_size_bytes += stat.size; - - // Track file extension - const ext = path.extname(item).toLowerCase() || "no-extension"; - file_type_stats[ext] = (file_type_stats[ext] || 0) + 1; - } - } - } catch (error) { - logger.error(`Error analyzing directory ${dirPath}:`, error); - } - - return { document_count, total_size_bytes, file_type_stats }; - } } /** @@ -363,26 +212,26 @@ export async function analyzeJobsDirectory(): Promise { let total_size_bytes = 0; const aggregated_file_type_stats: { [extension: string]: number } = {}; - const uuidRegex = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/; - for (const jobFolder of jobFolders) { - if (jobFolder !== "temporary" && !uuidRegex.test(jobFolder)) { - logger.warn(`Skipping invalid jobid directory: ${jobFolder}`); - continue; - } - const jobFolderPath = path.join(jobsPath, jobFolder); - const stat = await fsStat(jobFolderPath); - // Only process directories - if (stat.isDirectory()) { - const folderStats = await analyzeJobFolder(jobsPath, jobFolder); - jobStats.push(folderStats); - total_documents += folderStats.document_count; - total_size_bytes += folderStats.total_size_bytes; - // Aggregate file type stats - for (const [ext, count] of Object.entries(folderStats.file_type_stats)) { - aggregated_file_type_stats[ext] = (aggregated_file_type_stats[ext] || 0) + count; - } + const uuidRegex = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/; + for (const jobFolder of jobFolders) { + if (jobFolder !== "temporary" && !uuidRegex.test(jobFolder)) { + logger.info(`Skipping invalid jobid directory: ${jobFolder}`); + continue; + } + const jobFolderPath = path.join(jobsPath, jobFolder); + const stat = await fsStat(jobFolderPath); + // Only process directories + if (stat.isDirectory()) { + const folderStats = await analyzeJobFolder(jobsPath, jobFolder, bodyshopid); + jobStats.push(folderStats); + total_documents += folderStats.document_count; + total_size_bytes += folderStats.total_size_bytes; + // Aggregate file type stats + for (const [ext, count] of Object.entries(folderStats.file_type_stats)) { + aggregated_file_type_stats[ext] = (aggregated_file_type_stats[ext] || 0) + count; } } + } const analysis: JobsDirectoryAnalysis = { bodyshopid, //read from the config.json file in the root directory @@ -400,7 +249,7 @@ export async function analyzeJobsDirectory(): Promise { ); //Add an upload to the IO database to categorize all of this. - const apiURL = "https://api.test.imex.online/analytics/documents"; //TODO: don't hardcode and point to prod. + const apiURL = "https://api.test.imex.online/analytics/documents"; //TODO: don't hardcode and point to prod. const result = await axios.post(apiURL, { data: analysis }); return analysis; @@ -411,11 +260,10 @@ export async function analyzeJobsDirectory(): Promise { } /** - * Analyze a single job folder (standalone helper function) + * Analyze a single job folder (helper function) */ -async function analyzeJobFolder(jobsPath: string, jobid: string): Promise { +async function analyzeJobFolder(jobsPath: string, jobid: string, bodyshopid: UUID): Promise { const jobFolderPath = path.join(jobsPath, jobid); - const relativePath = path.relative(FolderPaths.Root, jobFolderPath); const { document_count, total_size_bytes, file_type_stats } = await getDirectoryStats(jobFolderPath); @@ -432,7 +280,6 @@ async function analyzeJobFolder(jobsPath: string, jobid: string): Promise