From 4190372b92fe047b550bbfbfc68fd086225d6cd9 Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 5 Jan 2026 12:46:09 -0500 Subject: [PATCH] feature/IO-3492-FCM-Queue-For-Notifications: Implement FCM queue and worker for notifications --- .../notification-settings-form.component.jsx | 33 +- server.js | 10 +- server/notifications/eventHandlers.js | 3 +- server/notifications/queues/appQueue.js | 323 +++++++++++------- server/notifications/queues/emailQueue.js | 12 +- server/notifications/queues/fcmQueue.js | 286 ++++++++++++++++ server/notifications/scenarioBuilders.js | 36 +- server/notifications/scenarioParser.js | 11 + 8 files changed, 558 insertions(+), 156 deletions(-) create mode 100644 server/notifications/queues/fcmQueue.js diff --git a/client/src/components/notification-settings/notification-settings-form.component.jsx b/client/src/components/notification-settings/notification-settings-form.component.jsx index c728943f8..6f29c9bb5 100644 --- a/client/src/components/notification-settings/notification-settings-form.component.jsx +++ b/client/src/components/notification-settings/notification-settings-form.component.jsx @@ -132,7 +132,7 @@ const NotificationSettingsForm = ({ currentUser, bodyshop }) => { dataIndex: "scenarioLabel", key: "scenario", render: (_, record) => t(`notifications.scenarios.${record.key}`), - width: "90%" + width: "80%" }, { title: setIsDirty(true)} />, @@ -155,19 +155,18 @@ const NotificationSettingsForm = ({ currentUser, bodyshop }) => { ) + }, + { + title: setIsDirty(true)} />, + dataIndex: "fcm", + key: "fcm", + align: "center", + render: (_, record) => ( + + + + ) } - // TODO: Disabled for now until FCM is implemented. - // { - // title: setIsDirty(true)} />, - // dataIndex: "fcm", - // key: "fcm", - // align: "center", - // render: (_, record) => ( - // - // - // - // ) - // } ]; const dataSource = notificationScenarios.map((scenario) => ({ key: scenario })); @@ -186,13 +185,7 @@ const NotificationSettingsForm = ({ currentUser, bodyshop }) => { extra={ {t("notifications.labels.auto-add")} - + diff --git a/server.js b/server.js index 4ae883717..ad829d67c 100644 --- a/server.js +++ b/server.js @@ -38,6 +38,7 @@ const { registerCleanupTask, initializeCleanupManager } = require("./server/util const { loadEmailQueue } = require("./server/notifications/queues/emailQueue"); const { loadAppQueue } = require("./server/notifications/queues/appQueue"); +const { loadFcmQueue } = require("./server/notifications/queues/fcmQueue"); const CLUSTER_RETRY_BASE_DELAY = 100; const CLUSTER_RETRY_MAX_DELAY = 5000; @@ -355,9 +356,10 @@ const loadQueues = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const queueSettings = { pubClient, logger, redisHelpers, ioRedis }; // Assuming loadEmailQueue and loadAppQueue return Promises - const [notificationsEmailsQueue, notificationsAppQueue] = await Promise.all([ + const [notificationsEmailsQueue, notificationsAppQueue, notificationsFcmQueue] = await Promise.all([ loadEmailQueue(queueSettings), - loadAppQueue(queueSettings) + loadAppQueue(queueSettings), + loadFcmQueue(queueSettings) ]); // Add error listeners or other setup for queues if needed @@ -368,6 +370,10 @@ const loadQueues = async ({ pubClient, logger, redisHelpers, ioRedis }) => { notificationsAppQueue.on("error", (error) => { logger.log(`Error in notificationsAppQueue: ${error}`, "ERROR", "queue", "api", null, { error: error?.message }); }); + + notificationsFcmQueue.on("error", (error) => { + logger.log(`Error in notificationsFCMQueue: ${error}`, "ERROR", "queue", "api", null, { error: error?.message }); + }); }; /** diff --git a/server/notifications/eventHandlers.js b/server/notifications/eventHandlers.js index 87a1dceed..178f41ae2 100644 --- a/server/notifications/eventHandlers.js +++ b/server/notifications/eventHandlers.js @@ -205,9 +205,8 @@ const handleTaskSocketEmit = (req) => { * @returns {Promise} JSON response with a success message. */ const handleTasksChange = async (req, res) => { - // Handle Notification Event - processNotificationEvent(req, res, "req.body.event.new.jobid", "Tasks Notifications Event Handled."); handleTaskSocketEmit(req); + return processNotificationEvent(req, res, "req.body.event.new.jobid", "Tasks Notifications Event Handled."); }; /** diff --git a/server/notifications/queues/appQueue.js b/server/notifications/queues/appQueue.js index b376b2cf6..9009ad8ed 100644 --- a/server/notifications/queues/appQueue.js +++ b/server/notifications/queues/appQueue.js @@ -42,6 +42,13 @@ const buildNotificationContent = (notifications) => { }; }; +/** + * Convert MS to S + * @param ms + * @returns {number} + */ +const seconds = (ms) => Math.max(1, Math.ceil(ms / 1000)); + /** * Initializes the notification queues and workers for adding and consolidating notifications. */ @@ -52,6 +59,13 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { devDebugLogger(`Initializing Notifications Queues with prefix: ${prefix}`); + // Redis key helpers (per jobId) + const recipientsSetKey = (jobId) => `app:${devKey}:recipients:${jobId}`; // set of `${user}:${bodyShopId}` + const recipientAssocHashKey = (jobId) => `app:${devKey}:recipientAssoc:${jobId}`; // hash `${user}:${bodyShopId}` => associationId + const consolidateFlagKey = (jobId) => `app:${devKey}:consolidate:${jobId}`; + const lockKeyForJob = (jobId) => `lock:${devKey}:consolidate:${jobId}`; + const listKey = ({ jobId, user, bodyShopId }) => `app:${devKey}:notifications:${jobId}:${user}:${bodyShopId}`; + addQueue = new Queue("notificationsAdd", { prefix, connection: pubClient, @@ -70,27 +84,39 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const { jobId, key, variables, recipients, body, jobRoNumber } = job.data; devDebugLogger(`Adding notifications for jobId ${jobId}`); - const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`; const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() }; - for (const recipient of recipients) { - const { user } = recipient; - const userKey = `${redisKeyPrefix}:${user}`; - const existingNotifications = await pubClient.get(userKey); - const notifications = existingNotifications ? JSON.parse(existingNotifications) : []; - notifications.push(notification); - await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000); - devDebugLogger(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`); + // Store notifications atomically (RPUSH) and store recipients in a Redis set + for (const recipient of recipients || []) { + const { user, bodyShopId, associationId } = recipient; + if (!user || !bodyShopId) continue; + + const rk = `${user}:${bodyShopId}`; + + // (1) Store notification payload in a list (atomic append) + const lk = listKey({ jobId, user, bodyShopId }); + await pubClient.rpush(lk, JSON.stringify(notification)); + await pubClient.expire(lk, seconds(NOTIFICATION_STORAGE_EXPIRATION)); + + // (2) Track recipients in a set, and associationId in a hash + await pubClient.sadd(recipientsSetKey(jobId), rk); + await pubClient.expire(recipientsSetKey(jobId), seconds(NOTIFICATION_STORAGE_EXPIRATION)); + + if (associationId) { + await pubClient.hset(recipientAssocHashKey(jobId), rk, String(associationId)); + } + await pubClient.expire(recipientAssocHashKey(jobId), seconds(NOTIFICATION_STORAGE_EXPIRATION)); } - const consolidateKey = `app:${devKey}:consolidate:${jobId}`; - const flagSet = await pubClient.setnx(consolidateKey, "pending"); + // Schedule consolidation once per jobId + const flagKey = consolidateFlagKey(jobId); + const flagSet = await pubClient.setnx(flagKey, "pending"); devDebugLogger(`Consolidation flag set for jobId ${jobId}: ${flagSet}`); if (flagSet) { await consolidateQueue.add( "consolidate-notifications", - { jobId, recipients }, + { jobId }, { jobId: `consolidate-${jobId}`, delay: APP_CONSOLIDATION_DELAY, @@ -98,8 +124,9 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { backoff: LOCK_EXPIRATION } ); + + await pubClient.expire(flagKey, seconds(CONSOLIDATION_FLAG_EXPIRATION)); devDebugLogger(`Scheduled consolidation for jobId ${jobId}`); - await pubClient.expire(consolidateKey, CONSOLIDATION_FLAG_EXPIRATION / 1000); } else { devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`); } @@ -114,122 +141,163 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const consolidateWorker = new Worker( "notificationsConsolidate", async (job) => { - const { jobId, recipients } = job.data; + const { jobId } = job.data; devDebugLogger(`Consolidating notifications for jobId ${jobId}`); - const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`; - const lockKey = `lock:${devKey}:consolidate:${jobId}`; - - const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); + const lockKey = lockKeyForJob(jobId); + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", seconds(LOCK_EXPIRATION)); devDebugLogger(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`); - if (lockAcquired) { - try { - const allNotifications = {}; - const uniqueUsers = [...new Set(recipients.map((r) => r.user))]; - devDebugLogger(`Unique users for jobId ${jobId}: ${uniqueUsers}`); - - for (const user of uniqueUsers) { - const userKey = `${redisKeyPrefix}:${user}`; - const notifications = await pubClient.get(userKey); - devDebugLogger(`Retrieved notifications for ${user}: ${notifications}`); - - if (notifications) { - const parsedNotifications = JSON.parse(notifications); - const userRecipients = recipients.filter((r) => r.user === user); - for (const { bodyShopId } of userRecipients) { - allNotifications[user] = allNotifications[user] || {}; - allNotifications[user][bodyShopId] = parsedNotifications; - } - await pubClient.del(userKey); - devDebugLogger(`Deleted Redis key ${userKey}`); - } else { - devDebugLogger(`No notifications found for ${user} under ${userKey}`); - } - } - - devDebugLogger(`Consolidated notifications: ${JSON.stringify(allNotifications)}`); - - // Insert notifications into the database and collect IDs - const notificationInserts = []; - const notificationIdMap = new Map(); - - for (const [user, bodyShopData] of Object.entries(allNotifications)) { - const userRecipients = recipients.filter((r) => r.user === user); - const associationId = userRecipients[0]?.associationId; - - for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) { - const { scenario_text, fcm_text, scenario_meta } = buildNotificationContent(notifications); - notificationInserts.push({ - jobid: jobId, - associationid: associationId, - scenario_text: JSON.stringify(scenario_text), - fcm_text: fcm_text, - scenario_meta: JSON.stringify(scenario_meta) - }); - notificationIdMap.set(`${user}:${bodyShopId}`, null); - } - } - - if (notificationInserts.length > 0) { - const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, { - objects: notificationInserts - }); - devDebugLogger( - `Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}` - ); - - insertResponse.insert_notifications.returning.forEach((row, index) => { - const user = uniqueUsers[Math.floor(index / Object.keys(allNotifications[uniqueUsers[0]]).length)]; - const bodyShopId = Object.keys(allNotifications[user])[ - index % Object.keys(allNotifications[user]).length - ]; - notificationIdMap.set(`${user}:${bodyShopId}`, row.id); - }); - } - - // Emit notifications to users via Socket.io with notification ID - for (const [user, bodyShopData] of Object.entries(allNotifications)) { - const userMapping = await redisHelpers.getUserSocketMapping(user); - const userRecipients = recipients.filter((r) => r.user === user); - const associationId = userRecipients[0]?.associationId; - - for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) { - const notificationId = notificationIdMap.get(`${user}:${bodyShopId}`); - const jobRoNumber = notifications[0]?.jobRoNumber; - - if (userMapping && userMapping[bodyShopId]?.socketIds) { - userMapping[bodyShopId].socketIds.forEach((socketId) => { - ioRedis.to(socketId).emit("notification", { - jobId, - jobRoNumber, - bodyShopId, - notifications, - notificationId, - associationId - }); - }); - devDebugLogger( - `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}` - ); - } else { - devDebugLogger(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`); - } - } - } - - await pubClient.del(`app:${devKey}:consolidate:${jobId}`); - } catch (err) { - logger.log(`app-queue-consolidation-error`, "ERROR", "notifications", "api", { - message: err?.message, - stack: err?.stack - }); - throw err; - } finally { - await pubClient.del(lockKey); - } - } else { + if (!lockAcquired) { devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`); + return; + } + + try { + const rkSet = recipientsSetKey(jobId); + const assocHash = recipientAssocHashKey(jobId); + + const recipientKeys = await pubClient.smembers(rkSet); + if (!recipientKeys?.length) { + devDebugLogger(`No recipients found for jobId ${jobId}, nothing to consolidate.`); + await pubClient.del(consolidateFlagKey(jobId)); + return; + } + + const assocMap = await pubClient.hgetall(assocHash); + + // Collect notifications by recipientKey + const notificationsByRecipient = new Map(); // rk => parsed notifications array + + for (const rk of recipientKeys) { + const [user, bodyShopId] = rk.split(":"); + const lk = listKey({ jobId, user, bodyShopId }); + + const items = await pubClient.lrange(lk, 0, -1); + if (!items?.length) continue; + + const parsed = items + .map((x) => { + try { + return JSON.parse(x); + } catch { + return null; + } + }) + .filter(Boolean); + + if (parsed.length) { + notificationsByRecipient.set(rk, parsed); + } + + // Cleanup list key after reading + await pubClient.del(lk); + } + + if (!notificationsByRecipient.size) { + devDebugLogger(`No notifications found in lists for jobId ${jobId}, nothing to insert/emit.`); + await pubClient.del(rkSet); + await pubClient.del(assocHash); + await pubClient.del(consolidateFlagKey(jobId)); + return; + } + + // Build DB inserts + const inserts = []; + const insertMeta = []; // keep rk + associationId to emit after insert + + for (const [rk, notifications] of notificationsByRecipient.entries()) { + const associationId = assocMap?.[rk]; + + // If your DB requires associationid NOT NULL, skip if missing + if (!associationId) { + devDebugLogger(`Skipping insert for ${rk} (missing associationId).`); + continue; + } + + const { scenario_text, fcm_text, scenario_meta } = buildNotificationContent(notifications); + + inserts.push({ + jobid: jobId, + associationid: associationId, + // NOTE: if these are jsonb columns, remove JSON.stringify and pass arrays directly. + scenario_text: JSON.stringify(scenario_text), + fcm_text, + scenario_meta: JSON.stringify(scenario_meta) + }); + + insertMeta.push({ rk, associationId }); + } + + // Map notificationId by associationId from Hasura returning rows + const idByAssociationId = new Map(); + + if (inserts.length > 0) { + const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, { objects: inserts }); + + const returning = insertResponse?.insert_notifications?.returning || []; + returning.forEach((row) => { + // Expecting your mutation to return associationid as well as id. + // If your mutation currently doesn’t return associationid, update it. + if (row?.associationid) idByAssociationId.set(String(row.associationid), row.id); + }); + + devDebugLogger( + `Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}` + ); + } + + // Emit via Socket.io + // Group by user to reduce mapping lookups + const uniqueUsers = [...new Set(insertMeta.map(({ rk }) => rk.split(":")[0]))]; + + for (const user of uniqueUsers) { + const userMapping = await redisHelpers.getUserSocketMapping(user); + const entriesForUser = insertMeta + .map((m) => ({ ...m, user: m.rk.split(":")[0], bodyShopId: m.rk.split(":")[1] })) + .filter((m) => m.user === user); + + for (const entry of entriesForUser) { + const { rk, bodyShopId, associationId } = entry; + const notifications = notificationsByRecipient.get(rk) || []; + if (!notifications.length) continue; + + const jobRoNumber = notifications[0]?.jobRoNumber; + const notificationId = idByAssociationId.get(String(associationId)) || null; + + if (userMapping && userMapping[bodyShopId]?.socketIds) { + userMapping[bodyShopId].socketIds.forEach((socketId) => { + ioRedis.to(socketId).emit("notification", { + jobId, + jobRoNumber, + bodyShopId, + notifications, + notificationId, + associationId + }); + }); + + devDebugLogger( + `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} (notificationId ${notificationId})` + ); + } else { + devDebugLogger(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`); + } + } + } + + // Cleanup recipient tracking keys + consolidation flag + await pubClient.del(rkSet); + await pubClient.del(assocHash); + await pubClient.del(consolidateFlagKey(jobId)); + } catch (err) { + logger.log("app-queue-consolidation-error", "ERROR", "notifications", "api", { + message: err?.message, + stack: err?.stack + }); + throw err; + } finally { + await pubClient.del(lockKey); } }, { @@ -244,13 +312,14 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { consolidateWorker.on("completed", (job) => devDebugLogger(`Consolidate job ${job.id} completed`)); addWorker.on("failed", (job, err) => - logger.log(`app-queue-notification-error`, "ERROR", "notifications", "api", { + logger.log("app-queue-notification-error", "ERROR", "notifications", "api", { message: err?.message, stack: err?.stack }) ); + consolidateWorker.on("failed", (job, err) => - logger.log(`app-queue-consolidation-failed:`, "ERROR", "notifications", "api", { + logger.log("app-queue-consolidation-failed", "ERROR", "notifications", "api", { message: err?.message, stack: err?.stack }) @@ -285,11 +354,13 @@ const dispatchAppsToQueue = async ({ appsToDispatch }) => { for (const app of appsToDispatch) { const { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber } = app; + await appQueue.add( "add-notification", { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber }, { jobId: `${jobId}-${Date.now()}` } ); + devDebugLogger(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`); } }; diff --git a/server/notifications/queues/emailQueue.js b/server/notifications/queues/emailQueue.js index a5ad8a530..76d368c9f 100644 --- a/server/notifications/queues/emailQueue.js +++ b/server/notifications/queues/emailQueue.js @@ -75,7 +75,9 @@ const loadEmailQueue = async ({ pubClient, logger }) => { await pubClient.hsetnx(detailsKey, "lastName", lastName || ""); await pubClient.hsetnx(detailsKey, "bodyShopTimezone", bodyShopTimezone); await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000); - await pubClient.sadd(`email:${devKey}:recipients:${jobId}`, user); + const recipientsSetKey = `email:${devKey}:recipients:${jobId}`; + await pubClient.sadd(recipientsSetKey, user); + await pubClient.expire(recipientsSetKey, NOTIFICATION_EXPIRATION / 1000); devDebugLogger(`Stored message for ${user} under ${userKey}: ${body}`); } @@ -239,7 +241,13 @@ const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => { const emailAddQueue = getQueue(); for (const email of emailsToDispatch) { - const { jobId, jobRoNumber, bodyShopName, bodyShopTimezone, body, recipients } = email; + const { jobId, bodyShopName, bodyShopTimezone, body, recipients } = email; + let { jobRoNumber } = email; + + // Make sure Jobs that have not been coverted yet can still get notifications + if (jobRoNumber === null) { + jobRoNumber = "N/A"; + } if (!jobId || !jobRoNumber || !bodyShopName || !body || !recipients.length) { devDebugLogger( diff --git a/server/notifications/queues/fcmQueue.js b/server/notifications/queues/fcmQueue.js new file mode 100644 index 000000000..118676075 --- /dev/null +++ b/server/notifications/queues/fcmQueue.js @@ -0,0 +1,286 @@ +const { Queue, Worker } = require("bullmq"); +const { registerCleanupTask } = require("../../utils/cleanupManager"); +const getBullMQPrefix = require("../../utils/getBullMQPrefix"); +const devDebugLogger = require("../../utils/devDebugLogger"); + +const FCM_CONSOLIDATION_DELAY_IN_MINS = (() => { + const envValue = process.env?.FCM_CONSOLIDATION_DELAY_IN_MINS; + const parsedValue = envValue ? parseInt(envValue, 10) : NaN; + return isNaN(parsedValue) ? 3 : Math.max(1, parsedValue); +})(); + +const FCM_CONSOLIDATION_DELAY = FCM_CONSOLIDATION_DELAY_IN_MINS * 60000; + +// pegged constants (pattern matches your other queues) +const CONSOLIDATION_KEY_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5; +const LOCK_EXPIRATION = FCM_CONSOLIDATION_DELAY * 0.25; +const RATE_LIMITER_DURATION = FCM_CONSOLIDATION_DELAY * 0.1; +const NOTIFICATION_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5; + +let fcmAddQueue; +let fcmConsolidateQueue; +let fcmAddWorker; +let fcmConsolidateWorker; + +// IMPORTANT: do NOT require firebase-handler at module load time. +// firebase-handler does `require(process.env.FIREBASE_ADMINSDK_JSON)` at top-level, +// which will hard-crash environments that don’t have Firebase configured. +const hasFirebaseEnv = () => Boolean(process.env.FIREBASE_ADMINSDK_JSON && process.env.FIREBASE_DATABASE_URL); + +/** + * Get the Firebase Admin SDK, or null if Firebase is not configured. + * @returns {{app: app, remoteConfig: ((app?: App) => remoteConfig.RemoteConfig) | remoteConfig, firestore: ((app?: App) => FirebaseFirestore.Firestore) | firestore, AppOptions: AppOptions, auth: ((app?: App) => auth.Auth) | auth, securityRules: ((app?: App) => securityRules.SecurityRules) | securityRules, installations: ((app?: App) => installations.Installations) | installations, FirebaseArrayIndexError: FirebaseArrayIndexError, storage: ((app?: App) => storage.Storage) | storage, appCheck: ((app?: App) => appCheck.AppCheck) | appCheck, initializeApp(options?: AppOptions, name?: string): app.App, FirebaseError: FirebaseError, messaging: ((app?: App) => messaging.Messaging) | messaging, projectManagement: ((app?: App) => projectManagement.ProjectManagement) | projectManagement, database: ((app?: App) => database.Database) | database, machineLearning: ((app?: App) => machineLearning.MachineLearning) | machineLearning, instanceId: ((app?: App) => instanceId.InstanceId) | instanceId, SDK_VERSION: string, apps: (app.App | null)[], credential: credential, ServiceAccount: ServiceAccount, GoogleOAuthAccessToken: GoogleOAuthAccessToken}|null} + */ +const getFirebaseAdmin = () => { + if (!hasFirebaseEnv()) return null; + const { admin } = require("../../firebase/firebase-handler"); + return admin; +}; + +/** + * Get the FCM topic name for an association. + * @param associationId + * @returns {`assoc-${string}-notifications`} + */ +const topicForAssociation = (associationId) => `assoc-${associationId}-notifications`; + +/** + * Build a summary string for FCM push notification body. + * @param count + * @param jobRoNumber + * @param bodyShopName + * @returns {`${string} ${string} for ${string|string}${string|string}`} + */ +const buildPushSummary = ({ count, jobRoNumber, bodyShopName }) => { + const updates = count === 1 ? "update" : "updates"; + const ro = jobRoNumber ? `RO ${jobRoNumber}` : "a job"; + const shop = bodyShopName ? ` at ${bodyShopName}` : ""; + return `${count} ${updates} for ${ro}${shop}`; +}; + +/** + * Loads the FCM notification queues and workers. + * @param pubClient + * @param logger + * @returns {Promise, ExtractResultType, ExtractNameType>|null>} + */ +const loadFcmQueue = async ({ pubClient, logger }) => { + if (!hasFirebaseEnv()) { + devDebugLogger("FCM queue not initialized (Firebase env not configured)."); + return null; + } + + if (!fcmAddQueue || !fcmConsolidateQueue) { + const prefix = getBullMQPrefix(); + const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev"; + + devDebugLogger(`Initializing FCM Queues with prefix: ${prefix}`); + + fcmAddQueue = new Queue("fcmAdd", { + prefix, + connection: pubClient, + defaultJobOptions: { removeOnComplete: true, removeOnFail: true } + }); + + fcmConsolidateQueue = new Queue("fcmConsolidate", { + prefix, + connection: pubClient, + defaultJobOptions: { removeOnComplete: true, removeOnFail: true } + }); + + fcmAddWorker = new Worker( + "fcmAdd", + async (job) => { + const { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients } = + job.data; + devDebugLogger(`Adding FCM notifications for jobId ${jobId}`); + + const redisKeyPrefix = `fcm:${devKey}:notifications:${jobId}`; + + for (const r of recipients) { + const associationId = r?.associationId; + if (!associationId) continue; + + const assocKey = `${redisKeyPrefix}:${associationId}`; + const payload = JSON.stringify({ + body: body || "", + scenarioKey: scenarioKey || "", + key: key || "", + variables: variables || {}, + ts: Date.now() + }); + + await pubClient.rpush(assocKey, payload); + await pubClient.expire(assocKey, NOTIFICATION_EXPIRATION / 1000); + const recipientsSetKey = `fcm:${devKey}:recipients:${jobId}`; + await pubClient.sadd(recipientsSetKey, associationId); + await pubClient.expire(recipientsSetKey, NOTIFICATION_EXPIRATION / 1000); + + // store some metadata once per jobId + const metaKey = `fcm:${devKey}:meta:${jobId}`; + await pubClient.hsetnx(metaKey, "jobRoNumber", jobRoNumber || ""); + await pubClient.hsetnx(metaKey, "bodyShopId", bodyShopId || ""); + await pubClient.hsetnx(metaKey, "bodyShopName", bodyShopName || ""); + await pubClient.expire(metaKey, NOTIFICATION_EXPIRATION / 1000); + } + + const consolidateKey = `fcm:${devKey}:consolidate:${jobId}`; + const flagSet = await pubClient.setnx(consolidateKey, "pending"); + + if (flagSet) { + await fcmConsolidateQueue.add( + "consolidate-fcm", + { jobId }, + { + jobId: `consolidate-${jobId}`, + delay: FCM_CONSOLIDATION_DELAY, + attempts: 3, + backoff: LOCK_EXPIRATION + } + ); + await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000); + devDebugLogger(`Scheduled FCM consolidation for jobId ${jobId}`); + } else { + devDebugLogger(`FCM consolidation already scheduled for jobId ${jobId}`); + } + }, + { prefix, connection: pubClient, concurrency: 5 } + ); + + fcmConsolidateWorker = new Worker( + "fcmConsolidate", + async (job) => { + const { jobId } = job.data; + const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev"; + + const lockKey = `lock:${devKey}:fcmConsolidate:${jobId}`; + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); + + if (!lockAcquired) { + devDebugLogger(`Skipped FCM consolidation for jobId ${jobId} - lock held by another worker`); + return; + } + + try { + const admin = getFirebaseAdmin(); + if (!admin) { + devDebugLogger("FCM consolidation skipped (Firebase not available)."); + return; + } + + const recipientsSet = `fcm:${devKey}:recipients:${jobId}`; + const associationIds = await pubClient.smembers(recipientsSet); + + const metaKey = `fcm:${devKey}:meta:${jobId}`; + const meta = await pubClient.hgetall(metaKey); + const jobRoNumber = meta?.jobRoNumber || ""; + const bodyShopId = meta?.bodyShopId || ""; + const bodyShopName = meta?.bodyShopName || ""; + + for (const associationId of associationIds) { + const assocKey = `fcm:${devKey}:notifications:${jobId}:${associationId}`; + const messages = await pubClient.lrange(assocKey, 0, -1); + + if (!messages?.length) continue; + + const count = messages.length; + const notificationBody = buildPushSummary({ count, jobRoNumber, bodyShopName }); + + const topic = topicForAssociation(associationId); + + // FCM "data" values MUST be strings + await admin.messaging().send({ + topic, + notification: { + title: "ImEX Online", + body: notificationBody + }, + data: { + type: "job-notification", + jobId: String(jobId), + jobRoNumber: String(jobRoNumber || ""), + bodyShopId: String(bodyShopId || ""), + bodyShopName: String(bodyShopName || ""), + associationId: String(associationId), + count: String(count) + }, + android: { priority: "high" }, + apns: { headers: { "apns-priority": "10" } } + }); + + devDebugLogger(`Sent FCM push to topic ${topic} for jobId ${jobId} (${count} updates)`); + + await pubClient.del(assocKey); + } + + await pubClient.del(recipientsSet); + await pubClient.del(metaKey); + await pubClient.del(`fcm:${devKey}:consolidate:${jobId}`); + } catch (err) { + logger.log("fcm-queue-consolidation-error", "ERROR", "notifications", "api", { + message: err?.message, + stack: err?.stack + }); + throw err; + } finally { + await pubClient.del(lockKey); + } + }, + { prefix, connection: pubClient, concurrency: 1, limiter: { max: 1, duration: RATE_LIMITER_DURATION } } + ); + + fcmAddWorker.on("failed", (job, err) => + logger.log("fcm-add-failed", "ERROR", "notifications", "api", { message: err?.message, stack: err?.stack }) + ); + + fcmConsolidateWorker.on("failed", (job, err) => + logger.log("fcm-consolidate-failed", "ERROR", "notifications", "api", { + message: err?.message, + stack: err?.stack + }) + ); + + const shutdown = async () => { + devDebugLogger("Closing FCM queue workers..."); + await Promise.all([fcmAddWorker.close(), fcmConsolidateWorker.close()]); + devDebugLogger("FCM queue workers closed"); + }; + + registerCleanupTask(shutdown); + } + + return fcmAddQueue; +}; + +/** + * Get the FCM add queue. + * @returns {*} + */ +const getQueue = () => { + if (!fcmAddQueue) throw new Error("FCM add queue not initialized. Ensure loadFcmQueue is called during bootstrap."); + return fcmAddQueue; +}; + +/** + * Dispatch FCM notifications to the FCM add queue. + * @param fcmsToDispatch + * @returns {Promise} + */ +const dispatchFcmsToQueue = async ({ fcmsToDispatch }) => { + if (!hasFirebaseEnv()) return; + const queue = getQueue(); + + for (const fcm of fcmsToDispatch) { + const { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients } = fcm; + + if (!jobId || !recipients?.length) continue; + + await queue.add( + "add-fcm-notification", + { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients }, + { jobId: `${jobId}-${Date.now()}` } + ); + } +}; + +module.exports = { loadFcmQueue, getQueue, dispatchFcmsToQueue }; diff --git a/server/notifications/scenarioBuilders.js b/server/notifications/scenarioBuilders.js index ec24e1804..d94c5390b 100644 --- a/server/notifications/scenarioBuilders.js +++ b/server/notifications/scenarioBuilders.js @@ -19,6 +19,8 @@ const buildNotification = (data, key, body, variables = {}) => { jobId: data.jobId, jobRoNumber: data.jobRoNumber, bodyShopId: data.bodyShopId, + scenarioKey: data.scenarioKey, + scenarioTable: data.scenarioTable, key, body, variables, @@ -32,21 +34,47 @@ const buildNotification = (data, key, body, variables = {}) => { body, recipients: [] }, - fcm: { recipients: [] } + fcm: { + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopId: data.bodyShopId, + bodyShopName: data.bodyShopName, + bodyShopTimezone: data.bodyShopTimezone, + scenarioKey: data.scenarioKey, + scenarioTable: data.scenarioTable, + key, + body, + variables, + recipients: [] + } }; // Populate recipients from scenarioWatchers data.scenarioWatchers.forEach((recipients) => { const { user, app, fcm, email, firstName, lastName, employeeId, associationId } = recipients; - if (app === true) + + if (app === true) { result.app.recipients.push({ user, bodyShopId: data.bodyShopId, employeeId, associationId }); - if (fcm === true) result.fcm.recipients.push(user); - if (email === true) result.email.recipients.push({ user, firstName, lastName }); + } + + if (email === true) { + result.email.recipients.push({ user, firstName, lastName }); + } + + if (fcm === true) { + // Keep structure consistent and future-proof (token lookup is done server-side) + result.fcm.recipients.push({ + user, + bodyShopId: data.bodyShopId, + employeeId, + associationId + }); + } }); return result; diff --git a/server/notifications/scenarioParser.js b/server/notifications/scenarioParser.js index aebec8205..7e6a211ad 100644 --- a/server/notifications/scenarioParser.js +++ b/server/notifications/scenarioParser.js @@ -14,6 +14,7 @@ const { isEmpty, isFunction } = require("lodash"); const { getMatchingScenarios } = require("./scenarioMapper"); const { dispatchEmailsToQueue } = require("./queues/emailQueue"); const { dispatchAppsToQueue } = require("./queues/appQueue"); +const { dispatchFcmsToQueue } = require("./queues/fcmQueue"); // NEW // If true, the user who commits the action will NOT receive notifications; if false, they will. const FILTER_SELF_FROM_WATCHERS = process.env?.FILTER_SELF_FROM_WATCHERS !== "false"; @@ -298,6 +299,16 @@ const scenarioParser = async (req, jobIdField) => { }) ); } + + const fcmsToDispatch = scenariosToDispatch.map((scenario) => scenario?.fcm); + if (!isEmpty(fcmsToDispatch)) { + dispatchFcmsToQueue({ fcmsToDispatch, logger }).catch((e) => + logger.log("Something went wrong dispatching FCMs to the FCM Notification Queue", "error", "queue", null, { + message: e?.message, + stack: e?.stack + }) + ); + } }; module.exports = scenarioParser;