const { Queue, Worker } = require("bullmq");
const graphQLClient = require("../../graphql-client/graphql-client").client;

// Base consolidation delay in minutes, from the environment or defaulting to 1.
// Always at least 1 minute, and non-numeric env values fall back to the default.
const APP_CONSOLIDATION_DELAY_IN_MINS = (() => {
  const envValue = process.env?.APP_CONSOLIDATION_DELAY_IN_MINS;
  const parsedValue = envValue ? parseInt(envValue, 10) : NaN;
  return isNaN(parsedValue) ? 1 : Math.max(1, parsedValue); // Default to 1, ensure at least 1
})();

// Base time-related constant (in milliseconds) / DO NOT TOUCH
const APP_CONSOLIDATION_DELAY = APP_CONSOLIDATION_DELAY_IN_MINS * 60000; // 1 minute (base timeout)

// Derived time-related constants based on APP_CONSOLIDATION_DELAY / DO NOT TOUCH
const NOTIFICATION_STORAGE_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (90s)
const CONSOLIDATION_FLAG_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (90s)
const LOCK_EXPIRATION = APP_CONSOLIDATION_DELAY * 0.25; // 15 seconds (quarter of base)
const RATE_LIMITER_DURATION = APP_CONSOLIDATION_DELAY * 0.1; // 6 seconds (tenth of base)

// Module-level singletons, created once by loadAppQueue.
let addQueue;
let consolidateQueue;

// GraphQL mutation to insert consolidated notifications.
// NOTE: Hasura's `returning` rows come back in the same order as `objects`,
// which the ID back-mapping below relies on.
const INSERT_NOTIFICATIONS_MUTATION = `
  mutation INSERT_NOTIFICATIONS($objects: [notifications_insert_input!]!) {
    insert_notifications(objects: $objects) {
      affected_rows
      returning {
        id
        jobid
        associationid
        scenario_text
        fcm_text
        scenario_meta
      }
    }
  }
`;

/**
 * Builds the scenario_text, fcm_text, and scenario_meta for a batch of notifications.
 *
 * @param {Array} notifications - Array of notification objects with 'body' and 'variables'.
 * @returns {Object} An object with 'scenario_text' (array of bodies),
 *   'fcm_text' (joined text with trailing period, or null when empty), and
 *   'scenario_meta' (array of per-notification variables objects).
 */
const buildNotificationContent = (notifications) => {
  const scenarioText = notifications.map((n) => n.body); // Array of text entries
  // NOTE(review): the separator below reproduces the original literal, which
  // spanned a line break in the source (". " followed by a newline) — confirm
  // the intended separator with the original author.
  const fcmText = scenarioText.join(". \n"); // Concatenated text with period separator
  const scenarioMeta = notifications.map((n) => n.variables || {}); // Array of metadata objects
  return {
    scenario_text: scenarioText,
    fcm_text: fcmText ? `${fcmText}.` : null, // Add trailing period if non-empty, otherwise null
    scenario_meta: scenarioMeta
  };
};

/**
 * Initializes the notification queues and workers for adding and consolidating
 * notifications. Idempotent: queues/workers are created only on the first call.
 *
 * @param {Object} deps
 * @param {Object} deps.pubClient - ioredis client used for BullMQ connections and raw key ops.
 * @param {Object} deps.logger - Logger wrapper exposing `logger.logger.{info,debug,warn,error}`.
 * @param {Object} deps.redisHelpers - Helpers; `getUserSocketMapping(user)` resolves socket IDs.
 * @param {Object} deps.ioRedis - Socket.io server used to emit "notification" events.
 * @returns {Promise<Queue>} The initialized `addQueue`.
 */
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
  if (!addQueue || !consolidateQueue) {
    logger.logger.info("Initializing Notifications Queues");

    addQueue = new Queue("notificationsAdd", {
      connection: pubClient,
      prefix: "{BULLMQ}",
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });
    consolidateQueue = new Queue("notificationsConsolidate", {
      connection: pubClient,
      prefix: "{BULLMQ}",
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    // Worker 1: buffer each incoming notification per-user in Redis and make
    // sure exactly one delayed consolidation job is scheduled per jobId.
    const addWorker = new Worker(
      "notificationsAdd",
      async (job) => {
        const { jobId, key, variables, recipients, body } = job.data;
        logger.logger.info(`Adding notifications for jobId ${jobId}`);
        const redisKeyPrefix = `app:notifications:${jobId}`;
        const notification = { key, variables, body, timestamp: Date.now() };

        // Append this notification to each recipient's pending list.
        for (const recipient of recipients) {
          const { user } = recipient;
          const userKey = `${redisKeyPrefix}:${user}`;
          const existingNotifications = await pubClient.get(userKey);
          const notifications = existingNotifications ? JSON.parse(existingNotifications) : [];
          notifications.push(notification);
          // Expiration keeps orphaned buffers from leaking if consolidation never runs.
          await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000);
          logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
        }

        // SETNX acts as a "consolidation already scheduled" flag for this jobId.
        const consolidateKey = `app:consolidate:${jobId}`;
        const flagSet = await pubClient.setnx(consolidateKey, "pending");
        logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);
        if (flagSet) {
          await consolidateQueue.add(
            "consolidate-notifications",
            { jobId, recipients },
            { jobId: `consolidate:${jobId}`, delay: APP_CONSOLIDATION_DELAY, attempts: 3, backoff: LOCK_EXPIRATION }
          );
          logger.logger.info(`Scheduled consolidation for jobId ${jobId}`);
          await pubClient.expire(consolidateKey, CONSOLIDATION_FLAG_EXPIRATION / 1000);
        } else {
          logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`);
        }
      },
      { connection: pubClient, prefix: "{BULLMQ}", concurrency: 5 }
    );

    // Worker 2: after the delay, collect all buffered notifications for the
    // jobId, persist them via GraphQL, and push them out over Socket.io.
    const consolidateWorker = new Worker(
      "notificationsConsolidate",
      async (job) => {
        const { jobId, recipients } = job.data;
        logger.logger.info(`Consolidating notifications for jobId ${jobId}`);
        const redisKeyPrefix = `app:notifications:${jobId}`;

        // Distributed lock so retries/other workers don't consolidate twice.
        const lockKey = `lock:consolidate:${jobId}`;
        const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
        logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);
        if (lockAcquired) {
          try {
            // allNotifications: { [user]: { [bodyShopId]: notifications[] } }
            const allNotifications = {};
            const uniqueUsers = [...new Set(recipients.map((r) => r.user))];
            logger.logger.debug(`Unique users for jobId ${jobId}: ${uniqueUsers}`);

            for (const user of uniqueUsers) {
              const userKey = `${redisKeyPrefix}:${user}`;
              const notifications = await pubClient.get(userKey);
              logger.logger.debug(`Retrieved notifications for ${user}: ${notifications}`);
              if (notifications) {
                const parsedNotifications = JSON.parse(notifications);
                const userRecipients = recipients.filter((r) => r.user === user);
                // The same buffered list is fanned out to every bodyShop the user belongs to.
                for (const { bodyShopId } of userRecipients) {
                  allNotifications[user] = allNotifications[user] || {};
                  allNotifications[user][bodyShopId] = parsedNotifications;
                }
                await pubClient.del(userKey);
                logger.logger.debug(`Deleted Redis key ${userKey}`);
              } else {
                logger.logger.warn(`No notifications found for ${user} under ${userKey}`);
              }
            }
            logger.logger.debug(`Consolidated notifications: ${JSON.stringify(allNotifications)}`);

            // Insert notifications into the database and collect IDs.
            // insertKeys is kept strictly parallel to notificationInserts so the
            // returned rows (which Hasura yields in input order) can be mapped
            // back to their `${user}:${bodyShopId}` key by index. This replaces
            // the previous index arithmetic, which silently mismatched IDs
            // whenever users had differing numbers of bodyShops.
            const notificationInserts = [];
            const insertKeys = [];
            const notificationIdMap = new Map();
            for (const [user, bodyShopData] of Object.entries(allNotifications)) {
              const userRecipients = recipients.filter((r) => r.user === user);
              const employeeId = userRecipients[0]?.employeeId;
              for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
                const { scenario_text, fcm_text, scenario_meta } = buildNotificationContent(notifications);
                notificationInserts.push({
                  jobid: jobId,
                  associationid: employeeId || null,
                  scenario_text: JSON.stringify(scenario_text), // JSONB requires stringified input
                  fcm_text: fcm_text,
                  scenario_meta: JSON.stringify(scenario_meta) // JSONB requires stringified input
                });
                insertKeys.push(`${user}:${bodyShopId}`);
                notificationIdMap.set(`${user}:${bodyShopId}`, null);
              }
            }

            if (notificationInserts.length > 0) {
              const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, {
                objects: notificationInserts
              });
              logger.logger.info(
                `Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}`
              );
              insertResponse.insert_notifications.returning.forEach((row, index) => {
                const mapKey = insertKeys[index];
                if (mapKey !== undefined) {
                  notificationIdMap.set(mapKey, row.id);
                }
              });
            }

            // Emit notifications to users via Socket.io with notification ID.
            for (const [user, bodyShopData] of Object.entries(allNotifications)) {
              const userMapping = await redisHelpers.getUserSocketMapping(user);
              logger.logger.debug(`User socket mapping for ${user}: ${JSON.stringify(userMapping)}`);
              for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
                const notificationId = notificationIdMap.get(`${user}:${bodyShopId}`);
                if (userMapping && userMapping[bodyShopId]?.socketIds) {
                  userMapping[bodyShopId].socketIds.forEach((socketId) => {
                    logger.logger.debug(
                      `Emitting to socket ${socketId}: ${JSON.stringify({ jobId, bodyShopId, notifications, notificationId })}`
                    );
                    ioRedis.to(socketId).emit("notification", { jobId, bodyShopId, notifications, notificationId });
                  });
                  logger.logger.info(
                    `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}`
                  );
                } else {
                  logger.logger.warn(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
                }
              }
            }

            // Clear the scheduling flag so a future batch can be consolidated.
            await pubClient.del(`app:consolidate:${jobId}`);
          } catch (err) {
            logger.logger.error(`Consolidation error for jobId ${jobId}: ${err.message}`, { error: err });
            throw err; // rethrow so BullMQ retries per the job's `attempts` setting
          } finally {
            await pubClient.del(lockKey);
          }
        } else {
          logger.logger.info(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
        }
      },
      {
        connection: pubClient,
        prefix: "{BULLMQ}",
        concurrency: 1,
        limiter: { max: 1, duration: RATE_LIMITER_DURATION }
      }
    );

    addWorker.on("completed", (job) => logger.logger.info(`Add job ${job.id} completed`));
    consolidateWorker.on("completed", (job) => logger.logger.info(`Consolidate job ${job.id} completed`));
    addWorker.on("failed", (job, err) =>
      logger.logger.error(`Add job ${job.id} failed: ${err.message}`, { error: err })
    );
    consolidateWorker.on("failed", (job, err) =>
      logger.logger.error(`Consolidate job ${job.id} failed: ${err.message}`, { error: err })
    );

    // Graceful shutdown: drain both workers on process termination.
    const shutdown = async () => {
      logger.logger.info("Closing app queue workers...");
      await Promise.all([addWorker.close(), consolidateWorker.close()]);
      logger.logger.info("App queue workers closed");
    };
    process.on("SIGTERM", shutdown);
    process.on("SIGINT", shutdown);
  }
  return addQueue;
};

/**
 * Retrieves the initialized `addQueue` instance.
 *
 * @returns {Queue} The add queue.
 * @throws {Error} If `loadAppQueue` has not been called yet.
 */
const getQueue = () => {
  if (!addQueue) throw new Error("Add queue not initialized. Ensure loadAppQueue is called during bootstrap.");
  return addQueue;
};

/**
 * Dispatches notifications to the `addQueue` for processing.
 *
 * @param {Object} params
 * @param {Array} params.appsToDispatch - Apps with jobId, bodyShopId, key, variables, recipients, body.
 * @param {Object} params.logger - Logger wrapper exposing `logger.logger.info`.
 */
const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => {
  const appQueue = getQueue();
  for (const app of appsToDispatch) {
    const { jobId, bodyShopId, key, variables, recipients, body } = app;
    await appQueue.add(
      "add-notification",
      { jobId, bodyShopId, key, variables, recipients, body },
      { jobId: `${jobId}:${Date.now()}` }
    );
    logger.logger.info(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
  }
};

module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue };