const { randomUUID } = require("crypto");
const { Queue, Worker } = require("bullmq");

// Base consolidation delay in minutes. Read from the APP_CONSOLIDATION_DELAY_IN_MINS
// environment variable; falls back to 1 when unset or unparsable, and is never below 1.
const APP_CONSOLIDATION_DELAY_IN_MINS = (() => {
  const envValue = process.env?.APP_CONSOLIDATION_DELAY_IN_MINS;
  const parsedValue = envValue ? Number.parseInt(envValue, 10) : NaN;
  return Number.isNaN(parsedValue) ? 1 : Math.max(1, parsedValue);
})();

// Base time-related constant (in milliseconds) / DO NOT TOUCH
const APP_CONSOLIDATION_DELAY = APP_CONSOLIDATION_DELAY_IN_MINS * 60000; // 1 minute with the default setting

// Derived time-related constants / DO NOT TOUCH, these are pegged to APP_CONSOLIDATION_DELAY.
// The example durations below assume the default 1-minute base delay.
const NOTIFICATION_STORAGE_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 90s, TTL of stored notifications
const CONSOLIDATION_FLAG_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 90s, TTL of the "consolidation scheduled" flag
const LOCK_EXPIRATION = APP_CONSOLIDATION_DELAY * 0.25; // 15s, TTL of the consolidation lock
const RATE_LIMITER_DURATION = APP_CONSOLIDATION_DELAY * 0.1; // 6s, rate-limiter window of the consolidate worker

// Module-level singletons, created on the first loadAppQueue() call.
let addQueue;
let consolidateQueue;

/**
 * Initializes the notification queues and workers for adding and consolidating notifications.
 *
 * The queues and workers are created only once; later calls return the existing `addQueue`.
 * On first initialization this also registers SIGTERM/SIGINT handlers that close both workers.
 *
 * @param {Object} options - Configuration options for queue initialization.
 * @param {Object} options.pubClient - Redis client instance used for queue communication and notification storage.
 * @param {Object} options.logger - Logger wrapper; the actual logger is read from `logger.logger`.
 * @param {Object} options.redisHelpers - Utility functions for Redis operations (`getUserSocketMapping` is used).
 * @param {Object} options.ioRedis - Socket.io Redis adapter for real-time event emission.
 * @returns {Promise<Queue>} The initialized `addQueue` instance for dispatching notifications.
 */
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
  // Only initialize if queues don't already exist
  if (!addQueue || !consolidateQueue) {
    logger.logger.info("Initializing Notifications Queues");

    // Create queue for adding notifications
    addQueue = new Queue("notificationsAdd", {
      connection: pubClient,
      prefix: "{BULLMQ}",
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    // Create queue for consolidating notifications
    consolidateQueue = new Queue("notificationsConsolidate", {
      connection: pubClient,
      prefix: "{BULLMQ}",
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    // Worker to process jobs from the addQueue
    const addWorker = new Worker(
      "notificationsAdd",
      async (job) => {
        const { jobId, key, variables, recipients } = job.data;
        logger.logger.info(`Adding notifications for jobId ${jobId}`);

        const redisKeyPrefix = `app:notifications:${jobId}`;
        const notification = { key, variables, timestamp: Date.now() };

        // Store notification for each recipient in Redis.
        // NOTE(review): this GET / modify / SET sequence is not atomic. With a worker
        // concurrency of 5, two add jobs for the same jobId and user can overwrite each
        // other's entry. Consider a Redis list (RPUSH) or a Lua script if that matters.
        for (const recipient of recipients) {
          const { user } = recipient;
          const userKey = `${redisKeyPrefix}:${user}`;
          const existingNotifications = await pubClient.get(userKey);
          const notifications = existingNotifications ? JSON.parse(existingNotifications) : [];
          notifications.push(notification);

          // Set with expiration to avoid stale data (EX takes seconds)
          await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000);
          logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
        }

        const consolidateKey = `app:consolidate:${jobId}`;

        // Claim the flag and its TTL in one atomic command. A separate SETNX + EXPIRE
        // would leave the flag without a TTL (blocking consolidation for this jobId
        // indefinitely) if anything failed between the two calls.
        const setResult = await pubClient.set(
          consolidateKey,
          "pending",
          "NX",
          "EX",
          CONSOLIDATION_FLAG_EXPIRATION / 1000 // Convert to seconds
        );
        const flagSet = setResult === "OK";
        logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);

        if (flagSet) {
          try {
            // Schedule consolidation job with delay and retries.
            // The custom job id uses "-" because BullMQ custom ids must not contain ":".
            await consolidateQueue.add(
              "consolidate-notifications",
              { jobId, recipients },
              {
                jobId: `consolidate-${jobId}`,
                delay: APP_CONSOLIDATION_DELAY,
                attempts: 3, // Retry up to 3 times
                backoff: LOCK_EXPIRATION // Retry delay matches lock expiration
              }
            );
          } catch (err) {
            // Release the flag so a later add job can schedule consolidation
            // instead of waiting for the flag to expire.
            await pubClient.del(consolidateKey);
            throw err;
          }
          logger.logger.info(`Scheduled consolidation for jobId ${jobId}`);
        } else {
          logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`);
        }
      },
      {
        connection: pubClient,
        prefix: "{BULLMQ}",
        concurrency: 5
      }
    );

    // Worker to process jobs from the consolidateQueue
    const consolidateWorker = new Worker(
      "notificationsConsolidate",
      async (job) => {
        const { jobId, recipients } = job.data;
        logger.logger.info(`Consolidating notifications for jobId ${jobId}`);

        const redisKeyPrefix = `app:notifications:${jobId}`;
        const lockKey = `lock:consolidate:${jobId}`;

        // Acquire a lock to prevent concurrent consolidation (EX takes seconds)
        const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
        logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);

        if (lockAcquired) {
          try {
            const allNotifications = {};
            const uniqueUsers = [...new Set(recipients.map((r) => r.user))];
            logger.logger.debug(`Unique users for jobId ${jobId}: ${uniqueUsers}`);

            // Retrieve and structure notifications by user and bodyShopId
            for (const user of uniqueUsers) {
              const userKey = `${redisKeyPrefix}:${user}`;
              const notifications = await pubClient.get(userKey);
              logger.logger.debug(`Retrieved notifications for ${user}: ${notifications}`);

              if (notifications) {
                const parsedNotifications = JSON.parse(notifications);
                const userRecipients = recipients.filter((r) => r.user === user);
                for (const { bodyShopId } of userRecipients) {
                  allNotifications[user] = allNotifications[user] || {};
                  allNotifications[user][bodyShopId] = parsedNotifications;
                }
                await pubClient.del(userKey);
                logger.logger.debug(`Deleted Redis key ${userKey}`);
              } else {
                logger.logger.warn(`No notifications found for ${user} under ${userKey}`);
              }
            }

            logger.logger.debug(`Consolidated notifications: ${JSON.stringify(allNotifications)}`);

            // Emit notifications to users via Socket.io
            for (const [user, bodyShopData] of Object.entries(allNotifications)) {
              const userMapping = await redisHelpers.getUserSocketMapping(user);
              logger.logger.debug(`User socket mapping for ${user}: ${JSON.stringify(userMapping)}`);

              for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
                if (userMapping && userMapping[bodyShopId]?.socketIds) {
                  userMapping[bodyShopId].socketIds.forEach((socketId) => {
                    logger.logger.debug(
                      `Emitting to socket ${socketId}: ${JSON.stringify({ jobId, bodyShopId, notifications })}`
                    );
                    ioRedis.to(socketId).emit("notification", { jobId, bodyShopId, notifications });
                  });
                  logger.logger.info(
                    `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId}`
                  );
                } else {
                  logger.logger.warn(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
                }
              }
            }

            // Clear the flag so the next notification for this jobId schedules a new consolidation
            await pubClient.del(`app:consolidate:${jobId}`);
          } catch (err) {
            logger.logger.error(`Consolidation error for jobId ${jobId}: ${err.message}`, { error: err });
            throw err; // Trigger retry if attempts remain
          } finally {
            await pubClient.del(lockKey);
          }
        } else {
          logger.logger.info(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
        }
      },
      {
        connection: pubClient,
        prefix: "{BULLMQ}",
        concurrency: 1,
        limiter: { max: 1, duration: RATE_LIMITER_DURATION }
      }
    );

    // Log worker completion events
    addWorker.on("completed", (job) => logger.logger.info(`Add job ${job.id} completed`));
    consolidateWorker.on("completed", (job) => logger.logger.info(`Consolidate job ${job.id} completed`));

    // Log worker failure events with error details.
    // BullMQ may invoke "failed" listeners with an undefined job, so the id is read defensively.
    addWorker.on("failed", (job, err) =>
      logger.logger.error(`Add job ${job?.id} failed: ${err.message}`, { error: err })
    );
    consolidateWorker.on("failed", (job, err) =>
      logger.logger.error(`Consolidate job ${job?.id} failed: ${err.message}`, { error: err })
    );

    // Graceful shutdown handler for workers
    const shutdown = async () => {
      logger.logger.info("Closing app queue workers...");
      await Promise.all([addWorker.close(), consolidateWorker.close()]);
      logger.logger.info("App queue workers closed");
    };

    process.on("SIGTERM", shutdown);
    process.on("SIGINT", shutdown);
  }

  return addQueue;
};

/**
 * Retrieves the initialized `addQueue` instance.
 *
 * @returns {Queue} The `addQueue` instance for adding notifications.
 * @throws {Error} If `addQueue` is not initialized (i.e., `loadAppQueue` wasn't called).
 */
const getQueue = () => {
  if (!addQueue) throw new Error("Add queue not initialized. Ensure loadAppQueue is called during bootstrap.");
  return addQueue;
};

/**
 * Dispatches notifications to the `addQueue` for processing.
 *
 * Notifications are enqueued one at a time, in array order.
 *
 * @param {Object} options - Options for dispatching notifications.
 * @param {Array} options.appsToDispatch - Array of notification objects to dispatch; each is
 *   read for `jobId`, `bodyShopId`, `key`, `variables` and `recipients`.
 * @param {Object} options.logger - Logger wrapper; the actual logger is read from `logger.logger`.
 * @returns {Promise<void>} Resolves when all notifications are added to the queue.
 * @throws {Error} If the queue has not been initialized via `loadAppQueue`.
 */
const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => {
  const appQueue = getQueue();

  for (const app of appsToDispatch) {
    const { jobId, bodyShopId, key, variables, recipients } = app;
    await appQueue.add(
      "add-notification",
      { jobId, bodyShopId, key, variables, recipients },
      // BullMQ silently ignores a job whose id already exists, so a timestamp alone would
      // drop notifications dispatched for the same jobId within one millisecond. The random
      // suffix makes every id unique; "-" is used because custom ids must not contain ":".
      { jobId: `${jobId}-${Date.now()}-${randomUUID()}` }
    );
    logger.logger.info(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
  }
};

module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue };