diff --git a/client/src/components/notification-settings/notification-settings-form.component.jsx b/client/src/components/notification-settings/notification-settings-form.component.jsx index c728943f8..649a0be55 100644 --- a/client/src/components/notification-settings/notification-settings-form.component.jsx +++ b/client/src/components/notification-settings/notification-settings-form.component.jsx @@ -132,7 +132,7 @@ const NotificationSettingsForm = ({ currentUser, bodyshop }) => { dataIndex: "scenarioLabel", key: "scenario", render: (_, record) => t(`notifications.scenarios.${record.key}`), - width: "90%" + width: "80%" }, { title: setIsDirty(true)} />, @@ -156,20 +156,23 @@ const NotificationSettingsForm = ({ currentUser, bodyshop }) => { ) } - // TODO: Disabled for now until FCM is implemented. - // { - // title: setIsDirty(true)} />, - // dataIndex: "fcm", - // key: "fcm", - // align: "center", - // render: (_, record) => ( - // - // - // - // ) - // } ]; + // Currently disabled for prod + if (!import.meta.env.PROD) { + columns.push({ + title: setIsDirty(true)} />, + dataIndex: "fcm", + key: "fcm", + align: "center", + render: (_, record) => ( + + + + ) + }); + } + const dataSource = notificationScenarios.map((scenario) => ({ key: scenario })); return ( @@ -186,13 +189,7 @@ const NotificationSettingsForm = ({ currentUser, bodyshop }) => { extra={ {t("notifications.labels.auto-add")} - + diff --git a/server.js b/server.js index 4ae883717..ad829d67c 100644 --- a/server.js +++ b/server.js @@ -38,6 +38,7 @@ const { registerCleanupTask, initializeCleanupManager } = require("./server/util const { loadEmailQueue } = require("./server/notifications/queues/emailQueue"); const { loadAppQueue } = require("./server/notifications/queues/appQueue"); +const { loadFcmQueue } = require("./server/notifications/queues/fcmQueue"); const CLUSTER_RETRY_BASE_DELAY = 100; const CLUSTER_RETRY_MAX_DELAY = 5000; @@ -355,9 +356,10 @@ const loadQueues = async ({ pubClient, logger, 
redisHelpers, ioRedis }) => { const queueSettings = { pubClient, logger, redisHelpers, ioRedis }; // Assuming loadEmailQueue and loadAppQueue return Promises - const [notificationsEmailsQueue, notificationsAppQueue] = await Promise.all([ + const [notificationsEmailsQueue, notificationsAppQueue, notificationsFcmQueue] = await Promise.all([ loadEmailQueue(queueSettings), - loadAppQueue(queueSettings) + loadAppQueue(queueSettings), + loadFcmQueue(queueSettings) ]); // Add error listeners or other setup for queues if needed @@ -368,6 +370,10 @@ const loadQueues = async ({ pubClient, logger, redisHelpers, ioRedis }) => { notificationsAppQueue.on("error", (error) => { logger.log(`Error in notificationsAppQueue: ${error}`, "ERROR", "queue", "api", null, { error: error?.message }); }); + + notificationsFcmQueue.on("error", (error) => { + logger.log(`Error in notificationsFCMQueue: ${error}`, "ERROR", "queue", "api", null, { error: error?.message }); + }); }; /** diff --git a/server/graphql-client/queries.js b/server/graphql-client/queries.js index 4ed6c311b..21ec2a374 100644 --- a/server/graphql-client/queries.js +++ b/server/graphql-client/queries.js @@ -3190,3 +3190,20 @@ mutation INSERT_MEDIA_ANALYTICS($mediaObject: media_analytics_insert_input!) { } } `; + +exports.GET_USERS_FCM_TOKENS_BY_EMAILS = /* GraphQL */ ` + query GET_USERS_FCM_TOKENS_BY_EMAILS($emails: [String!]!) 
{ + users(where: { email: { _in: $emails } }) { + email + fcmtokens + } + } +`; + +exports.UPDATE_USER_FCM_TOKENS_BY_EMAIL = /* GraphQL */ ` + mutation UPDATE_USER_FCM_TOKENS_BY_EMAIL($email: String!, $fcmtokens: jsonb) { + update_users(where: { email: { _eq: $email } }, _set: { fcmtokens: $fcmtokens }) { + affected_rows + } + } +`; diff --git a/server/notifications/eventHandlers.js b/server/notifications/eventHandlers.js index 87a1dceed..178f41ae2 100644 --- a/server/notifications/eventHandlers.js +++ b/server/notifications/eventHandlers.js @@ -205,9 +205,8 @@ const handleTaskSocketEmit = (req) => { * @returns {Promise} JSON response with a success message. */ const handleTasksChange = async (req, res) => { - // Handle Notification Event - processNotificationEvent(req, res, "req.body.event.new.jobid", "Tasks Notifications Event Handled."); handleTaskSocketEmit(req); + return processNotificationEvent(req, res, "req.body.event.new.jobid", "Tasks Notifications Event Handled."); }; /** diff --git a/server/notifications/queues/appQueue.js b/server/notifications/queues/appQueue.js index b376b2cf6..4cd7777d6 100644 --- a/server/notifications/queues/appQueue.js +++ b/server/notifications/queues/appQueue.js @@ -42,6 +42,13 @@ const buildNotificationContent = (notifications) => { }; }; +/** + * Convert MS to S + * @param ms + * @returns {number} + */ +const seconds = (ms) => Math.max(1, Math.ceil(ms / 1000)); + /** * Initializes the notification queues and workers for adding and consolidating notifications. 
*/ @@ -52,6 +59,13 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { devDebugLogger(`Initializing Notifications Queues with prefix: ${prefix}`); + // Redis key helpers (per jobId) + const recipientsSetKey = (jobId) => `app:${devKey}:recipients:${jobId}`; // set of `${user}:${bodyShopId}` + const recipientAssocHashKey = (jobId) => `app:${devKey}:recipientAssoc:${jobId}`; // hash `${user}:${bodyShopId}` => associationId + const consolidateFlagKey = (jobId) => `app:${devKey}:consolidate:${jobId}`; + const lockKeyForJob = (jobId) => `lock:${devKey}:consolidate:${jobId}`; + const listKey = ({ jobId, user, bodyShopId }) => `app:${devKey}:notifications:${jobId}:${user}:${bodyShopId}`; + addQueue = new Queue("notificationsAdd", { prefix, connection: pubClient, @@ -70,27 +84,39 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const { jobId, key, variables, recipients, body, jobRoNumber } = job.data; devDebugLogger(`Adding notifications for jobId ${jobId}`); - const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`; const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() }; - for (const recipient of recipients) { - const { user } = recipient; - const userKey = `${redisKeyPrefix}:${user}`; - const existingNotifications = await pubClient.get(userKey); - const notifications = existingNotifications ? 
JSON.parse(existingNotifications) : []; - notifications.push(notification); - await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000); - devDebugLogger(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`); + // Store notifications atomically (RPUSH) and store recipients in a Redis set + for (const recipient of recipients || []) { + const { user, bodyShopId, associationId } = recipient; + if (!user || !bodyShopId) continue; + + const rk = `${user}:${bodyShopId}`; + + // (1) Store notification payload in a list (atomic append) + const lk = listKey({ jobId, user, bodyShopId }); + await pubClient.rpush(lk, JSON.stringify(notification)); + await pubClient.expire(lk, seconds(NOTIFICATION_STORAGE_EXPIRATION)); + + // (2) Track recipients in a set, and associationId in a hash + await pubClient.sadd(recipientsSetKey(jobId), rk); + await pubClient.expire(recipientsSetKey(jobId), seconds(NOTIFICATION_STORAGE_EXPIRATION)); + + if (associationId) { + await pubClient.hset(recipientAssocHashKey(jobId), rk, String(associationId)); + } + await pubClient.expire(recipientAssocHashKey(jobId), seconds(NOTIFICATION_STORAGE_EXPIRATION)); } - const consolidateKey = `app:${devKey}:consolidate:${jobId}`; - const flagSet = await pubClient.setnx(consolidateKey, "pending"); + // Schedule consolidation once per jobId + const flagKey = consolidateFlagKey(jobId); + const flagSet = await pubClient.setnx(flagKey, "pending"); devDebugLogger(`Consolidation flag set for jobId ${jobId}: ${flagSet}`); if (flagSet) { await consolidateQueue.add( "consolidate-notifications", - { jobId, recipients }, + { jobId }, { jobId: `consolidate-${jobId}`, delay: APP_CONSOLIDATION_DELAY, @@ -98,8 +124,9 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { backoff: LOCK_EXPIRATION } ); + + await pubClient.expire(flagKey, seconds(CONSOLIDATION_FLAG_EXPIRATION)); devDebugLogger(`Scheduled consolidation for 
jobId ${jobId}`); - await pubClient.expire(consolidateKey, CONSOLIDATION_FLAG_EXPIRATION / 1000); } else { devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`); } @@ -114,122 +141,167 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const consolidateWorker = new Worker( "notificationsConsolidate", async (job) => { - const { jobId, recipients } = job.data; + const { jobId } = job.data; devDebugLogger(`Consolidating notifications for jobId ${jobId}`); - const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`; - const lockKey = `lock:${devKey}:consolidate:${jobId}`; - - const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); + const lockKey = lockKeyForJob(jobId); + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", seconds(LOCK_EXPIRATION)); devDebugLogger(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`); - if (lockAcquired) { - try { - const allNotifications = {}; - const uniqueUsers = [...new Set(recipients.map((r) => r.user))]; - devDebugLogger(`Unique users for jobId ${jobId}: ${uniqueUsers}`); - - for (const user of uniqueUsers) { - const userKey = `${redisKeyPrefix}:${user}`; - const notifications = await pubClient.get(userKey); - devDebugLogger(`Retrieved notifications for ${user}: ${notifications}`); - - if (notifications) { - const parsedNotifications = JSON.parse(notifications); - const userRecipients = recipients.filter((r) => r.user === user); - for (const { bodyShopId } of userRecipients) { - allNotifications[user] = allNotifications[user] || {}; - allNotifications[user][bodyShopId] = parsedNotifications; - } - await pubClient.del(userKey); - devDebugLogger(`Deleted Redis key ${userKey}`); - } else { - devDebugLogger(`No notifications found for ${user} under ${userKey}`); - } - } - - devDebugLogger(`Consolidated notifications: ${JSON.stringify(allNotifications)}`); - - // Insert notifications into the database and collect IDs - 
const notificationInserts = []; - const notificationIdMap = new Map(); - - for (const [user, bodyShopData] of Object.entries(allNotifications)) { - const userRecipients = recipients.filter((r) => r.user === user); - const associationId = userRecipients[0]?.associationId; - - for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) { - const { scenario_text, fcm_text, scenario_meta } = buildNotificationContent(notifications); - notificationInserts.push({ - jobid: jobId, - associationid: associationId, - scenario_text: JSON.stringify(scenario_text), - fcm_text: fcm_text, - scenario_meta: JSON.stringify(scenario_meta) - }); - notificationIdMap.set(`${user}:${bodyShopId}`, null); - } - } - - if (notificationInserts.length > 0) { - const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, { - objects: notificationInserts - }); - devDebugLogger( - `Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}` - ); - - insertResponse.insert_notifications.returning.forEach((row, index) => { - const user = uniqueUsers[Math.floor(index / Object.keys(allNotifications[uniqueUsers[0]]).length)]; - const bodyShopId = Object.keys(allNotifications[user])[ - index % Object.keys(allNotifications[user]).length - ]; - notificationIdMap.set(`${user}:${bodyShopId}`, row.id); - }); - } - - // Emit notifications to users via Socket.io with notification ID - for (const [user, bodyShopData] of Object.entries(allNotifications)) { - const userMapping = await redisHelpers.getUserSocketMapping(user); - const userRecipients = recipients.filter((r) => r.user === user); - const associationId = userRecipients[0]?.associationId; - - for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) { - const notificationId = notificationIdMap.get(`${user}:${bodyShopId}`); - const jobRoNumber = notifications[0]?.jobRoNumber; - - if (userMapping && userMapping[bodyShopId]?.socketIds) { - 
userMapping[bodyShopId].socketIds.forEach((socketId) => { - ioRedis.to(socketId).emit("notification", { - jobId, - jobRoNumber, - bodyShopId, - notifications, - notificationId, - associationId - }); - }); - devDebugLogger( - `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}` - ); - } else { - devDebugLogger(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`); - } - } - } - - await pubClient.del(`app:${devKey}:consolidate:${jobId}`); - } catch (err) { - logger.log(`app-queue-consolidation-error`, "ERROR", "notifications", "api", { - message: err?.message, - stack: err?.stack - }); - throw err; - } finally { - await pubClient.del(lockKey); - } - } else { + if (!lockAcquired) { devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`); + return; + } + + try { + const rkSet = recipientsSetKey(jobId); + const assocHash = recipientAssocHashKey(jobId); + + const recipientKeys = await pubClient.smembers(rkSet); + if (!recipientKeys?.length) { + devDebugLogger(`No recipients found for jobId ${jobId}, nothing to consolidate.`); + await pubClient.del(consolidateFlagKey(jobId)); + return; + } + + const assocMap = await pubClient.hgetall(assocHash); + + // Collect notifications by recipientKey + const notificationsByRecipient = new Map(); // rk => parsed notifications array + const listKeysToDelete = []; // delete only after successful insert+emit + + for (const rk of recipientKeys) { + const [user, bodyShopId] = rk.split(":"); + const lk = listKey({ jobId, user, bodyShopId }); + + const items = await pubClient.lrange(lk, 0, -1); + if (!items?.length) continue; + + const parsed = items + .map((x) => { + try { + return JSON.parse(x); + } catch { + return null; + } + }) + .filter(Boolean); + + if (parsed.length) { + notificationsByRecipient.set(rk, parsed); + + // IMPORTANT: do NOT delete list yet; only delete after successful insert+emit + 
listKeysToDelete.push(lk); + } + } + + if (!notificationsByRecipient.size) { + devDebugLogger(`No notifications found in lists for jobId ${jobId}, nothing to insert/emit.`); + if (listKeysToDelete.length) { + await pubClient.del(...listKeysToDelete); + } + await pubClient.del(rkSet); + await pubClient.del(assocHash); + await pubClient.del(consolidateFlagKey(jobId)); + return; + } + + // Build DB inserts + const inserts = []; + const insertMeta = []; // keep rk + associationId to emit after insert + + for (const [rk, notifications] of notificationsByRecipient.entries()) { + const associationId = assocMap?.[rk]; + + // If your DB requires associationid NOT NULL, skip if missing + if (!associationId) { + devDebugLogger(`Skipping insert for ${rk} (missing associationId).`); + continue; + } + + const { scenario_text, fcm_text, scenario_meta } = buildNotificationContent(notifications); + + inserts.push({ + jobid: jobId, + associationid: associationId, + // NOTE: if these are jsonb columns, remove JSON.stringify and pass arrays directly. + scenario_text: JSON.stringify(scenario_text), + fcm_text, + scenario_meta: JSON.stringify(scenario_meta) + }); + + insertMeta.push({ rk, associationId }); + } + + // Map notificationId by associationId from Hasura returning rows + const idByAssociationId = new Map(); + + if (inserts.length > 0) { + const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, { objects: inserts }); + + const returning = insertResponse?.insert_notifications?.returning || []; + returning.forEach((row) => { + // Expecting your mutation to return associationid as well as id. + // If your mutation currently doesn’t return associationid, update it. 
+ if (row?.associationid) idByAssociationId.set(String(row.associationid), row.id); + }); + + devDebugLogger( + `Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}` + ); + } + + // Emit via Socket.io + // Group by user to reduce mapping lookups + const uniqueUsers = [...new Set(insertMeta.map(({ rk }) => rk.split(":")[0]))]; + + for (const user of uniqueUsers) { + const userMapping = await redisHelpers.getUserSocketMapping(user); + const entriesForUser = insertMeta + .map((m) => ({ ...m, user: m.rk.split(":")[0], bodyShopId: m.rk.split(":")[1] })) + .filter((m) => m.user === user); + + for (const entry of entriesForUser) { + const { rk, bodyShopId, associationId } = entry; + const notifications = notificationsByRecipient.get(rk) || []; + if (!notifications.length) continue; + + const jobRoNumber = notifications[0]?.jobRoNumber; + const notificationId = idByAssociationId.get(String(associationId)) || null; + + if (userMapping && userMapping[bodyShopId]?.socketIds) { + userMapping[bodyShopId].socketIds.forEach((socketId) => { + ioRedis.to(socketId).emit("notification", { + jobId, + jobRoNumber, + bodyShopId, + notifications, + notificationId, + associationId + }); + }); + + devDebugLogger( + `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} (notificationId ${notificationId})` + ); + } else { + devDebugLogger(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`); + } + } + } + + // Cleanup recipient tracking keys + consolidation flag + await pubClient.del(rkSet); + await pubClient.del(assocHash); + await pubClient.del(consolidateFlagKey(jobId)); + } catch (err) { + logger.log("app-queue-consolidation-error", "ERROR", "notifications", "api", { + message: err?.message, + stack: err?.stack + }); + throw err; + } finally { + await pubClient.del(lockKey); } }, { @@ -244,13 +316,14 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { 
consolidateWorker.on("completed", (job) => devDebugLogger(`Consolidate job ${job.id} completed`)); addWorker.on("failed", (job, err) => - logger.log(`app-queue-notification-error`, "ERROR", "notifications", "api", { + logger.log("app-queue-notification-error", "ERROR", "notifications", "api", { message: err?.message, stack: err?.stack }) ); + consolidateWorker.on("failed", (job, err) => - logger.log(`app-queue-consolidation-failed:`, "ERROR", "notifications", "api", { + logger.log("app-queue-consolidation-failed", "ERROR", "notifications", "api", { message: err?.message, stack: err?.stack }) @@ -285,11 +358,13 @@ const dispatchAppsToQueue = async ({ appsToDispatch }) => { for (const app of appsToDispatch) { const { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber } = app; + await appQueue.add( "add-notification", { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber }, { jobId: `${jobId}-${Date.now()}` } ); + devDebugLogger(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`); } }; diff --git a/server/notifications/queues/emailQueue.js b/server/notifications/queues/emailQueue.js index a5ad8a530..a38765450 100644 --- a/server/notifications/queues/emailQueue.js +++ b/server/notifications/queues/emailQueue.js @@ -27,6 +27,16 @@ let emailConsolidateQueue; let emailAddWorker; let emailConsolidateWorker; +const seconds = (ms) => Math.max(1, Math.ceil(ms / 1000)); + +const escapeHtml = (s = "") => + String(s) + .replace(/&/g, "&amp;") + .replace(/</g, "&lt;") + .replace(/>/g, "&gt;") + .replace(/"/g, "&quot;") + .replace(/'/g, "&#39;"); + /** * Initializes the email notification queues and workers.
* @@ -65,17 +75,21 @@ const loadEmailQueue = async ({ pubClient, logger }) => { const redisKeyPrefix = `email:${devKey}:notifications:${jobId}`; - for (const recipient of recipients) { + for (const recipient of recipients || []) { const { user, firstName, lastName } = recipient; + if (!user) continue; const userKey = `${redisKeyPrefix}:${user}`; await pubClient.rpush(userKey, body); - await pubClient.expire(userKey, NOTIFICATION_EXPIRATION / 1000); + await pubClient.expire(userKey, seconds(NOTIFICATION_EXPIRATION)); const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${user}`; await pubClient.hsetnx(detailsKey, "firstName", firstName || ""); await pubClient.hsetnx(detailsKey, "lastName", lastName || ""); - await pubClient.hsetnx(detailsKey, "bodyShopTimezone", bodyShopTimezone); - await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000); - await pubClient.sadd(`email:${devKey}:recipients:${jobId}`, user); + const tzValue = bodyShopTimezone || "UTC"; + await pubClient.hsetnx(detailsKey, "bodyShopTimezone", tzValue); + await pubClient.expire(detailsKey, seconds(NOTIFICATION_EXPIRATION)); + const recipientsSetKey = `email:${devKey}:recipients:${jobId}`; + await pubClient.sadd(recipientsSetKey, user); + await pubClient.expire(recipientsSetKey, seconds(NOTIFICATION_EXPIRATION)); devDebugLogger(`Stored message for ${user} under ${userKey}: ${body}`); } @@ -93,7 +107,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { } ); devDebugLogger(`Scheduled email consolidation for jobId ${jobId}`); - await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000); + await pubClient.expire(consolidateKey, seconds(CONSOLIDATION_KEY_EXPIRATION)); } else { devDebugLogger(`Email consolidation already scheduled for jobId ${jobId}`); } @@ -113,7 +127,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { devDebugLogger(`Consolidating emails for jobId ${jobId}`); const lockKey = `lock:${devKey}:emailConsolidate:${jobId}`; - const 
lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", seconds(LOCK_EXPIRATION)); if (lockAcquired) { try { const recipientsSet = `email:${devKey}:recipients:${jobId}`; @@ -139,7 +153,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
-                    <ul>${messages.map((msg) => `<li>${msg}</li>`).join("")}</ul>
+                    <ul>${messages.map((msg) => `<li>${escapeHtml(msg)}</li>`).join("")}</ul>
@@ -239,7 +253,13 @@ const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => { const emailAddQueue = getQueue(); for (const email of emailsToDispatch) { - const { jobId, jobRoNumber, bodyShopName, bodyShopTimezone, body, recipients } = email; + const { jobId, bodyShopName, bodyShopTimezone, body, recipients } = email; + let { jobRoNumber } = email; + + // Make sure Jobs that have not been converted yet can still get notifications + if (jobRoNumber === null) { + jobRoNumber = "N/A"; + } if (!jobId || !jobRoNumber || !bodyShopName || !body || !recipients.length) { devDebugLogger( diff --git a/server/notifications/queues/fcmQueue.js b/server/notifications/queues/fcmQueue.js new file mode 100644 index 000000000..4bbd34cac --- /dev/null +++ b/server/notifications/queues/fcmQueue.js @@ -0,0 +1,569 @@ +// NOTE: Despite the filename, this implementation targets Expo Push Tokens (ExponentPushToken[...]). +// It does NOT use Firebase Admin and does NOT require credentials (no EXPO_ACCESS_TOKEN). + +const { Queue, Worker } = require("bullmq"); +const { registerCleanupTask } = require("../../utils/cleanupManager"); +const getBullMQPrefix = require("../../utils/getBullMQPrefix"); +const devDebugLogger = require("../../utils/devDebugLogger"); + +const { client: gqlClient } = require("../../graphql-client/graphql-client"); +const { GET_USERS_FCM_TOKENS_BY_EMAILS, UPDATE_USER_FCM_TOKENS_BY_EMAIL } = require("../../graphql-client/queries"); + +const FCM_CONSOLIDATION_DELAY_IN_MINS = (() => { + const envValue = process.env?.FCM_CONSOLIDATION_DELAY_IN_MINS; + const parsedValue = envValue ? parseInt(envValue, 10) : NaN; + return isNaN(parsedValue) ? 3 : Math.max(1, parsedValue); +})(); + +const FCM_CONSOLIDATION_DELAY = FCM_CONSOLIDATION_DELAY_IN_MINS * 60000; + +// pegged constants (pattern matches your other queues) +const CONSOLIDATION_KEY_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5; + +// IMPORTANT: lock must outlive a full consolidation run to avoid duplicate sends.
+const LOCK_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5; + +// Keep Bull backoff separate from lock TTL to avoid unexpected long retries. +const BACKOFF_DELAY = Math.max(1000, Math.floor(FCM_CONSOLIDATION_DELAY * 0.25)); + +const RATE_LIMITER_DURATION = FCM_CONSOLIDATION_DELAY * 0.1; +const NOTIFICATION_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5; + +const EXPO_PUSH_ENDPOINT = "https://exp.host/--/api/v2/push/send"; +const EXPO_MAX_MESSAGES_PER_REQUEST = 100; + +let fcmAddQueue; +let fcmConsolidateQueue; +let fcmAddWorker; +let fcmConsolidateWorker; + +/** + * Milliseconds to seconds. + * @param ms + * @returns {number} + */ +const seconds = (ms) => Math.max(1, Math.ceil(ms / 1000)); + +/** + * Chunk an array into smaller arrays of given size. + * @param arr + * @param size + * @returns {*[]} + */ +const chunk = (arr, size) => { + const out = []; + for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size)); + return out; +}; + +/** + * Check if a string is an Expo push token. + * @param s + * @returns {boolean} + */ +const isExpoPushToken = (s) => { + if (!s || typeof s !== "string") return false; + // Common formats observed in the wild: + // - ExponentPushToken[xxxxxxxxxxxxxxxxxxxxxx] + // - ExpoPushToken[xxxxxxxxxxxxxxxxxxxxxx] + return /^ExponentPushToken\[[^\]]+\]$/.test(s) || /^ExpoPushToken\[[^\]]+\]$/.test(s); +}; + +/** + * Get unique, trimmed strings from an array. + * @param arr + * @returns {any[]} + */ +const uniqStrings = (arr) => [ + ...new Set( + arr + .filter(Boolean) + .map((x) => String(x).trim()) + .filter(Boolean) + ) +]; + +/** + * Normalize users.fcmtokens (jsonb) into an array of Expo push tokens. 
+ * + * New expected shape (example): + * { + * "ExponentPushToken[dksJAdLUTofdEk7P59thue]": { + * "platform": "ios", + * "timestamp": 1767397802709, + * "pushTokenString": "ExponentPushToken[dksJAdLUTofdEk7P59thue]" + * } + * } + * + * Also supports older/alternate shapes: + * - string: "ExponentPushToken[...]" + * - array: ["ExponentPushToken[...]", ...] + * - object: token keys OR values containing token-like fields + * @param fcmtokens + * @returns {string[]|*[]} + */ +const normalizeTokens = (fcmtokens) => { + if (!fcmtokens) return []; + + if (typeof fcmtokens === "string") { + const s = fcmtokens.trim(); + return isExpoPushToken(s) ? [s] : []; + } + + if (Array.isArray(fcmtokens)) { + return uniqStrings(fcmtokens).filter(isExpoPushToken); + } + + if (typeof fcmtokens === "object") { + const keys = Object.keys(fcmtokens || {}); + const vals = Object.values(fcmtokens || {}); + + const fromKeys = keys.filter(isExpoPushToken); + + const fromValues = vals + .map((v) => { + if (!v) return null; + + // Some shapes store token as a string value directly + if (typeof v === "string") return v; + + if (typeof v === "object") { + // Your new shape uses pushTokenString + return v.pushTokenString || v.token || v.expoPushToken || null; + } + + return null; + }) + .filter(Boolean) + .map(String); + + return uniqStrings([...fromKeys, ...fromValues]).filter(isExpoPushToken); + } + + return []; +}; + +/** + * Remove specified tokens from the stored fcmtokens jsonb while preserving the original shape. + * @param fcmtokens + * @param tokensToRemove + * @returns {*} + */ +const removeTokensFromFcmtokens = (fcmtokens, tokensToRemove) => { + const remove = new Set((tokensToRemove || []).map((t) => String(t).trim()).filter(Boolean)); + if (!remove.size) return fcmtokens; + if (!fcmtokens) return fcmtokens; + + if (typeof fcmtokens === "string") { + const s = fcmtokens.trim(); + return remove.has(s) ? 
null : fcmtokens; + } + + if (Array.isArray(fcmtokens)) { + const next = fcmtokens.filter((t) => !remove.has(String(t).trim())); + return next.length ? next : []; + } + + if (typeof fcmtokens === "object") { + const next = {}; + for (const [k, v] of Object.entries(fcmtokens)) { + const keyIsToken = isExpoPushToken(k) && remove.has(k); + + let valueToken = null; + if (typeof v === "string") valueToken = v; + else if (v && typeof v === "object") valueToken = v.pushTokenString || v.token || v.expoPushToken || null; + + const valueIsToken = valueToken && remove.has(String(valueToken).trim()); + + if (keyIsToken || valueIsToken) continue; + next[k] = v; + } + return Object.keys(next).length ? next : {}; + } + + return fcmtokens; +}; + +/** + * Safely parse JSON response. + * @param res + * @returns {Promise<*|null>} + */ +const safeJson = async (res) => { + try { + return await res.json(); + } catch { + return null; + } +}; + +/** + * Send Expo push notifications. + * Returns invalid tokens that should be removed (e.g., DeviceNotRegistered). + * + * @param {Array} messages Expo messages array + * @param {Object} logger + * @returns {Promise<{invalidTokens: string[], ticketIds: string[]}>} + */ +const sendExpoPush = async ({ messages, logger }) => { + if (!messages?.length) return { invalidTokens: [], ticketIds: [] }; + + const invalidTokens = new Set(); + const ticketIds = []; + + for (const batch of chunk(messages, EXPO_MAX_MESSAGES_PER_REQUEST)) { + const res = await fetch(EXPO_PUSH_ENDPOINT, { + method: "POST", + headers: { + accept: "application/json", + "content-type": "application/json" + }, + body: JSON.stringify(batch) + }); + + const payload = await safeJson(res); + + if (!res.ok) { + logger?.log?.("expo-push-http-error", "ERROR", "notifications", "api", { + status: res.status, + statusText: res.statusText, + payload + }); + throw new Error(`Expo push HTTP error: ${res.status} ${res.statusText}`); + } + + const tickets = Array.isArray(payload?.data) ? 
payload.data : payload?.data ? [payload.data] : []; + + if (!tickets.length) { + logger?.log?.("expo-push-bad-response", "ERROR", "notifications", "api", { payload }); + continue; + } + + // Expo returns tickets in the same order as messages in the request batch + for (let i = 0; i < tickets.length; i++) { + const t = tickets[i]; + const msg = batch[i]; + const token = typeof msg?.to === "string" ? msg.to : null; + + if (t?.status === "ok" && t?.id) ticketIds.push(String(t.id)); + + if (t?.status === "error") { + const errCode = t?.details?.error; + const msgText = String(t?.message || ""); + + const shouldDelete = + errCode === "DeviceNotRegistered" || /not a registered push notification recipient/i.test(msgText); + + if (shouldDelete && token && isExpoPushToken(token)) { + invalidTokens.add(token); + } + + logger?.log?.("expo-push-ticket-error", "ERROR", "notifications", "api", { + token, + ticket: t + }); + } + } + } + + return { invalidTokens: [...invalidTokens], ticketIds }; +}; + +/** + * Build a summary string for push notification body. + * @param count + * @param jobRoNumber + * @param bodyShopName + * @returns {`${string} ${string} for ${string|string}${string|string}`} + */ +const buildPushSummary = ({ count, jobRoNumber, bodyShopName }) => { + const updates = count === 1 ? "update" : "updates"; + const ro = jobRoNumber ? `RO ${jobRoNumber}` : "a job"; + const shop = bodyShopName ? ` at ${bodyShopName}` : ""; + return `${count} ${updates} for ${ro}${shop}`; +}; + +/** + * Loads the push notification queues and workers (Expo push). + * @param pubClient + * @param logger + * @returns {Promise} + */ +const loadFcmQueue = async ({ pubClient, logger }) => { + if (!fcmAddQueue || !fcmConsolidateQueue) { + const prefix = getBullMQPrefix(); + const devKey = process.env?.NODE_ENV === "production" ? 
"prod" : "dev"; + + devDebugLogger(`Initializing Expo Push Queues with prefix: ${prefix}`); + + fcmAddQueue = new Queue("fcmAdd", { + prefix, + connection: pubClient, + defaultJobOptions: { removeOnComplete: true, removeOnFail: true } + }); + + fcmConsolidateQueue = new Queue("fcmConsolidate", { + prefix, + connection: pubClient, + defaultJobOptions: { removeOnComplete: true, removeOnFail: true } + }); + + fcmAddWorker = new Worker( + "fcmAdd", + async (job) => { + const { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients } = + job.data; + + devDebugLogger(`Adding push notifications for jobId ${jobId}`); + + const recipientsSetKey = `fcm:${devKey}:recipients:${jobId}`; // set of user emails + const metaKey = `fcm:${devKey}:meta:${jobId}`; + const redisKeyPrefix = `fcm:${devKey}:notifications:${jobId}`; // per-user list keys + + // Store job-level metadata (always keep latest values) + await pubClient.hset(metaKey, "jobRoNumber", jobRoNumber || ""); + await pubClient.hset(metaKey, "bodyShopId", bodyShopId || ""); + await pubClient.hset(metaKey, "bodyShopName", bodyShopName || ""); + await pubClient.expire(metaKey, seconds(NOTIFICATION_EXPIRATION)); + + for (const r of recipients || []) { + const user = r?.user; + const associationId = r?.associationId; + + if (!user) continue; + + const userKey = `${redisKeyPrefix}:${user}`; + const payload = JSON.stringify({ + body: body || "", + scenarioKey: scenarioKey || "", + key: key || "", + variables: variables || {}, + associationId: associationId ? 
String(associationId) : null, + ts: Date.now() + }); + + await pubClient.rpush(userKey, payload); + await pubClient.expire(userKey, seconds(NOTIFICATION_EXPIRATION)); + + await pubClient.sadd(recipientsSetKey, user); + await pubClient.expire(recipientsSetKey, seconds(NOTIFICATION_EXPIRATION)); + } + + const consolidateKey = `fcm:${devKey}:consolidate:${jobId}`; + const flagSet = await pubClient.setnx(consolidateKey, "pending"); + + if (flagSet) { + await fcmConsolidateQueue.add( + "consolidate-fcm", + { jobId }, + { + jobId: `consolidate-${jobId}`, + delay: FCM_CONSOLIDATION_DELAY, + attempts: 3, + backoff: BACKOFF_DELAY + } + ); + + await pubClient.expire(consolidateKey, seconds(CONSOLIDATION_KEY_EXPIRATION)); + devDebugLogger(`Scheduled consolidation for jobId ${jobId}`); + } else { + devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`); + } + }, + { prefix, connection: pubClient, concurrency: 5 } + ); + + fcmConsolidateWorker = new Worker( + "fcmConsolidate", + async (job) => { + const { jobId } = job.data; + const devKey = process.env?.NODE_ENV === "production" ? 
"prod" : "dev"; + + const lockKey = `lock:${devKey}:fcmConsolidate:${jobId}`; + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", seconds(LOCK_EXPIRATION)); + + if (!lockAcquired) { + devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`); + return; + } + + try { + const recipientsSet = `fcm:${devKey}:recipients:${jobId}`; + const userEmails = await pubClient.smembers(recipientsSet); + + if (!userEmails?.length) { + devDebugLogger(`No recipients found for jobId ${jobId}`); + await pubClient.del(`fcm:${devKey}:consolidate:${jobId}`); + return; + } + + // Load meta + const metaKey = `fcm:${devKey}:meta:${jobId}`; + const meta = await pubClient.hgetall(metaKey); + const jobRoNumber = meta?.jobRoNumber || ""; + const bodyShopId = meta?.bodyShopId || ""; + const bodyShopName = meta?.bodyShopName || ""; + + // Fetch tokens for all recipients (1 DB round-trip) + const usersResp = await gqlClient.request(GET_USERS_FCM_TOKENS_BY_EMAILS, { emails: userEmails }); + + // Map: email -> { raw, tokens } + const tokenMap = new Map( + (usersResp?.users || []).map((u) => [ + String(u.email), + { raw: u.fcmtokens, tokens: normalizeTokens(u.fcmtokens) } + ]) + ); + + for (const userEmail of userEmails) { + const userKey = `fcm:${devKey}:notifications:${jobId}:${userEmail}`; + const raw = await pubClient.lrange(userKey, 0, -1); + + if (!raw?.length) { + await pubClient.del(userKey); + continue; + } + + const parsed = raw + .map((x) => { + try { + return JSON.parse(x); + } catch { + return null; + } + }) + .filter(Boolean); + + const count = parsed.length; + const notificationBody = buildPushSummary({ count, jobRoNumber, bodyShopName }); + + // associationId should be stable for a user in a job’s bodyshop; take first non-null + const firstWithAssociation = parsed.find((p) => p?.associationId != null); + const associationId = + firstWithAssociation?.associationId != null ? 
String(firstWithAssociation.associationId) : ""; + + const tokenInfo = tokenMap.get(String(userEmail)) || { raw: null, tokens: [] }; + const tokens = tokenInfo.tokens || []; + + if (!tokens.length) { + devDebugLogger(`No Expo push tokens for ${userEmail}; skipping push for jobId ${jobId}`); + await pubClient.del(userKey); + continue; + } + + // Build 1 message per device token + const messages = tokens.map((token) => ({ + to: token, + title: "ImEX Online", + body: notificationBody, + priority: "high", + data: { + type: "job-notification", + jobId: String(jobId), + jobRoNumber: String(jobRoNumber || ""), + bodyShopId: String(bodyShopId || ""), + bodyShopName: String(bodyShopName || ""), + associationId: String(associationId || ""), + userEmail: String(userEmail), + count: String(count) + } + })); + + const { invalidTokens } = await sendExpoPush({ messages, logger }); + + // Opportunistic cleanup: remove invalid tokens from users.fcmtokens + if (invalidTokens?.length) { + try { + const nextFcmtokens = removeTokensFromFcmtokens(tokenInfo.raw, invalidTokens); + + await gqlClient.request(UPDATE_USER_FCM_TOKENS_BY_EMAIL, { + email: String(userEmail), + fcmtokens: nextFcmtokens + }); + + devDebugLogger(`Cleaned ${invalidTokens.length} invalid Expo tokens for ${userEmail}`); + } catch (e) { + logger?.log?.("expo-push-token-cleanup-failed", "ERROR", "notifications", "api", { + userEmail: String(userEmail), + message: e?.message, + stack: e?.stack + }); + // Do not throw: cleanup failure should not retry the whole consolidation and risk duplicate pushes. 
+ } + } + + devDebugLogger(`Sent Expo push to ${userEmail} for jobId ${jobId} (${count} updates)`); + + await pubClient.del(userKey); + } + + await pubClient.del(recipientsSet); + await pubClient.del(metaKey); + await pubClient.del(`fcm:${devKey}:consolidate:${jobId}`); + } catch (err) { + logger.log("fcm-queue-consolidation-error", "ERROR", "notifications", "api", { + message: err?.message, + stack: err?.stack + }); + throw err; + } finally { + await pubClient.del(lockKey); + } + }, + { prefix, connection: pubClient, concurrency: 1, limiter: { max: 1, duration: RATE_LIMITER_DURATION } } + ); + + fcmAddWorker.on("failed", (job, err) => + logger.log("fcm-add-failed", "ERROR", "notifications", "api", { message: err?.message, stack: err?.stack }) + ); + + fcmConsolidateWorker.on("failed", (job, err) => + logger.log("fcm-consolidate-failed", "ERROR", "notifications", "api", { + message: err?.message, + stack: err?.stack + }) + ); + + const shutdown = async () => { + devDebugLogger("Closing push queue workers..."); + await Promise.all([fcmAddWorker.close(), fcmConsolidateWorker.close()]); + devDebugLogger("Push queue workers closed"); + }; + + registerCleanupTask(shutdown); + } + + return fcmAddQueue; +}; + +/** + * Get the add queue. + * @returns {*} + */ +const getQueue = () => { + if (!fcmAddQueue) throw new Error("FCM add queue not initialized. Ensure loadFcmQueue is called during bootstrap."); + return fcmAddQueue; +}; + +/** + * Dispatch push notifications to the add queue. 
+ * @param fcmsToDispatch + * @returns {Promise} + */ +const dispatchFcmsToQueue = async ({ fcmsToDispatch }) => { + const queue = getQueue(); + + for (const fcm of fcmsToDispatch) { + const { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients } = fcm; + + if (!jobId || !recipients?.length) continue; + + await queue.add( + "add-fcm-notification", + { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients }, + { jobId: `${jobId}-${Date.now()}` } + ); + } +}; + +module.exports = { loadFcmQueue, getQueue, dispatchFcmsToQueue }; diff --git a/server/notifications/scenarioBuilders.js b/server/notifications/scenarioBuilders.js index ec24e1804..d94c5390b 100644 --- a/server/notifications/scenarioBuilders.js +++ b/server/notifications/scenarioBuilders.js @@ -19,6 +19,8 @@ const buildNotification = (data, key, body, variables = {}) => { jobId: data.jobId, jobRoNumber: data.jobRoNumber, bodyShopId: data.bodyShopId, + scenarioKey: data.scenarioKey, + scenarioTable: data.scenarioTable, key, body, variables, @@ -32,21 +34,47 @@ const buildNotification = (data, key, body, variables = {}) => { body, recipients: [] }, - fcm: { recipients: [] } + fcm: { + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopId: data.bodyShopId, + bodyShopName: data.bodyShopName, + bodyShopTimezone: data.bodyShopTimezone, + scenarioKey: data.scenarioKey, + scenarioTable: data.scenarioTable, + key, + body, + variables, + recipients: [] + } }; // Populate recipients from scenarioWatchers data.scenarioWatchers.forEach((recipients) => { const { user, app, fcm, email, firstName, lastName, employeeId, associationId } = recipients; - if (app === true) + + if (app === true) { result.app.recipients.push({ user, bodyShopId: data.bodyShopId, employeeId, associationId }); - if (fcm === true) result.fcm.recipients.push(user); - if (email === true) result.email.recipients.push({ user, firstName, lastName }); + } + + if (email 
=== true) { + result.email.recipients.push({ user, firstName, lastName }); + } + + if (fcm === true) { + // Keep structure consistent and future-proof (token lookup is done server-side) + result.fcm.recipients.push({ + user, + bodyShopId: data.bodyShopId, + employeeId, + associationId + }); + } }); return result; diff --git a/server/notifications/scenarioParser.js b/server/notifications/scenarioParser.js index aebec8205..7e6a211ad 100644 --- a/server/notifications/scenarioParser.js +++ b/server/notifications/scenarioParser.js @@ -14,6 +14,7 @@ const { isEmpty, isFunction } = require("lodash"); const { getMatchingScenarios } = require("./scenarioMapper"); const { dispatchEmailsToQueue } = require("./queues/emailQueue"); const { dispatchAppsToQueue } = require("./queues/appQueue"); +const { dispatchFcmsToQueue } = require("./queues/fcmQueue"); // NEW // If true, the user who commits the action will NOT receive notifications; if false, they will. const FILTER_SELF_FROM_WATCHERS = process.env?.FILTER_SELF_FROM_WATCHERS !== "false"; @@ -298,6 +299,16 @@ const scenarioParser = async (req, jobIdField) => { }) ); } + + const fcmsToDispatch = scenariosToDispatch.map((scenario) => scenario?.fcm); + if (!isEmpty(fcmsToDispatch)) { + dispatchFcmsToQueue({ fcmsToDispatch, logger }).catch((e) => + logger.log("Something went wrong dispatching FCMs to the FCM Notification Queue", "error", "queue", null, { + message: e?.message, + stack: e?.stack + }) + ); + } }; module.exports = scenarioParser;