const { Queue, Worker } = require("bullmq");
const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries");
const { registerCleanupTask } = require("../../utils/cleanupManager");
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
const devDebugLogger = require("../../utils/devDebugLogger");
const graphQLClient = require("../../graphql-client/graphql-client").client;

// Consolidation window in minutes, read from the APP_CONSOLIDATION_DELAY_IN_MINS
// environment variable. Falls back to 3 when the variable is unset or not numeric,
// and is clamped to a minimum of 1.
const APP_CONSOLIDATION_DELAY_IN_MINS = (() => {
  const envValue = process.env?.APP_CONSOLIDATION_DELAY_IN_MINS;
  const parsedValue = envValue ? Number.parseInt(envValue, 10) : NaN;
  return Number.isNaN(parsedValue) ? 3 : Math.max(1, parsedValue);
})();

// Base time-related constant (in milliseconds) / DO NOT TOUCH
const APP_CONSOLIDATION_DELAY = APP_CONSOLIDATION_DELAY_IN_MINS * 60000; // 180000 ms with the default of 3 minutes

// Derived time-related constants based on APP_CONSOLIDATION_DELAY / DO NOT TOUCH
// (example values below assume the default 3-minute window)
const NOTIFICATION_STORAGE_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5x base (270s by default)
const CONSOLIDATION_FLAG_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5x base (270s by default)
const LOCK_EXPIRATION = APP_CONSOLIDATION_DELAY * 0.25; // quarter of base (45s by default)
const RATE_LIMITER_DURATION = APP_CONSOLIDATION_DELAY * 0.1; // tenth of base (18s by default)

let addQueue;
let consolidateQueue;

// Per-process counter appended to "add" job ids so that two notifications for the
// same jobId enqueued within the same millisecond do not end up with identical ids.
let dispatchSequence = 0;

/**
 * Builds the scenario_text, fcm_text, and scenario_meta for a batch of notifications.
 *
 * @param {Array} notifications - Array of notification objects with 'body' and 'variables'.
 * @returns {Object} An object with 'scenario_text' (array of bodies), 'fcm_text' (bodies joined
 *   with ". " plus a trailing period, or null when the joined text is empty) and
 *   'scenario_meta' (array of each notification's variables, `{}` when missing).
 */
const buildNotificationContent = (notifications) => {
  const scenarioText = notifications.map((n) => n.body);
  const fcmText = scenarioText.join(". ");
  const scenarioMeta = notifications.map((n) => n.variables || {});
  return {
    scenario_text: scenarioText,
    fcm_text: fcmText ? `${fcmText}.` : null,
    scenario_meta: scenarioMeta
  };
};

/**
 * Initializes the notification queues and workers for adding and consolidating notifications.
 *
 * On first call this creates the "notificationsAdd" and "notificationsConsolidate" queues and
 * one worker for each, wires their completed/failed handlers and registers a cleanup task that
 * closes both workers. Later calls skip initialization and return the existing add queue.
 *
 * @param {Object} deps
 * @param {Object} deps.pubClient - Redis client used both as the BullMQ connection and for the
 *   notification/flag/lock keys.
 * @param {Object} deps.logger - Logger exposing `log(...)`, used for error reporting.
 * @param {Object} deps.redisHelpers - Helper object exposing `getUserSocketMapping(user)`.
 * @param {Object} deps.ioRedis - Emitter exposing `to(socketId).emit(...)`.
 * @returns {Promise<Queue>} The add queue.
 */
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
  if (!addQueue || !consolidateQueue) {
    const prefix = getBullMQPrefix();
    const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";

    devDebugLogger(`Initializing Notifications Queues with prefix: ${prefix}`);

    addQueue = new Queue("notificationsAdd", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    consolidateQueue = new Queue("notificationsConsolidate", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    // Worker 1: appends the incoming notification to each recipient's pending list in Redis
    // and schedules a single delayed consolidation job per jobId.
    const addWorker = new Worker(
      "notificationsAdd",
      async (job) => {
        const { jobId, key, variables, recipients, body, jobRoNumber } = job.data;
        devDebugLogger(`Adding notifications for jobId ${jobId}`);

        const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`;
        const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() };

        // NOTE(review): this is a non-atomic read-modify-write; with worker concurrency > 1,
        // two jobs touching the same user key at once could overwrite each other's entry.
        for (const recipient of recipients) {
          const { user } = recipient;
          const userKey = `${redisKeyPrefix}:${user}`;
          const existingNotifications = await pubClient.get(userKey);
          const notifications = existingNotifications ? JSON.parse(existingNotifications) : [];
          notifications.push(notification);
          await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000);
          devDebugLogger(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
        }

        const consolidateKey = `app:${devKey}:consolidate:${jobId}`;
        // Set the flag and its TTL in one atomic command so the flag can never be left behind
        // without an expiry if scheduling fails or the process dies right after setting it.
        const flagSet = await pubClient.set(
          consolidateKey,
          "pending",
          "NX",
          "EX",
          CONSOLIDATION_FLAG_EXPIRATION / 1000
        );
        devDebugLogger(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);

        if (flagSet) {
          try {
            await consolidateQueue.add(
              "consolidate-notifications",
              { jobId, recipients },
              {
                jobId: `consolidate:${jobId}`,
                delay: APP_CONSOLIDATION_DELAY,
                attempts: 3,
                backoff: LOCK_EXPIRATION
              }
            );
          } catch (err) {
            // Scheduling failed: release the flag so a later notification can schedule it again.
            await pubClient.del(consolidateKey);
            throw err;
          }
          devDebugLogger(`Scheduled consolidation for jobId ${jobId}`);
        } else {
          devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`);
        }
      },
      {
        prefix,
        connection: pubClient,
        concurrency: 5
      }
    );

    // Worker 2: gathers the pending notifications of every recipient of a job, stores one
    // consolidated row per user/body shop and pushes the result to the user's sockets.
    const consolidateWorker = new Worker(
      "notificationsConsolidate",
      async (job) => {
        const { jobId, recipients } = job.data;
        devDebugLogger(`Consolidating notifications for jobId ${jobId}`);

        const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`;
        const lockKey = `lock:${devKey}:consolidate:${jobId}`;

        const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
        devDebugLogger(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);

        if (!lockAcquired) {
          devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
          return;
        }

        try {
          // allNotifications[user][bodyShopId] -> array of pending notifications
          const allNotifications = {};
          const uniqueUsers = [...new Set(recipients.map((r) => r.user))];
          devDebugLogger(`Unique users for jobId ${jobId}: ${uniqueUsers}`);

          for (const user of uniqueUsers) {
            const userKey = `${redisKeyPrefix}:${user}`;
            const notifications = await pubClient.get(userKey);
            devDebugLogger(`Retrieved notifications for ${user}: ${notifications}`);

            if (notifications) {
              const parsedNotifications = JSON.parse(notifications);
              const userRecipients = recipients.filter((r) => r.user === user);
              for (const { bodyShopId } of userRecipients) {
                allNotifications[user] = allNotifications[user] || {};
                allNotifications[user][bodyShopId] = parsedNotifications;
              }
              await pubClient.del(userKey);
              devDebugLogger(`Deleted Redis key ${userKey}`);
            } else {
              devDebugLogger(`No notifications found for ${user} under ${userKey}`);
            }
          }

          devDebugLogger(`Consolidated notifications: ${JSON.stringify(allNotifications)}`);

          // Insert notifications into the database and collect IDs.
          // insertKeys[i] is the `${user}:${bodyShopId}` key of notificationInserts[i], so each
          // returned row can be matched to its user/body shop by position alone.
          const notificationInserts = [];
          const insertKeys = [];
          const notificationIdMap = new Map();

          for (const [user, bodyShopData] of Object.entries(allNotifications)) {
            const userRecipients = recipients.filter((r) => r.user === user);
            // NOTE(review): the first recipient entry's associationId is used for every body
            // shop of this user - confirm that is intended.
            const associationId = userRecipients[0]?.associationId;

            for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
              const { scenario_text, fcm_text, scenario_meta } = buildNotificationContent(notifications);
              notificationInserts.push({
                jobid: jobId,
                associationid: associationId,
                scenario_text: JSON.stringify(scenario_text),
                fcm_text: fcm_text,
                scenario_meta: JSON.stringify(scenario_meta)
              });
              const mapKey = `${user}:${bodyShopId}`;
              insertKeys.push(mapKey);
              notificationIdMap.set(mapKey, null);
            }
          }

          if (notificationInserts.length > 0) {
            const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, {
              objects: notificationInserts
            });
            devDebugLogger(
              `Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}`
            );
            // Assumes `returning` lists rows in the same order as `objects` were sent - TODO confirm.
            insertResponse.insert_notifications.returning.forEach((row, index) => {
              const mapKey = insertKeys[index];
              if (mapKey !== undefined) {
                notificationIdMap.set(mapKey, row.id);
              }
            });
          }

          // Emit notifications to users via Socket.io with notification ID
          for (const [user, bodyShopData] of Object.entries(allNotifications)) {
            const userMapping = await redisHelpers.getUserSocketMapping(user);
            const userRecipients = recipients.filter((r) => r.user === user);
            const associationId = userRecipients[0]?.associationId;

            for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
              const notificationId = notificationIdMap.get(`${user}:${bodyShopId}`);
              const jobRoNumber = notifications[0]?.jobRoNumber;

              if (userMapping && userMapping[bodyShopId]?.socketIds) {
                userMapping[bodyShopId].socketIds.forEach((socketId) => {
                  ioRedis.to(socketId).emit("notification", {
                    jobId,
                    jobRoNumber,
                    bodyShopId,
                    notifications,
                    notificationId,
                    associationId
                  });
                });
                devDebugLogger(
                  `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}`
                );
              } else {
                devDebugLogger(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
              }
            }
          }

          // Clear the "consolidation pending" flag so the next notification schedules a new run.
          await pubClient.del(`app:${devKey}:consolidate:${jobId}`);
        } catch (err) {
          logger.log(`app-queue-consolidation-error`, "ERROR", "notifications", "api", {
            message: err?.message,
            stack: err?.stack
          });
          throw err;
        } finally {
          await pubClient.del(lockKey);
        }
      },
      {
        prefix,
        connection: pubClient,
        concurrency: 1,
        limiter: { max: 1, duration: RATE_LIMITER_DURATION }
      }
    );

    addWorker.on("completed", (job) => devDebugLogger(`Add job ${job.id} completed`));
    consolidateWorker.on("completed", (job) => devDebugLogger(`Consolidate job ${job.id} completed`));

    addWorker.on("failed", (job, err) =>
      logger.log(`app-queue-notification-error`, "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );

    consolidateWorker.on("failed", (job, err) =>
      logger.log(`app-queue-consolidation-failed:`, "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );

    // Register cleanup task instead of direct process listeners
    const shutdown = async () => {
      devDebugLogger("Closing app queue workers...");
      await Promise.all([addWorker.close(), consolidateWorker.close()]);
      devDebugLogger("App queue workers closed");
    };

    registerCleanupTask(shutdown);
  }

  return addQueue;
};

/**
 * Retrieves the initialized `addQueue` instance.
 *
 * @returns {Queue} The add queue created by `loadAppQueue`.
 * @throws {Error} If `loadAppQueue` has not created the queue yet.
 */
const getQueue = () => {
  if (!addQueue) throw new Error("Add queue not initialized. Ensure loadAppQueue is called during bootstrap.");
  return addQueue;
};

/**
 * Dispatches notifications to the `addQueue` for processing.
 *
 * Entries are enqueued one at a time, in order. Each job id combines the notification's jobId,
 * the current timestamp and a per-process sequence number, so entries for the same jobId that
 * are enqueued within the same millisecond still get distinct ids.
 *
 * @param {Object} params
 * @param {Array} params.appsToDispatch - Notification payloads; each provides jobId, bodyShopId,
 *   key, variables, recipients, body and jobRoNumber.
 * @param {Object} params.logger - Accepted for interface compatibility; not used here.
 * @returns {Promise<void>}
 * @throws {Error} If the add queue has not been initialized.
 */
const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => {
  const appQueue = getQueue();

  for (const app of appsToDispatch) {
    const { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber } = app;
    const sequence = dispatchSequence;
    dispatchSequence += 1;

    await appQueue.add(
      "add-notification",
      { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber },
      { jobId: `${jobId}:${Date.now()}:${sequence}` }
    );
    devDebugLogger(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
  }
};

module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue };