From 4cdc15f70b0b4e3fcfbf00320854a83183c45d7b Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 5 Jan 2026 16:51:28 -0500 Subject: [PATCH] feature/IO-3492-FCM-Queue-For-Notifications: Checkpoint --- server/graphql-client/queries.js | 9 + server/notifications/queues/fcmQueue.js | 326 ++++++++++++++++++------ 2 files changed, 260 insertions(+), 75 deletions(-) diff --git a/server/graphql-client/queries.js b/server/graphql-client/queries.js index b51c73c49..451ec6bed 100644 --- a/server/graphql-client/queries.js +++ b/server/graphql-client/queries.js @@ -3187,3 +3187,12 @@ mutation INSERT_MEDIA_ANALYTICS($mediaObject: media_analytics_insert_input!) { } } `; + +exports.GET_USERS_FCM_TOKENS_BY_EMAILS = /* GraphQL */ ` + query GET_USERS_FCM_TOKENS_BY_EMAILS($emails: [String!]!) { + users(where: { email: { _in: $emails } }) { + email + fcmtokens + } + } +`; diff --git a/server/notifications/queues/fcmQueue.js b/server/notifications/queues/fcmQueue.js index 118676075..e0c7f0776 100644 --- a/server/notifications/queues/fcmQueue.js +++ b/server/notifications/queues/fcmQueue.js @@ -1,8 +1,14 @@ +// NOTE: Despite the filename, this implementation targets Expo Push Tokens (ExponentPushToken[...]). +// It does NOT use Firebase Admin and does NOT require credentials (no EXPO_ACCESS_TOKEN). + const { Queue, Worker } = require("bullmq"); const { registerCleanupTask } = require("../../utils/cleanupManager"); const getBullMQPrefix = require("../../utils/getBullMQPrefix"); const devDebugLogger = require("../../utils/devDebugLogger"); +const { client: gqlClient } = require("../../graphql-client/graphql-client"); +const { GET_USERS_FCM_TOKENS_BY_EMAILS } = require("../../graphql-client/queries"); + const FCM_CONSOLIDATION_DELAY_IN_MINS = (() => { const envValue = process.env?.FCM_CONSOLIDATION_DELAY_IN_MINS; const parsedValue = envValue ? parseInt(envValue, 10) : NaN; @@ -17,35 +23,173 @@ const LOCK_EXPIRATION = FCM_CONSOLIDATION_DELAY * 0.25; const RATE_LIMITER_DURATION = FCM_CONSOLIDATION_DELAY * 0.1; const NOTIFICATION_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5; +const EXPO_PUSH_ENDPOINT = "https://exp.host/--/api/v2/push/send"; +const EXPO_MAX_MESSAGES_PER_REQUEST = 100; + let fcmAddQueue; let fcmConsolidateQueue; let fcmAddWorker; let fcmConsolidateWorker; -// IMPORTANT: do NOT require firebase-handler at module load time. -// firebase-handler does `require(process.env.FIREBASE_ADMINSDK_JSON)` at top-level, -// which will hard-crash environments that don’t have Firebase configured. -const hasFirebaseEnv = () => Boolean(process.env.FIREBASE_ADMINSDK_JSON && process.env.FIREBASE_DATABASE_URL); +/** + * Milliseconds to seconds. + * @param ms + * @returns {number} + */ +const seconds = (ms) => Math.max(1, Math.ceil(ms / 1000)); /** - * Get the Firebase Admin SDK, or null if Firebase is not configured. - * @returns {{app: app, remoteConfig: ((app?: App) => remoteConfig.RemoteConfig) | remoteConfig, firestore: ((app?: App) => FirebaseFirestore.Firestore) | firestore, AppOptions: AppOptions, auth: ((app?: App) => auth.Auth) | auth, securityRules: ((app?: App) => securityRules.SecurityRules) | securityRules, installations: ((app?: App) => installations.Installations) | installations, FirebaseArrayIndexError: FirebaseArrayIndexError, storage: ((app?: App) => storage.Storage) | storage, appCheck: ((app?: App) => appCheck.AppCheck) | appCheck, initializeApp(options?: AppOptions, name?: string): app.App, FirebaseError: FirebaseError, messaging: ((app?: App) => messaging.Messaging) | messaging, projectManagement: ((app?: App) => projectManagement.ProjectManagement) | projectManagement, database: ((app?: App) => database.Database) | database, machineLearning: ((app?: App) => machineLearning.MachineLearning) | machineLearning, instanceId: ((app?: App) => instanceId.InstanceId) | instanceId, SDK_VERSION: string, apps: (app.App | null)[], credential: credential, ServiceAccount: ServiceAccount, GoogleOAuthAccessToken: GoogleOAuthAccessToken}|null} + * Chunk an array into smaller arrays of given size. + * @param arr + * @param size + * @returns {*[]} */ -const getFirebaseAdmin = () => { - if (!hasFirebaseEnv()) return null; - const { admin } = require("../../firebase/firebase-handler"); - return admin; +const chunk = (arr, size) => { + const out = []; + for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size)); + return out; }; /** - * Get the FCM topic name for an association. - * @param associationId - * @returns {`assoc-${string}-notifications`} + * Check if a string is an Expo push token. + * @param s + * @returns {boolean} */ -const topicForAssociation = (associationId) => `assoc-${associationId}-notifications`; +const isExpoPushToken = (s) => { + if (!s || typeof s !== "string") return false; + // Common formats observed in the wild: + // - ExponentPushToken[xxxxxxxxxxxxxxxxxxxxxx] + // - ExpoPushToken[xxxxxxxxxxxxxxxxxxxxxx] + return /^ExponentPushToken\[[^\]]+\]$/.test(s) || /^ExpoPushToken\[[^\]]+\]$/.test(s); +}; /** - * Build a summary string for FCM push notification body. + * Get unique, trimmed strings from an array. + * @param arr + * @returns {any[]} + */ +const uniqStrings = (arr) => [ + ...new Set( + arr + .filter(Boolean) + .map((x) => String(x).trim()) + .filter(Boolean) + ) +]; + +/** + * Normalize users.fcmtokens (jsonb) into an array of Expo push tokens. + * + * New expected shape (example): + * { + * "ExponentPushToken[dksJAdLUTofdEk7P59thue]": { + * "platform": "ios", + * "timestamp": 1767397802709, + * "pushTokenString": "ExponentPushToken[dksJAdLUTofdEk7P59thue]" + * } + * } + * + * Also supports older/alternate shapes: + * - string: "ExponentPushToken[...]" + * - array: ["ExponentPushToken[...]", ...] + * - object: token keys OR values containing token-like fields + * @param fcmtokens + * @returns {string[]|*[]} + */ +const normalizeTokens = (fcmtokens) => { + if (!fcmtokens) return []; + + if (typeof fcmtokens === "string") { + const s = fcmtokens.trim(); + return isExpoPushToken(s) ? [s] : []; + } + + if (Array.isArray(fcmtokens)) { + return uniqStrings(fcmtokens).filter(isExpoPushToken); + } + + if (typeof fcmtokens === "object") { + const keys = Object.keys(fcmtokens || {}); + const vals = Object.values(fcmtokens || {}); + + const fromKeys = keys.filter(isExpoPushToken); + + const fromValues = vals + .map((v) => { + if (!v) return null; + + // Some shapes store token as a string value directly + if (typeof v === "string") return v; + + if (typeof v === "object") { + // Your new shape uses pushTokenString + return v.pushTokenString || v.token || v.expoPushToken || null; + } + + return null; + }) + .filter(Boolean) + .map(String); + + return uniqStrings([...fromKeys, ...fromValues]).filter(isExpoPushToken); + } + + return []; +}; + +/** + * Safely parse JSON response. + * @param res + * @returns {Promise<*|null>} + */ +const safeJson = async (res) => { + try { + return await res.json(); + } catch { + return null; + } +}; + +/** + * Send Expo push notifications + * @param {Array} messages Expo messages array + * @param {Object} logger + */ +const sendExpoPush = async ({ messages, logger }) => { + if (!messages?.length) return; + + for (const batch of chunk(messages, EXPO_MAX_MESSAGES_PER_REQUEST)) { + const res = await fetch(EXPO_PUSH_ENDPOINT, { + method: "POST", + headers: { + accept: "application/json", + "content-type": "application/json" + }, + body: JSON.stringify(batch) + }); + + const payload = await safeJson(res); + + if (!res.ok) { + logger?.log?.("expo-push-http-error", "ERROR", "notifications", "api", { + status: res.status, + statusText: res.statusText, + payload + }); + throw new Error(`Expo push HTTP error: ${res.status} ${res.statusText}`); + } + + const data = payload?.data; + if (Array.isArray(data)) { + const errors = data.filter((t) => t?.status === "error"); + if (errors.length) { + logger?.log?.("expo-push-ticket-errors", "ERROR", "notifications", "api", { errors }); + } + } + } +}; +/** + * Build a summary string for push notification body. * @param count * @param jobRoNumber * @param bodyShopName @@ -59,22 +203,17 @@ const buildPushSummary = ({ count, jobRoNumber, bodyShopName }) => { }; /** - * Loads the FCM notification queues and workers. + * Loads the push notification queues and workers (Expo push). * @param pubClient * @param logger - * @returns {Promise, ExtractResultType, ExtractNameType>|null>} + * @returns {Promise} */ const loadFcmQueue = async ({ pubClient, logger }) => { - if (!hasFirebaseEnv()) { - devDebugLogger("FCM queue not initialized (Firebase env not configured)."); - return null; - } - if (!fcmAddQueue || !fcmConsolidateQueue) { const prefix = getBullMQPrefix(); const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev"; - devDebugLogger(`Initializing FCM Queues with prefix: ${prefix}`); + devDebugLogger(`Initializing Expo Push Queues with prefix: ${prefix}`); fcmAddQueue = new Queue("fcmAdd", { prefix, @@ -93,35 +232,40 @@ const loadFcmQueue = async ({ pubClient, logger }) => { async (job) => { const { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients } = job.data; - devDebugLogger(`Adding FCM notifications for jobId ${jobId}`); - const redisKeyPrefix = `fcm:${devKey}:notifications:${jobId}`; + devDebugLogger(`Adding push notifications for jobId ${jobId}`); - for (const r of recipients) { + const recipientsSetKey = `fcm:${devKey}:recipients:${jobId}`; // set of user emails + const metaKey = `fcm:${devKey}:meta:${jobId}`; + const redisKeyPrefix = `fcm:${devKey}:notifications:${jobId}`; // per-user list keys + + // store job-level metadata once + await pubClient.hsetnx(metaKey, "jobRoNumber", jobRoNumber || ""); + await pubClient.hsetnx(metaKey, "bodyShopId", bodyShopId || ""); + await pubClient.hsetnx(metaKey, "bodyShopName", bodyShopName || ""); + await pubClient.expire(metaKey, seconds(NOTIFICATION_EXPIRATION)); + + for (const r of recipients || []) { + const user = r?.user; const associationId = r?.associationId; - if (!associationId) continue; - const assocKey = `${redisKeyPrefix}:${associationId}`; + if (!user) continue; + + const userKey = `${redisKeyPrefix}:${user}`; const payload = JSON.stringify({ body: body || "", scenarioKey: scenarioKey || "", key: key || "", variables: variables || {}, + associationId: associationId ? String(associationId) : null, ts: Date.now() }); - await pubClient.rpush(assocKey, payload); - await pubClient.expire(assocKey, NOTIFICATION_EXPIRATION / 1000); - const recipientsSetKey = `fcm:${devKey}:recipients:${jobId}`; - await pubClient.sadd(recipientsSetKey, associationId); - await pubClient.expire(recipientsSetKey, NOTIFICATION_EXPIRATION / 1000); + await pubClient.rpush(userKey, payload); + await pubClient.expire(userKey, seconds(NOTIFICATION_EXPIRATION)); - // store some metadata once per jobId - const metaKey = `fcm:${devKey}:meta:${jobId}`; - await pubClient.hsetnx(metaKey, "jobRoNumber", jobRoNumber || ""); - await pubClient.hsetnx(metaKey, "bodyShopId", bodyShopId || ""); - await pubClient.hsetnx(metaKey, "bodyShopName", bodyShopName || ""); - await pubClient.expire(metaKey, NOTIFICATION_EXPIRATION / 1000); + await pubClient.sadd(recipientsSetKey, user); + await pubClient.expire(recipientsSetKey, seconds(NOTIFICATION_EXPIRATION)); } const consolidateKey = `fcm:${devKey}:consolidate:${jobId}`; @@ -138,10 +282,11 @@ const loadFcmQueue = async ({ pubClient, logger }) => { backoff: LOCK_EXPIRATION } ); - await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000); - devDebugLogger(`Scheduled FCM consolidation for jobId ${jobId}`); + + await pubClient.expire(consolidateKey, seconds(CONSOLIDATION_KEY_EXPIRATION)); + devDebugLogger(`Scheduled consolidation for jobId ${jobId}`); } else { - devDebugLogger(`FCM consolidation already scheduled for jobId ${jobId}`); + devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`); } }, { prefix, connection: pubClient, concurrency: 5 } @@ -154,63 +299,95 @@ const loadFcmQueue = async ({ pubClient, logger }) => { const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev"; const lockKey = `lock:${devKey}:fcmConsolidate:${jobId}`; - const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", seconds(LOCK_EXPIRATION)); if (!lockAcquired) { - devDebugLogger(`Skipped FCM consolidation for jobId ${jobId} - lock held by another worker`); + devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`); return; } try { - const admin = getFirebaseAdmin(); - if (!admin) { - devDebugLogger("FCM consolidation skipped (Firebase not available)."); + const recipientsSet = `fcm:${devKey}:recipients:${jobId}`; + const userEmails = await pubClient.smembers(recipientsSet); + + if (!userEmails?.length) { + devDebugLogger(`No recipients found for jobId ${jobId}`); + await pubClient.del(`fcm:${devKey}:consolidate:${jobId}`); return; } - const recipientsSet = `fcm:${devKey}:recipients:${jobId}`; - const associationIds = await pubClient.smembers(recipientsSet); - + // Load meta const metaKey = `fcm:${devKey}:meta:${jobId}`; const meta = await pubClient.hgetall(metaKey); const jobRoNumber = meta?.jobRoNumber || ""; const bodyShopId = meta?.bodyShopId || ""; const bodyShopName = meta?.bodyShopName || ""; - for (const associationId of associationIds) { - const assocKey = `fcm:${devKey}:notifications:${jobId}:${associationId}`; - const messages = await pubClient.lrange(assocKey, 0, -1); + // Fetch tokens for all recipients (1 DB round-trip) + const usersResp = await gqlClient.request(GET_USERS_FCM_TOKENS_BY_EMAILS, { emails: userEmails }); + const tokenMap = new Map( + (usersResp?.users || []).map((u) => [String(u.email), normalizeTokens(u.fcmtokens)]) + ); - if (!messages?.length) continue; + for (const userEmail of userEmails) { + const userKey = `fcm:${devKey}:notifications:${jobId}:${userEmail}`; + const raw = await pubClient.lrange(userKey, 0, -1); - const count = messages.length; + if (!raw?.length) { + await pubClient.del(userKey); + continue; + } + + const parsed = raw + .map((x) => { + try { + return JSON.parse(x); + } catch { + return null; + } + }) + .filter(Boolean); + + const count = parsed.length; const notificationBody = buildPushSummary({ count, jobRoNumber, bodyShopName }); - const topic = topicForAssociation(associationId); + // associationId should be stable for a user in a job’s bodyshop; take first non-null + const associationId = + parsed.find((p) => p?.associationId)?.associationId != null + ? String(parsed.find((p) => p?.associationId)?.associationId) + : ""; - // FCM "data" values MUST be strings - await admin.messaging().send({ - topic, - notification: { - title: "ImEX Online", - body: notificationBody - }, + const tokens = tokenMap.get(String(userEmail)) || []; + + if (!tokens.length) { + devDebugLogger(`No Expo push tokens for ${userEmail}; skipping push for jobId ${jobId}`); + await pubClient.del(userKey); + continue; + } + + // Build 1 message per device token + const messages = tokens.map((token) => ({ + to: token, + title: "ImEX Online", + body: notificationBody, + priority: "high", data: { type: "job-notification", jobId: String(jobId), jobRoNumber: String(jobRoNumber || ""), bodyShopId: String(bodyShopId || ""), bodyShopName: String(bodyShopName || ""), - associationId: String(associationId), + associationId: String(associationId || ""), + userEmail: String(userEmail), count: String(count) - }, - android: { priority: "high" }, - apns: { headers: { "apns-priority": "10" } } - }); + } + })); - devDebugLogger(`Sent FCM push to topic ${topic} for jobId ${jobId} (${count} updates)`); + await sendExpoPush({ messages, logger }); - await pubClient.del(assocKey); + devDebugLogger(`Sent Expo push to ${userEmail} for jobId ${jobId} (${count} updates)`); + + await pubClient.del(userKey); } await pubClient.del(recipientsSet); @@ -241,9 +418,9 @@ const loadFcmQueue = async ({ pubClient, logger }) => { ); const shutdown = async () => { - devDebugLogger("Closing FCM queue workers..."); + devDebugLogger("Closing push queue workers..."); await Promise.all([fcmAddWorker.close(), fcmConsolidateWorker.close()]); - devDebugLogger("FCM queue workers closed"); + devDebugLogger("Push queue workers closed"); }; registerCleanupTask(shutdown); @@ -253,7 +430,7 @@ const loadFcmQueue = async ({ pubClient, logger }) => { }; /** - * Get the FCM add queue. + * Get the add queue. * @returns {*} */ const getQueue = () => { @@ -262,12 +439,11 @@ const getQueue = () => { }; /** - * Dispatch FCM notifications to the FCM add queue. + * Dispatch push notifications to the add queue. * @param fcmsToDispatch * @returns {Promise} */ const dispatchFcmsToQueue = async ({ fcmsToDispatch }) => { - if (!hasFirebaseEnv()) return; const queue = getQueue(); for (const fcm of fcmsToDispatch) {