Move the scheduler queue to Redis (BullMQ) so that scheduled jobs fire exactly once across all cluster instances instead of once per instance.

This commit is contained in:
Patrick Fic
2025-11-10 11:43:05 -08:00
parent f575f6ab9a
commit f6a9486284
3 changed files with 213 additions and 259 deletions

View File

@@ -191,16 +191,16 @@ app.get("/sync/status", ValidateImsToken, async (req, res) => {
}); });
// Manual S3 sync trigger endpoint (for testing) // Manual S3 sync trigger endpoint (for testing)
app.post("/sync/trigger", ValidateImsToken, async (req, res) => { // app.post("/sync/trigger", ValidateImsToken, async (req, res) => {
try { // try {
await dailyS3Scheduler.triggerManualSync(); // await dailyS3Scheduler.triggerManualSync();
res.json({ success: true, message: "Manual sync triggered successfully" }); // res.json({ success: true, message: "Manual sync triggered successfully" });
} catch (error) { // } catch (error) {
logger.error("Manua--l sync failed:", error); // logger.error("Manua--l sync failed:", error);
const errorMessage = error instanceof Error ? error.message : "Unknown error"; // const errorMessage = error instanceof Error ? error.message : "Unknown error";
res.status(500).json({ success: false, message: "Manual sync failed", error: errorMessage }); // res.status(500).json({ success: false, message: "Manual sync failed", error: errorMessage });
} // }
}); // });
// Jobs directory analysis endpoint // Jobs directory analysis endpoint
app.get("/jobs/analysis", ValidateImsToken, async (req, res) => { app.get("/jobs/analysis", ValidateImsToken, async (req, res) => {
@@ -220,9 +220,14 @@ app.use(FolderPaths.StaticPath, express.static(FolderPaths.Root, { etag: false,
app.use("/assets", express.static("/assets", { etag: false, maxAge: 30 * 1000 })); app.use("/assets", express.static("/assets", { etag: false, maxAge: 30 * 1000 }));
// Start the daily S3 sync scheduler // Start the daily S3 sync scheduler
dailyS3Scheduler.start().catch((error) => { dailyS3Scheduler
logger.error("Failed to start sync scheduler:", error); .start()
}); .then(() => {
logger.info("Sync scheduler started successfully.");
})
.catch((error) => {
logger.error("Failed to start sync scheduler:", error);
});
app.listen(port, () => { app.listen(port, () => {
logger.info(`ImEX Media Server is running at http://localhost:${port}`); logger.info(`ImEX Media Server is running at http://localhost:${port}`);

View File

@@ -1,60 +1,172 @@
import * as cron from "node-cron"; import { Queue, Worker, QueueEvents } from "bullmq";
import { logger } from "../server.js"; import { logger } from "../server.js";
import { S3Sync, createS3SyncFromEnv } from "./s3Sync.js"; import { S3Sync, createS3SyncFromEnv, analyzeJobsDirectory } from "./s3Sync.js";
// Name of the shared BullMQ queue that carries all scheduled tasks for the cluster.
const SCHEDULER_QUEUE_NAME = "scheduledTasksQueue";

// Redis connection settings shared by the queue, the worker, and the
// queue-events listener.
// NOTE(review): host/port are hardcoded to a local Redis — consider making
// these configurable (env vars) for non-local deployments.
const connectionOpts = {
  host: "localhost",
  port: 6379,
  enableReadyCheck: true,
  // ioredis hook: force a reconnect when Redis rejects writes with READONLY
  // (typically a replica during/after a failover).
  reconnectOnError: (err: Error) => err.message.includes("READONLY")
};

/**
 * Snapshot of one repeatable job's schedule, as reported by getStatus().
 */
export interface ScheduledTaskInfo {
  name: string; // job name (falls back to the job id, then "unknown")
  nextRun: Date | null; // next scheduled fire time; null when not reported
  status: string; // currently always "scheduled"
}
export class DailyS3Scheduler { export class DailyS3Scheduler {
private s3Sync: S3Sync | null = null; private s3Sync: S3Sync | null = null;
private cronJob: cron.ScheduledTask | null = null; private schedulerQueue: Queue | null = null;
private schedulerWorker: Worker | null = null;
private queueEvents: QueueEvents | null = null;
private isInitialized: boolean = false;
constructor() { constructor() {
this.s3Sync = createS3SyncFromEnv(); this.s3Sync = createS3SyncFromEnv();
} }
/** /**
* Start the daily S3 sync scheduler * Start the daily S3 sync scheduler using BullMQ
* Runs at midnight PST (00:00 PST = 08:00 UTC during standard time, 07:00 UTC during daylight time) * This ensures only one worker executes the job across all cluster instances
*/ */
async start(): Promise<void> { async start(): Promise<void> {
if (!this.s3Sync) { if (this.isInitialized) {
logger.warn("S3 sync not configured. Skipping scheduler setup."); logger.warn("Scheduler already initialized");
return; return;
} }
// Test S3 connection before starting scheduler try {
// const connectionTest = await this.s3Sync.testConnection(); // Initialize the queue
// if (!connectionTest) { this.schedulerQueue = new Queue(SCHEDULER_QUEUE_NAME, {
// logger.error("S3 connection test failed. S3 sync scheduler will not be started."); connection: connectionOpts,
// return; defaultJobOptions: {
// } removeOnComplete: 10, // Keep last 10 completed jobs
removeOnFail: 50, // Keep last 50 failed jobs
attempts: 3,
backoff: { type: "exponential", delay: 2000 }
}
});
// Cron expression for midnight PST // Initialize queue events for monitoring
// Note: This uses PST timezone. During PDT (daylight time), it will still run at midnight local time this.queueEvents = new QueueEvents(SCHEDULER_QUEUE_NAME, {
const cronExpression = "0 6 * * *"; // Every day at midnight connection: connectionOpts
const timezone = "America/Los_Angeles"; // PST/PDT timezone });
this.cronJob = cron.schedule( // Create worker to process scheduled jobs
cronExpression, this.schedulerWorker = new Worker(
async () => { SCHEDULER_QUEUE_NAME,
//await this.performDailySync(); async (job) => {
await this.triggerJobAnalysis(); logger.info(`[Scheduler] Processing job: ${job.name} (ID: ${job.id})`);
},
{ try {
timezone: timezone if (job.name === "daily-s3-sync") {
} await this.performDailySync();
); } else if (job.name === "daily-analytics") {
await this.triggerJobAnalysis();
}
} catch (error) {
logger.error(`[Scheduler] Job ${job.name} failed:`, error);
throw error; // Re-throw to trigger retry
}
},
{
connection: connectionOpts,
concurrency: 1, // Process one job at a time
autorun: true
}
);
logger.info(`Daily scheduler started. Will run at midnight PST/PDT.`); // Set up worker event handlers
logger.info(`Next sync scheduled for: ${this.getNextRunTime()}`); this.schedulerWorker.on("completed", (job) => {
logger.info(`[Scheduler] Job ${job.name} completed successfully (ID: ${job.id})`);
});
this.schedulerWorker.on("failed", (job, err) => {
logger.error(`[Scheduler] Job ${job?.name} failed (ID: ${job?.id}):`, err);
});
this.schedulerWorker.on("error", (err) => {
logger.error("[Scheduler] Worker error:", err);
});
// Add repeatable jobs (cron-like scheduling)
// Only one instance across all workers will execute these at the scheduled time
// Daily S3 sync at 1 AM PST
await this.schedulerQueue.add(
"daily-s3-sync",
{},
{
repeat: {
pattern: "0 1 * * *", // Every day at 1 AM
tz: "America/Los_Angeles" // PST/PDT timezone
},
jobId: "daily-s3-sync" // Use consistent ID to prevent duplicates
}
);
// Daily analytics job at 9 PM PST/PDT.
// BUG FIX: the previous pattern "21 * * * *" fired at minute 21 of EVERY
// hour; "0 21 * * *" fires once per day at 21:00, matching the "9PM"
// intent and the "daily-" job name.
await this.schedulerQueue.add(
  "daily-analytics",
  {},
  {
    repeat: {
      pattern: "0 21 * * *", // every day at 9 PM
      tz: "America/Los_Angeles"
    },
    jobId: "daily-analytics" // consistent ID prevents duplicate repeatable jobs
  }
);
this.isInitialized = true;
logger.info(`[Scheduler] Daily scheduler started using BullMQ (Worker ${process.env.NODE_APP_INSTANCE || "N/A"})`);
// Log next scheduled runs
const status = await this.getStatus();
status.nextRun.forEach(task => {
logger.info(`[Scheduler] ${task.name} - Next run: ${task.nextRun}`);
});
} catch (error) {
logger.error("[Scheduler] Failed to start scheduler:", error);
throw error;
}
} }
/** /**
* Stop the scheduler * Stop the scheduler and clean up resources
*/ */
stop(): void { async stop(): Promise<void> {
if (this.cronJob) { try {
this.cronJob.stop(); if (this.schedulerWorker) {
this.cronJob = null; await this.schedulerWorker.close();
logger.info("Daily S3 sync scheduler stopped."); this.schedulerWorker = null;
logger.info("[Scheduler] Worker stopped");
}
if (this.queueEvents) {
await this.queueEvents.close();
this.queueEvents = null;
}
if (this.schedulerQueue) {
// Remove all repeatable jobs
const repeatableJobs = await this.schedulerQueue.getRepeatableJobs();
for (const job of repeatableJobs) {
await this.schedulerQueue.removeRepeatableByKey(job.key);
}
await this.schedulerQueue.close();
this.schedulerQueue = null;
logger.info("[Scheduler] Queue stopped and repeatable jobs removed");
}
this.isInitialized = false;
} catch (error) {
logger.error("[Scheduler] Error stopping scheduler:", error);
throw error;
} }
} }
@@ -94,49 +206,37 @@ export class DailyS3Scheduler {
} }
async triggerJobAnalysis(): Promise<void> { async triggerJobAnalysis(): Promise<void> {
if (!this.s3Sync) {
logger.error("S3 sync not configured");
return;
}
logger.info("Triggering jobs directory analysis..."); logger.info("Triggering jobs directory analysis...");
try { try {
const analysis = await this.s3Sync.analyzeJobsDirectory(); const analysis = await analyzeJobsDirectory();
logger.info("Jobs directory analysis completed:", analysis); logger.info("Jobs directory analysis completed:", analysis);
} catch (error) { } catch (error) {
logger.error("Jobs directory analysis failed:", error); logger.error("Jobs directory analysis failed:", error);
//TODO: implement a logger here to send an email or something.
} }
} }
/** /**
* Get the next scheduled run time * Get the next scheduled run time for all repeatable jobs
*/ */
private getNextRunTime(): string { private async getNextRunTime(): Promise<ScheduledTaskInfo[]> {
if (!this.cronJob) { if (!this.schedulerQueue) {
return "Not scheduled"; return [];
} }
// Create a date object for midnight PST today try {
const now = new Date(); const repeatableJobs = await this.schedulerQueue.getRepeatableJobs();
const pstNow = new Date(now.toLocaleString("en-US", { timeZone: "America/Los_Angeles" }));
return repeatableJobs.map(job => ({
// If it's past midnight today, next run is tomorrow at midnight name: job.name || job.id || "unknown",
const nextRun = new Date(pstNow); nextRun: job.next ? new Date(job.next) : null,
if (pstNow.getHours() > 0 || pstNow.getMinutes() > 0 || pstNow.getSeconds() > 0) { status: "scheduled"
nextRun.setDate(nextRun.getDate() + 1); }));
} catch (error) {
logger.error("[Scheduler] Failed to get next run times:", error);
return [];
} }
nextRun.setHours(0, 0, 0, 0);
return nextRun.toLocaleString("en-US", {
timeZone: "America/Los_Angeles",
weekday: "long",
year: "numeric",
month: "long",
day: "numeric",
hour: "2-digit",
minute: "2-digit",
timeZoneName: "short"
});
} }
/** /**
@@ -145,7 +245,7 @@ export class DailyS3Scheduler {
async getStatus(): Promise<{ async getStatus(): Promise<{
isConfigured: boolean; isConfigured: boolean;
isRunning: boolean; isRunning: boolean;
nextRun: string; nextRun: ScheduledTaskInfo[];
syncStats?: { bucketName: string; region: string; keyPrefix: string; available: boolean }; syncStats?: { bucketName: string; region: string; keyPrefix: string; available: boolean };
}> { }> {
let syncStats; let syncStats;
@@ -157,10 +257,12 @@ export class DailyS3Scheduler {
} }
} }
const nextRun = await this.getNextRunTime();
return { return {
isConfigured: this.s3Sync !== null, isConfigured: this.s3Sync !== null,
isRunning: this.cronJob !== null, isRunning: this.isInitialized && this.schedulerWorker !== null,
nextRun: this.getNextRunTime(), nextRun,
syncStats syncStats
}; };
} }

View File

@@ -60,7 +60,7 @@ export class S3Sync {
// Check if Jobs directory exists // Check if Jobs directory exists
if (!(await fs.pathExists(jobsPath))) { if (!(await fs.pathExists(jobsPath))) {
logger.warn(`Jobs directory does not exist: ${jobsPath}`); logger.error(`Jobs directory does not exist: ${jobsPath}`);
return; return;
} }
@@ -145,7 +145,7 @@ export class S3Sync {
* Get sync statistics using AWS CLI * Get sync statistics using AWS CLI
*/ */
async getSyncStats(): Promise<{ bucketName: string; region: string; keyPrefix: string; available: boolean }> { async getSyncStats(): Promise<{ bucketName: string; region: string; keyPrefix: string; available: boolean }> {
const available = await this.testConnection(); const available = false; //await this.testConnection();
return { return {
bucketName: this.config.bucketName, bucketName: this.config.bucketName,
region: this.config.region, region: this.config.region,
@@ -153,157 +153,6 @@ export class S3Sync {
available available
}; };
} }
/**
 * Analyze all job folders under FolderPaths.Jobs and aggregate their statistics.
 *
 * Folder names that are neither "temporary" nor a canonical UUID are skipped
 * with a warning. Per-folder stats (document count, byte size, file-type
 * histogram) are aggregated into one JobsDirectoryAnalysis.
 *
 * @returns the aggregated analysis; all totals are 0 when the Jobs directory
 *          does not exist.
 * @throws rethrows any error raised while reading/stat-ing the directory tree.
 */
async analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
  try {
    logger.info("Starting Jobs directory analysis...");
    const jobsPath = FolderPaths.Jobs;

    // Check if Jobs directory exists — report an empty analysis instead of failing.
    if (!(await fs.pathExists(jobsPath))) {
      logger.warn(`Jobs directory does not exist: ${jobsPath}`);
      return {
        // NOTE(review): bodyshopid is read from an enclosing scope — confirm
        // it is initialized before this method runs.
        bodyshopid,
        total_jobs: 0,
        total_documents: 0,
        total_size_bytes: 0,
        total_size_mb: 0,
        file_type_stats: {},
        media_analytics_details: { data: [] }
      };
    }

    const jobFolders = await readdir(jobsPath);
    const jobStats: JobFolderStats[] = [];
    let total_documents = 0;
    let total_size_bytes = 0;
    const aggregated_file_type_stats: { [extension: string]: number } = {};
    // Canonical 8-4-4-4-12 hex UUID, case-insensitive.
    const uuidRegex = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/;

    for (const jobFolder of jobFolders) {
      // Only "temporary" and UUID-named folders are valid job directories.
      if (jobFolder !== "temporary" && !uuidRegex.test(jobFolder)) {
        logger.warn(`Skipping invalid jobid directory: ${jobFolder}`);
        continue;
      }
      const jobFolderPath = path.join(jobsPath, jobFolder);
      const stat = await fsStat(jobFolderPath);
      // Only process directories
      if (stat.isDirectory()) {
        const folderStats = await this.analyzeJobFolder(jobsPath, jobFolder);
        jobStats.push(folderStats);
        total_documents += folderStats.document_count;
        total_size_bytes += folderStats.total_size_bytes;
        // Aggregate file type stats
        for (const [ext, count] of Object.entries(folderStats.file_type_stats)) {
          aggregated_file_type_stats[ext] = (aggregated_file_type_stats[ext] || 0) + count;
        }
      }
    }

    const analysis: JobsDirectoryAnalysis = {
      bodyshopid,
      total_jobs: jobStats.length,
      total_documents,
      total_size_bytes,
      // Round to 2 decimal places.
      total_size_mb: Math.round((total_size_bytes / (1024 * 1024)) * 100) / 100,
      file_type_stats: aggregated_file_type_stats,
      // Sort by jobid; entries with a null jobid ("temporary") compare as 0
      // via optional chaining and keep their relative position.
      media_analytics_details: { data: jobStats.sort((a, b) => a.jobid?.localeCompare(b.jobid!) || 0) }
    };

    logger.info(
      `Jobs directory analysis complete: ${analysis.total_jobs} jobs, ${analysis.total_documents} documents, ${analysis.total_size_mb} MB`
    );
    return analysis;
  } catch (error) {
    logger.error("Failed to analyze Jobs directory:", error);
    throw error;
  }
}
/**
 * Build a JobFolderStats record for one job folder.
 *
 * "temporary" and unrecognized folder names map to a null jobid; names that
 * are canonical UUIDs are carried through as the jobid.
 */
private async analyzeJobFolder(jobsPath: string, jobid: string): Promise<JobFolderStats> {
  const folderPath = path.join(jobsPath, jobid);
  const stats = await this.getDirectoryStats(folderPath);

  // Canonical 8-4-4-4-12 hex UUID, case-insensitive.
  const uuidPattern = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/;
  let resolvedJobid: UUID | string | null;
  if (jobid === "temporary") {
    // The "temporary" folder is not tied to a specific job.
    resolvedJobid = null;
  } else if (uuidPattern.test(jobid)) {
    resolvedJobid = jobid as UUID;
  } else {
    logger.warn(`Invalid jobid encountered in analyzeJobFolder: ${jobid}`);
    resolvedJobid = null;
  }

  return {
    jobid: resolvedJobid,
    bodyshopid,
    document_count: stats.document_count,
    total_size_bytes: stats.total_size_bytes,
    // Round to 2 decimal places.
    total_size_mb: Math.round((stats.total_size_bytes / (1024 * 1024)) * 100) / 100,
    file_type_stats: stats.file_type_stats
  };
}
/**
 * Recursively walk a directory, counting files, summing byte sizes, and
 * building a per-extension histogram.
 *
 * "thumbs" and "ConvertedOriginals" folders (case-insensitive) are excluded.
 * I/O errors are logged and swallowed so a partial tally is still returned.
 */
private async getDirectoryStats(
  dirPath: string
): Promise<{ document_count: number; total_size_bytes: number; file_type_stats: { [extension: string]: number } }> {
  let document_count = 0;
  let total_size_bytes = 0;
  const file_type_stats: { [extension: string]: number } = {};

  try {
    for (const entryName of await readdir(dirPath)) {
      const entryPath = path.join(dirPath, entryName);
      const entryStat = await fsStat(entryPath);

      if (!entryStat.isDirectory()) {
        // Every regular file counts as one document.
        document_count += 1;
        total_size_bytes += entryStat.size;
        // Files without an extension fall into a dedicated bucket.
        const extension = path.extname(entryName).toLowerCase() || "no-extension";
        file_type_stats[extension] = (file_type_stats[extension] || 0) + 1;
        continue;
      }

      // Skip generated/derived folders (case-insensitive).
      const nameLower = entryName.toLowerCase();
      if (nameLower === "thumbs" || nameLower === "convertedoriginals") {
        continue;
      }

      // Fold the subdirectory's totals into ours.
      const nested = await this.getDirectoryStats(entryPath);
      document_count += nested.document_count;
      total_size_bytes += nested.total_size_bytes;
      for (const [extension, count] of Object.entries(nested.file_type_stats)) {
        file_type_stats[extension] = (file_type_stats[extension] || 0) + count;
      }
    }
  } catch (error) {
    // Best-effort: log and return whatever was tallied before the failure.
    logger.error(`Error analyzing directory ${dirPath}:`, error);
  }

  return { document_count, total_size_bytes, file_type_stats };
}
} }
/** /**
@@ -363,26 +212,26 @@ export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
let total_size_bytes = 0; let total_size_bytes = 0;
const aggregated_file_type_stats: { [extension: string]: number } = {}; const aggregated_file_type_stats: { [extension: string]: number } = {};
const uuidRegex = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/; const uuidRegex = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/;
for (const jobFolder of jobFolders) { for (const jobFolder of jobFolders) {
if (jobFolder !== "temporary" && !uuidRegex.test(jobFolder)) { if (jobFolder !== "temporary" && !uuidRegex.test(jobFolder)) {
logger.warn(`Skipping invalid jobid directory: ${jobFolder}`); logger.info(`Skipping invalid jobid directory: ${jobFolder}`);
continue; continue;
} }
const jobFolderPath = path.join(jobsPath, jobFolder); const jobFolderPath = path.join(jobsPath, jobFolder);
const stat = await fsStat(jobFolderPath); const stat = await fsStat(jobFolderPath);
// Only process directories // Only process directories
if (stat.isDirectory()) { if (stat.isDirectory()) {
const folderStats = await analyzeJobFolder(jobsPath, jobFolder); const folderStats = await analyzeJobFolder(jobsPath, jobFolder, bodyshopid);
jobStats.push(folderStats); jobStats.push(folderStats);
total_documents += folderStats.document_count; total_documents += folderStats.document_count;
total_size_bytes += folderStats.total_size_bytes; total_size_bytes += folderStats.total_size_bytes;
// Aggregate file type stats // Aggregate file type stats
for (const [ext, count] of Object.entries(folderStats.file_type_stats)) { for (const [ext, count] of Object.entries(folderStats.file_type_stats)) {
aggregated_file_type_stats[ext] = (aggregated_file_type_stats[ext] || 0) + count; aggregated_file_type_stats[ext] = (aggregated_file_type_stats[ext] || 0) + count;
}
} }
} }
}
const analysis: JobsDirectoryAnalysis = { const analysis: JobsDirectoryAnalysis = {
bodyshopid, //read from the config.json file in the root directory bodyshopid, //read from the config.json file in the root directory
@@ -400,7 +249,7 @@ export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
); );
//Add an upload to the IO database to categorize all of this. //Add an upload to the IO database to categorize all of this.
const apiURL = "https://api.test.imex.online/analytics/documents"; //TODO: don't hardcode and point to prod. const apiURL = "https://api.test.imex.online/analytics/documents"; //TODO: don't hardcode and point to prod.
const result = await axios.post(apiURL, { data: analysis }); const result = await axios.post(apiURL, { data: analysis });
return analysis; return analysis;
@@ -411,11 +260,10 @@ export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
} }
/** /**
* Analyze a single job folder (standalone helper function) * Analyze a single job folder (helper function)
*/ */
async function analyzeJobFolder(jobsPath: string, jobid: string): Promise<JobFolderStats> { async function analyzeJobFolder(jobsPath: string, jobid: string, bodyshopid: UUID): Promise<JobFolderStats> {
const jobFolderPath = path.join(jobsPath, jobid); const jobFolderPath = path.join(jobsPath, jobid);
const relativePath = path.relative(FolderPaths.Root, jobFolderPath);
const { document_count, total_size_bytes, file_type_stats } = await getDirectoryStats(jobFolderPath); const { document_count, total_size_bytes, file_type_stats } = await getDirectoryStats(jobFolderPath);
@@ -432,7 +280,6 @@ async function analyzeJobFolder(jobsPath: string, jobid: string): Promise<JobFol
return { return {
bodyshopid, bodyshopid,
jobid: validJobid, jobid: validJobid,
// relativePath,
document_count, document_count,
total_size_bytes, total_size_bytes,
total_size_mb: Math.round((total_size_bytes / (1024 * 1024)) * 100) / 100, total_size_mb: Math.round((total_size_bytes / (1024 * 1024)) * 100) / 100,
@@ -441,7 +288,7 @@ async function analyzeJobFolder(jobsPath: string, jobid: string): Promise<JobFol
} }
/** /**
* Recursively get document count and total size for a directory (standalone helper function) * Recursively get document count and total size for a directory (helper function)
*/ */
async function getDirectoryStats( async function getDirectoryStats(
dirPath: string dirPath: string
@@ -463,7 +310,7 @@ async function getDirectoryStats(
if (itemLower === "thumbs" || itemLower === "convertedoriginals") { if (itemLower === "thumbs" || itemLower === "convertedoriginals") {
continue; continue;
} }
// Recursively analyze subdirectories // Recursively analyze subdirectories
const subStats = await getDirectoryStats(itemPath); const subStats = await getDirectoryStats(itemPath);
document_count += subStats.document_count; document_count += subStats.document_count;