From cc5fea9410bee6a27e5146fa2d661a83ead02748 Mon Sep 17 00:00:00 2001 From: Dave Richer Date: Thu, 20 Feb 2025 12:21:09 -0500 Subject: [PATCH] feature/IO-3096-GlobalNotifications - Checkpoint, finished testing queue, adjusted timeouts to be pegged to one variable. --- server/notifications/queues/appQueue.js | 56 +++++++++++++---------- server/notifications/queues/emailQueue.js | 33 ++++++++----- 2 files changed, 55 insertions(+), 34 deletions(-) diff --git a/server/notifications/queues/appQueue.js b/server/notifications/queues/appQueue.js index 6cb209263..04c6e5fcd 100644 --- a/server/notifications/queues/appQueue.js +++ b/server/notifications/queues/appQueue.js @@ -1,5 +1,14 @@ const { Queue, Worker } = require("bullmq"); +// Base time-related constant (in milliseconds) +const CONSOLIDATION_DELAY = 60000; // 1 minute (base timeout) + +// Derived time-related constants based on CONSOLIDATION_DELAY / DO NOT TOUCH, these are pegged to CONSOLIDATION_DELAY +const NOTIFICATION_STORAGE_EXPIRATION = CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (90s, for notification storage) +const CONSOLIDATION_FLAG_EXPIRATION = CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (90s, buffer for consolidation flag) +const LOCK_EXPIRATION = CONSOLIDATION_DELAY * 0.25; // 15 seconds (quarter of base, for lock duration) +const RATE_LIMITER_DURATION = CONSOLIDATION_DELAY * 0.1; // 6 seconds (tenth of base, for rate limiting) + let addQueue; let consolidateQueue; @@ -21,8 +30,8 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { // Create queue for adding notifications addQueue = new Queue("notificationsAdd", { connection: pubClient, - prefix: "{BULLMQ}", // Namespace prefix for BullMQ in Redis - defaultJobOptions: { removeOnComplete: true, removeOnFail: true } // Cleanup jobs after success/failure + prefix: "{BULLMQ}", + defaultJobOptions: { removeOnComplete: true, removeOnFail: true } }); // Create queue for consolidating notifications @@ -49,26 +58,30 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const existingNotifications = await pubClient.get(userKey); const notifications = existingNotifications ? JSON.parse(existingNotifications) : []; notifications.push(notification); - // Set with 40-second expiration to avoid stale data - await pubClient.set(userKey, JSON.stringify(notifications), "EX", 40); + // Set with expiration to avoid stale data + await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000); // Convert to seconds logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`); } const consolidateKey = `app:consolidate:${jobId}`; - // setnx ensures only one consolidation job is scheduled (atomic operation) const flagSet = await pubClient.setnx(consolidateKey, "pending"); logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`); if (flagSet) { - // Schedule consolidation job to run after a 5-second delay + // Schedule consolidation job with delay and retries await consolidateQueue.add( "consolidate-notifications", { jobId, recipients }, - { jobId: `consolidate:${jobId}`, delay: 5000 } + { + jobId: `consolidate:${jobId}`, + delay: CONSOLIDATION_DELAY, + attempts: 3, // Retry up to 3 times + backoff: LOCK_EXPIRATION // Retry delay matches lock expiration (15s) + } ); logger.logger.info(`Scheduled consolidation for jobId ${jobId}`); - // Set expiration on flag to clean up after 5 minutes - await pubClient.expire(consolidateKey, 300); + // Set expiration on flag + await pubClient.expire(consolidateKey, CONSOLIDATION_FLAG_EXPIRATION / 1000); // Convert to seconds } else { logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`); } @@ -76,7 +89,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { { connection: pubClient, prefix: "{BULLMQ}", - concurrency: 5 // Process up to 5 jobs concurrently + concurrency: 5 } ); @@ -89,14 +102,13 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const redisKeyPrefix = `app:notifications:${jobId}`; const lockKey = `lock:consolidate:${jobId}`; - // Acquire a lock to prevent concurrent consolidation (NX = set if not exists) - const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", 10); + // Acquire a lock to prevent concurrent consolidation + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); // Convert to seconds logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`); if (lockAcquired) { try { const allNotifications = {}; - // Get unique user IDs to avoid duplicate processing const uniqueUsers = [...new Set(recipients.map((r) => r.user))]; logger.logger.debug(`Unique users for jobId ${jobId}: ${uniqueUsers}`); @@ -113,7 +125,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { allNotifications[user] = allNotifications[user] || {}; allNotifications[user][bodyShopId] = parsedNotifications; } - await pubClient.del(userKey); // Clean up after retrieval + await pubClient.del(userKey); logger.logger.debug(`Deleted Redis key ${userKey}`); } else { logger.logger.warn(`No notifications found for ${user} under ${userKey}`); @@ -152,13 +164,12 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { } } - // Clean up consolidation flag after processing await pubClient.del(`app:consolidate:${jobId}`); } catch (err) { logger.logger.error(`Consolidation error for jobId ${jobId}: ${err.message}`, { error: err }); - throw err; // Re-throw to trigger BullMQ's failed event + throw err; // Trigger retry if attempts remain } finally { - await pubClient.del(lockKey); // Release lock regardless of success/failure + await pubClient.del(lockKey); } } else { logger.logger.info(`Skipped consolidation for jobId ${jobId} - lock held by another worker`); @@ -167,8 +178,8 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { { connection: pubClient, prefix: "{BULLMQ}", - concurrency: 1, // Single concurrency to avoid race conditions - limiter: { max: 1, duration: 5000 } // Rate limit: 1 job every 5 seconds + concurrency: 1, + limiter: { max: 1, duration: RATE_LIMITER_DURATION } } ); @@ -192,11 +203,11 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { logger.logger.info("App queue workers closed"); }; - process.on("SIGTERM", shutdown); // Handle termination signal - process.on("SIGINT", shutdown); // Handle interrupt signal (e.g., Ctrl+C) + process.on("SIGTERM", shutdown); + process.on("SIGINT", shutdown); } - return addQueue; // Return queue for external use + return addQueue; }; /** @@ -223,7 +234,6 @@ const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => { for (const app of appsToDispatch) { const { jobId, bodyShopId, key, variables, recipients } = app; - // Unique jobId with timestamp to avoid duplicates await appQueue.add( "add-notification", { jobId, bodyShopId, key, variables, recipients }, diff --git a/server/notifications/queues/emailQueue.js b/server/notifications/queues/emailQueue.js index 516991b1b..ba850a6fc 100644 --- a/server/notifications/queues/emailQueue.js +++ b/server/notifications/queues/emailQueue.js @@ -1,6 +1,15 @@ const { Queue, Worker } = require("bullmq"); const { sendTaskEmail } = require("../../email/sendemail"); +// Base time-related constant (in milliseconds) +const EMAIL_CONSOLIDATION_DELAY = 60000; // 1 minute (base timeout) + +// Derived time-related constants based on EMAIL_CONSOLIDATION_DELAY / DO NOT TOUCH, these are pegged to EMAIL_CONSOLIDATION_DELAY +const CONSOLIDATION_KEY_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (90s, buffer for consolidation) +const LOCK_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 0.25; // 15 seconds (quarter of base, for lock duration) +const RATE_LIMITER_DURATION = EMAIL_CONSOLIDATION_DELAY * 0.1; // 6 seconds (tenth of base, for rate limiting) +const NOTIFICATION_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (matches consolidation key expiration) + let emailAddQueue; let emailConsolidateQueue; let emailAddWorker; @@ -36,7 +45,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { emailAddWorker = new Worker( "emailAdd", async (job) => { - const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data; // Receive bodyShopName + const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data; logger.logger.info(`Adding email notifications for jobId ${jobId}`); const redisKeyPrefix = `email:notifications:${jobId}`; @@ -44,9 +53,11 @@ const loadEmailQueue = async ({ pubClient, logger }) => { const { user } = recipient; const userKey = `${redisKeyPrefix}:${user}`; await pubClient.rpush(userKey, body); + await pubClient.expire(userKey, NOTIFICATION_EXPIRATION / 1000); // Set expiration const detailsKey = `email:recipientDetails:${jobId}:${user}`; await pubClient.hsetnx(detailsKey, "firstName", recipient.firstName || ""); await pubClient.hsetnx(detailsKey, "lastName", recipient.lastName || ""); + await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000); // Set expiration await pubClient.sadd(`email:recipients:${jobId}`, user); logger.logger.debug(`Stored message for ${user} under ${userKey}: ${body}`); } @@ -54,14 +65,18 @@ const loadEmailQueue = async ({ pubClient, logger }) => { const consolidateKey = `email:consolidate:${jobId}`; const flagSet = await pubClient.setnx(consolidateKey, "pending"); if (flagSet) { - // Pass bodyShopName to the consolidation job await emailConsolidateQueue.add( "consolidate-emails", { jobId, jobRoNumber, bodyShopName }, - { jobId: `consolidate:${jobId}`, delay: 30000 } + { + jobId: `consolidate:${jobId}`, + delay: EMAIL_CONSOLIDATION_DELAY, + attempts: 3, // Retry up to 3 times + backoff: LOCK_EXPIRATION // Retry delay matches lock expiration (15s) + } ); logger.logger.info(`Scheduled email consolidation for jobId ${jobId}`); - await pubClient.expire(consolidateKey, 300); + await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000); // Convert to seconds } else { logger.logger.debug(`Email consolidation already scheduled for jobId ${jobId}`); } @@ -81,7 +96,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { logger.logger.info(`Consolidating emails for jobId ${jobId}`); const lockKey = `lock:emailConsolidate:${jobId}`; - const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", 10); + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); // Convert to seconds if (lockAcquired) { try { const recipientsSet = `email:recipients:${jobId}`; @@ -127,7 +142,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { await pubClient.del(`email:consolidate:${jobId}`); } catch (err) { logger.logger.error(`Email consolidation error for jobId ${jobId}: ${err.message}`, { error: err }); - throw err; + throw err; // Trigger retry if attempts remain } finally { await pubClient.del(lockKey); } @@ -139,7 +154,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { connection: pubClient, prefix: "{BULLMQ}", concurrency: 1, - limiter: { max: 1, duration: 5000 } + limiter: { max: 1, duration: RATE_LIMITER_DURATION } } ); @@ -188,14 +203,11 @@ const getQueue = () => { * @returns {Promise} Resolves when all notifications are added to the queue. */ const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => { - console.dir(emailsToDispatch); const emailAddQueue = getQueue(); for (const email of emailsToDispatch) { - // Extract bodyShopName along with other fields const { jobId, jobRoNumber, bodyShopName, body, recipients } = email; - // Validate required fields, including bodyShopName if (!jobId || !jobRoNumber || !bodyShopName || !body || !recipients.length) { logger.logger.warn( `Skipping email dispatch for jobId ${jobId} due to missing data: ` + @@ -204,7 +216,6 @@ const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => { continue; } - // Include bodyShopName in the job data await emailAddQueue.add( "add-email-notification", { jobId, jobRoNumber, bodyShopName, body, recipients },