const { Queue, Worker } = require("bullmq"); const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries"); const { registerCleanupTask } = require("../../utils/cleanupManager"); const getBullMQPrefix = require("../../utils/getBullMQPrefix"); const devDebugLogger = require("../../utils/devDebugLogger"); const graphQLClient = require("../../graphql-client/graphql-client").client; // Base time-related constant in minutes, sourced from environment variable or defaulting to 1 const APP_CONSOLIDATION_DELAY_IN_MINS = (() => { const envValue = process.env?.APP_CONSOLIDATION_DELAY_IN_MINS; const parsedValue = envValue ? parseInt(envValue, 10) : NaN; return isNaN(parsedValue) ? 3 : Math.max(1, parsedValue); // Default to 3, ensure at least 1 })(); // Base time-related constant (in milliseconds) / DO NOT TOUCH const APP_CONSOLIDATION_DELAY = APP_CONSOLIDATION_DELAY_IN_MINS * 60000; // 1 minute (base timeout) // Derived time-related constants based on APP_CONSOLIDATION_DELAY / DO NOT TOUCH const NOTIFICATION_STORAGE_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (90s) const CONSOLIDATION_FLAG_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (90s) const LOCK_EXPIRATION = APP_CONSOLIDATION_DELAY * 0.25; // 15 seconds (quarter of base) const RATE_LIMITER_DURATION = APP_CONSOLIDATION_DELAY * 0.1; // 6 seconds (tenth of base) let addQueue; let consolidateQueue; /** * Builds the scenario_text, fcm_text, and scenario_meta for a batch of notifications. * * @param {Array} notifications - Array of notification objects with 'body' and 'variables'. * @returns {Object} An object with 'scenario_text', 'fcm_text', and 'scenario_meta'. */ const buildNotificationContent = (notifications) => { const scenarioText = notifications.map((n) => n.body); // Array of text entries const fcmText = scenarioText.join(". "); // Concatenated text with period separator const scenarioMeta = notifications.map((n) => n.variables || {}); // Array of metadata objects return { scenario_text: scenarioText, fcm_text: fcmText ? `${fcmText}.` : null, // Add trailing period if non-empty, otherwise null scenario_meta: scenarioMeta }; }; /** * Convert MS to S * @param ms * @returns {number} */ const seconds = (ms) => Math.max(1, Math.ceil(ms / 1000)); /** * Initializes the notification queues and workers for adding and consolidating notifications. */ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { if (!addQueue || !consolidateQueue) { const prefix = getBullMQPrefix(); const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev"; devDebugLogger(`Initializing Notifications Queues with prefix: ${prefix}`); // Redis key helpers (per jobId) const recipientsSetKey = (jobId) => `app:${devKey}:recipients:${jobId}`; // set of `${user}:${bodyShopId}` const recipientAssocHashKey = (jobId) => `app:${devKey}:recipientAssoc:${jobId}`; // hash `${user}:${bodyShopId}` => associationId const consolidateFlagKey = (jobId) => `app:${devKey}:consolidate:${jobId}`; const lockKeyForJob = (jobId) => `lock:${devKey}:consolidate:${jobId}`; const listKey = ({ jobId, user, bodyShopId }) => `app:${devKey}:notifications:${jobId}:${user}:${bodyShopId}`; addQueue = new Queue("notificationsAdd", { prefix, connection: pubClient, defaultJobOptions: { removeOnComplete: true, removeOnFail: true } }); consolidateQueue = new Queue("notificationsConsolidate", { prefix, connection: pubClient, defaultJobOptions: { removeOnComplete: true, removeOnFail: true } }); const addWorker = new Worker( "notificationsAdd", async (job) => { const { jobId, key, variables, recipients, body, jobRoNumber } = job.data; devDebugLogger(`Adding notifications for jobId ${jobId}`); const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() }; // Store notifications atomically (RPUSH) and store recipients in a Redis set for (const recipient of recipients || []) { const { user, bodyShopId, associationId } = recipient; if (!user || !bodyShopId) continue; const rk = `${user}:${bodyShopId}`; // (1) Store notification payload in a list (atomic append) const lk = listKey({ jobId, user, bodyShopId }); await pubClient.rpush(lk, JSON.stringify(notification)); await pubClient.expire(lk, seconds(NOTIFICATION_STORAGE_EXPIRATION)); // (2) Track recipients in a set, and associationId in a hash await pubClient.sadd(recipientsSetKey(jobId), rk); await pubClient.expire(recipientsSetKey(jobId), seconds(NOTIFICATION_STORAGE_EXPIRATION)); if (associationId) { await pubClient.hset(recipientAssocHashKey(jobId), rk, String(associationId)); } await pubClient.expire(recipientAssocHashKey(jobId), seconds(NOTIFICATION_STORAGE_EXPIRATION)); } // Schedule consolidation once per jobId const flagKey = consolidateFlagKey(jobId); const flagSet = await pubClient.setnx(flagKey, "pending"); devDebugLogger(`Consolidation flag set for jobId ${jobId}: ${flagSet}`); if (flagSet) { await consolidateQueue.add( "consolidate-notifications", { jobId }, { jobId: `consolidate-${jobId}`, delay: APP_CONSOLIDATION_DELAY, attempts: 3, backoff: LOCK_EXPIRATION } ); await pubClient.expire(flagKey, seconds(CONSOLIDATION_FLAG_EXPIRATION)); devDebugLogger(`Scheduled consolidation for jobId ${jobId}`); } else { devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`); } }, { prefix, connection: pubClient, concurrency: 5 } ); const consolidateWorker = new Worker( "notificationsConsolidate", async (job) => { const { jobId } = job.data; devDebugLogger(`Consolidating notifications for jobId ${jobId}`); const lockKey = lockKeyForJob(jobId); const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", seconds(LOCK_EXPIRATION)); devDebugLogger(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`); if (!lockAcquired) { devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`); return; } try { const rkSet = recipientsSetKey(jobId); const assocHash = recipientAssocHashKey(jobId); const recipientKeys = await pubClient.smembers(rkSet); if (!recipientKeys?.length) { devDebugLogger(`No recipients found for jobId ${jobId}, nothing to consolidate.`); await pubClient.del(consolidateFlagKey(jobId)); return; } const assocMap = await pubClient.hgetall(assocHash); // Collect notifications by recipientKey const notificationsByRecipient = new Map(); // rk => parsed notifications array const listKeysToDelete = []; // delete only after successful insert+emit for (const rk of recipientKeys) { const [user, bodyShopId] = rk.split(":"); const lk = listKey({ jobId, user, bodyShopId }); const items = await pubClient.lrange(lk, 0, -1); if (!items?.length) continue; const parsed = items .map((x) => { try { return JSON.parse(x); } catch { return null; } }) .filter(Boolean); if (parsed.length) { notificationsByRecipient.set(rk, parsed); // IMPORTANT: do NOT delete list yet; only delete after successful insert+emit listKeysToDelete.push(lk); } } if (!notificationsByRecipient.size) { devDebugLogger(`No notifications found in lists for jobId ${jobId}, nothing to insert/emit.`); if (listKeysToDelete.length) { await pubClient.del(...listKeysToDelete); } await pubClient.del(rkSet); await pubClient.del(assocHash); await pubClient.del(consolidateFlagKey(jobId)); return; } // Build DB inserts const inserts = []; const insertMeta = []; // keep rk + associationId to emit after insert for (const [rk, notifications] of notificationsByRecipient.entries()) { const associationId = assocMap?.[rk]; // If your DB requires associationid NOT NULL, skip if missing if (!associationId) { devDebugLogger(`Skipping insert for ${rk} (missing associationId).`); continue; } const { scenario_text, fcm_text, scenario_meta } = buildNotificationContent(notifications); inserts.push({ jobid: jobId, associationid: associationId, // NOTE: if these are jsonb columns, remove JSON.stringify and pass arrays directly. scenario_text: JSON.stringify(scenario_text), fcm_text, scenario_meta: JSON.stringify(scenario_meta) }); insertMeta.push({ rk, associationId }); } // Map notificationId by associationId from Hasura returning rows const idByAssociationId = new Map(); if (inserts.length > 0) { const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, { objects: inserts }); const returning = insertResponse?.insert_notifications?.returning || []; returning.forEach((row) => { // Expecting your mutation to return associationid as well as id. // If your mutation currently doesn’t return associationid, update it. if (row?.associationid) idByAssociationId.set(String(row.associationid), row.id); }); devDebugLogger( `Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}` ); } // Emit via Socket.io // Group by user to reduce mapping lookups const uniqueUsers = [...new Set(insertMeta.map(({ rk }) => rk.split(":")[0]))]; for (const user of uniqueUsers) { const userMapping = await redisHelpers.getUserSocketMapping(user); const entriesForUser = insertMeta .map((m) => ({ ...m, user: m.rk.split(":")[0], bodyShopId: m.rk.split(":")[1] })) .filter((m) => m.user === user); for (const entry of entriesForUser) { const { rk, bodyShopId, associationId } = entry; const notifications = notificationsByRecipient.get(rk) || []; if (!notifications.length) continue; const jobRoNumber = notifications[0]?.jobRoNumber; const notificationId = idByAssociationId.get(String(associationId)) || null; if (userMapping && userMapping[bodyShopId]?.socketIds) { userMapping[bodyShopId].socketIds.forEach((socketId) => { ioRedis.to(socketId).emit("notification", { jobId, jobRoNumber, bodyShopId, notifications, notificationId, associationId }); }); devDebugLogger( `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} (notificationId ${notificationId})` ); } else { devDebugLogger(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`); } } } // Cleanup recipient tracking keys + consolidation flag await pubClient.del(rkSet); await pubClient.del(assocHash); await pubClient.del(consolidateFlagKey(jobId)); } catch (err) { logger.log("app-queue-consolidation-error", "ERROR", "notifications", "api", { message: err?.message, stack: err?.stack }); throw err; } finally { await pubClient.del(lockKey); } }, { prefix, connection: pubClient, concurrency: 1, limiter: { max: 1, duration: RATE_LIMITER_DURATION } } ); addWorker.on("completed", (job) => devDebugLogger(`Add job ${job.id} completed`)); consolidateWorker.on("completed", (job) => devDebugLogger(`Consolidate job ${job.id} completed`)); addWorker.on("failed", (job, err) => logger.log("app-queue-notification-error", "ERROR", "notifications", "api", { message: err?.message, stack: err?.stack }) ); consolidateWorker.on("failed", (job, err) => logger.log("app-queue-consolidation-failed", "ERROR", "notifications", "api", { message: err?.message, stack: err?.stack }) ); // Register cleanup task instead of direct process listeners const shutdown = async () => { devDebugLogger("Closing app queue workers..."); await Promise.all([addWorker.close(), consolidateWorker.close()]); devDebugLogger("App queue workers closed"); }; registerCleanupTask(shutdown); } return addQueue; }; /** * Retrieves the initialized `addQueue` instance. */ const getQueue = () => { if (!addQueue) throw new Error("Add queue not initialized. Ensure loadAppQueue is called during bootstrap."); return addQueue; }; /** * Dispatches notifications to the `addQueue` for processing. */ const dispatchAppsToQueue = async ({ appsToDispatch }) => { const appQueue = getQueue(); for (const app of appsToDispatch) { const { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber } = app; await appQueue.add( "add-notification", { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber }, { jobId: `${jobId}-${Date.now()}` } ); devDebugLogger(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`); } }; module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue };