const { Queue, Worker } = require("bullmq");

// Module-level singletons so repeated calls to loadAppQueue reuse the same
// queues/workers instead of creating duplicates.
let addQueue;
let consolidateQueue;

/**
 * Initializes the notification queues and workers for adding and consolidating notifications.
 *
 * @param {Object} options - Configuration options for queue initialization.
 * @param {Object} options.pubClient - Redis client instance for queue communication.
 * @param {Object} options.logger - Logger instance for logging events and debugging.
 * @param {Object} options.redisHelpers - Utility functions for Redis operations.
 * @param {Object} options.ioRedis - Socket.io Redis adapter for real-time event emission.
 * @returns {Promise<Queue>} The initialized `addQueue` instance for dispatching notifications.
 */
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
  // Only initialize if queues don't already exist
  if (!addQueue || !consolidateQueue) {
    logger.logger.info("Initializing Notifications Queues");

    // Create queue for adding notifications
    addQueue = new Queue("notificationsAdd", {
      connection: pubClient,
      prefix: "{BULLMQ}", // Namespace prefix for BullMQ in Redis
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true } // Cleanup jobs after success/failure
    });

    // Create queue for consolidating notifications
    consolidateQueue = new Queue("notificationsConsolidate", {
      connection: pubClient,
      prefix: "{BULLMQ}",
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    // Worker to process jobs from the addQueue
    const addWorker = new Worker(
      "notificationsAdd",
      async (job) => {
        const { jobId, key, variables, recipients } = job.data;
        logger.logger.info(`Adding notifications for jobId ${jobId}`);
        const redisKeyPrefix = `app:notifications:${jobId}`;
        const notification = { key, variables, timestamp: Date.now() };

        // Append the notification for each recipient. RPUSH is atomic, so
        // concurrent add-workers (concurrency: 5) cannot lose each other's
        // writes the way a get/JSON.parse/set read-modify-write cycle can.
        for (const recipient of recipients) {
          const { user } = recipient;
          const userKey = `${redisKeyPrefix}:${user}`;
          await pubClient.rpush(userKey, JSON.stringify(notification));
          // 40-second expiration to avoid stale data (refreshed on each push)
          await pubClient.expire(userKey, 40);
          logger.logger.debug(`Stored notification for ${user} under ${userKey}`);
        }

        const consolidateKey = `app:consolidate:${jobId}`;
        // SET with NX+EX creates the flag and its 5-minute TTL atomically, so a
        // crash between "set flag" and "set TTL" can no longer leave a permanent
        // flag that blocks all future consolidations for this jobId.
        const flagSet = await pubClient.set(consolidateKey, "pending", "EX", 300, "NX");
        logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);
        if (flagSet) {
          // Schedule consolidation job to run after a 5-second delay
          await consolidateQueue.add(
            "consolidate-notifications",
            { jobId, recipients },
            { jobId: `consolidate:${jobId}`, delay: 5000 }
          );
          logger.logger.info(`Scheduled consolidation for jobId ${jobId}`);
        } else {
          logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`);
        }
      },
      {
        connection: pubClient,
        prefix: "{BULLMQ}",
        concurrency: 5 // Process up to 5 jobs concurrently
      }
    );

    // Worker to process jobs from the consolidateQueue
    const consolidateWorker = new Worker(
      "notificationsConsolidate",
      async (job) => {
        const { jobId, recipients } = job.data;
        logger.logger.info(`Consolidating notifications for jobId ${jobId}`);
        const redisKeyPrefix = `app:notifications:${jobId}`;
        const lockKey = `lock:consolidate:${jobId}`;

        // Acquire a lock to prevent concurrent consolidation (NX = set if not exists)
        const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", 10);
        logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);
        if (!lockAcquired) {
          logger.logger.info(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
          return;
        }

        try {
          const allNotifications = {};
          // Get unique user IDs to avoid duplicate processing
          const uniqueUsers = [...new Set(recipients.map((r) => r.user))];
          logger.logger.debug(`Unique users for jobId ${jobId}: ${uniqueUsers}`);

          // Retrieve and structure notifications by user and bodyShopId
          for (const user of uniqueUsers) {
            const userKey = `${redisKeyPrefix}:${user}`;
            // Notifications are stored as a Redis list of JSON strings (see addWorker)
            const rawNotifications = await pubClient.lrange(userKey, 0, -1);
            logger.logger.debug(`Retrieved notifications for ${user}: ${rawNotifications}`);
            if (rawNotifications.length > 0) {
              const parsedNotifications = rawNotifications.map((item) => JSON.parse(item));
              const userRecipients = recipients.filter((r) => r.user === user);
              for (const { bodyShopId } of userRecipients) {
                allNotifications[user] = allNotifications[user] || {};
                allNotifications[user][bodyShopId] = parsedNotifications;
              }
              await pubClient.del(userKey); // Clean up after retrieval
              logger.logger.debug(`Deleted Redis key ${userKey}`);
            } else {
              logger.logger.warn(`No notifications found for ${user} under ${userKey}`);
            }
          }

          logger.logger.debug(`Consolidated notifications: ${JSON.stringify(allNotifications)}`);

          // Emit notifications to users via Socket.io
          for (const [user, bodyShopData] of Object.entries(allNotifications)) {
            const userMapping = await redisHelpers.getUserSocketMapping(user);
            logger.logger.debug(`User socket mapping for ${user}: ${JSON.stringify(userMapping)}`);
            for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
              if (userMapping && userMapping[bodyShopId]?.socketIds) {
                userMapping[bodyShopId].socketIds.forEach((socketId) => {
                  logger.logger.debug(
                    `Emitting to socket ${socketId}: ${JSON.stringify({ jobId, bodyShopId, notifications })}`
                  );
                  ioRedis.to(socketId).emit("notification", { jobId, bodyShopId, notifications });
                });
                logger.logger.info(
                  `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId}`
                );
              } else {
                logger.logger.warn(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
              }
            }
          }

          // Clean up consolidation flag after processing
          await pubClient.del(`app:consolidate:${jobId}`);
        } catch (err) {
          logger.logger.error(`Consolidation error for jobId ${jobId}: ${err.message}`, { error: err });
          throw err; // Re-throw to trigger BullMQ's failed event
        } finally {
          await pubClient.del(lockKey); // Release lock regardless of success/failure
        }
      },
      {
        connection: pubClient,
        prefix: "{BULLMQ}",
        concurrency: 1, // Single concurrency to avoid race conditions
        limiter: { max: 1, duration: 5000 } // Rate limit: 1 job every 5 seconds
      }
    );

    // Log worker completion events
    addWorker.on("completed", (job) => logger.logger.info(`Add job ${job.id} completed`));
    consolidateWorker.on("completed", (job) => logger.logger.info(`Consolidate job ${job.id} completed`));

    // Log worker failure events with error details. BullMQ may emit "failed"
    // with an undefined job (e.g. when job data is lost), so guard job.id to
    // keep the error logger itself from throwing.
    addWorker.on("failed", (job, err) =>
      logger.logger.error(`Add job ${job?.id} failed: ${err.message}`, { error: err })
    );
    consolidateWorker.on("failed", (job, err) =>
      logger.logger.error(`Consolidate job ${job?.id} failed: ${err.message}`, { error: err })
    );

    // Graceful shutdown handler for workers
    const shutdown = async () => {
      logger.logger.info("Closing app queue workers...");
      await Promise.all([addWorker.close(), consolidateWorker.close()]);
      logger.logger.info("App queue workers closed");
    };
    process.on("SIGTERM", shutdown); // Handle termination signal
    process.on("SIGINT", shutdown); // Handle interrupt signal (e.g., Ctrl+C)
  }
  return addQueue; // Return queue for external use
};

/**
 * Retrieves the initialized `addQueue` instance.
 *
 * @returns {Queue} The `addQueue` instance for adding notifications.
 * @throws {Error} If `addQueue` is not initialized (i.e., `loadAppQueue` wasn’t called).
 */
const getQueue = () => {
  if (!addQueue) throw new Error("Add queue not initialized. Ensure loadAppQueue is called during bootstrap.");
  return addQueue;
};

/**
 * Dispatches notifications to the `addQueue` for processing.
 *
 * @param {Object} options - Options for dispatching notifications.
 * @param {Array} options.appsToDispatch - Array of notification objects to dispatch.
 * @param {Object} options.logger - Logger instance for logging dispatch events.
 * @returns {Promise} Resolves when all notifications are added to the queue.
 */
const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => {
  const appQueue = getQueue();
  for (const app of appsToDispatch) {
    const { jobId, bodyShopId, key, variables, recipients } = app;
    // Unique jobId with timestamp to avoid duplicates
    await appQueue.add(
      "add-notification",
      { jobId, bodyShopId, key, variables, recipients },
      { jobId: `${jobId}:${Date.now()}` }
    );
    logger.logger.info(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
  }
};

module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue };