// NOTE: Despite the filename, this implementation targets Expo Push Tokens (ExponentPushToken[...]). // It does NOT use Firebase Admin and does NOT require credentials (no EXPO_ACCESS_TOKEN). const { Queue, Worker } = require("bullmq"); const { registerCleanupTask } = require("../../utils/cleanupManager"); const getBullMQPrefix = require("../../utils/getBullMQPrefix"); const devDebugLogger = require("../../utils/devDebugLogger"); const { client: gqlClient } = require("../../graphql-client/graphql-client"); const { GET_USERS_FCM_TOKENS_BY_EMAILS, UPDATE_USER_FCM_TOKENS_BY_EMAIL } = require("../../graphql-client/queries"); const FCM_CONSOLIDATION_DELAY_IN_MINS = (() => { const envValue = process.env?.FCM_CONSOLIDATION_DELAY_IN_MINS; const parsedValue = envValue ? parseInt(envValue, 10) : NaN; return isNaN(parsedValue) ? 3 : Math.max(1, parsedValue); })(); const FCM_CONSOLIDATION_DELAY = FCM_CONSOLIDATION_DELAY_IN_MINS * 60000; // pegged constants (pattern matches your other queues) const CONSOLIDATION_KEY_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5; // IMPORTANT: lock must outlive a full consolidation run to avoid duplicate sends. const LOCK_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5; // Keep Bull backoff separate from lock TTL to avoid unexpected long retries. const BACKOFF_DELAY = Math.max(1000, Math.floor(FCM_CONSOLIDATION_DELAY * 0.25)); const RATE_LIMITER_DURATION = FCM_CONSOLIDATION_DELAY * 0.1; const NOTIFICATION_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5; const EXPO_PUSH_ENDPOINT = "https://exp.host/--/api/v2/push/send"; const EXPO_MAX_MESSAGES_PER_REQUEST = 100; let fcmAddQueue; let fcmConsolidateQueue; let fcmAddWorker; let fcmConsolidateWorker; /** * Milliseconds to seconds. * @param ms * @returns {number} */ const seconds = (ms) => Math.max(1, Math.ceil(ms / 1000)); /** * Chunk an array into smaller arrays of given size. * @param arr * @param size * @returns {*[]} */ const chunk = (arr, size) => { const out = []; for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size)); return out; }; /** * Check if a string is an Expo push token. * @param s * @returns {boolean} */ const isExpoPushToken = (s) => { if (!s || typeof s !== "string") return false; // Common formats observed in the wild: // - ExponentPushToken[xxxxxxxxxxxxxxxxxxxxxx] // - ExpoPushToken[xxxxxxxxxxxxxxxxxxxxxx] return /^ExponentPushToken\[[^\]]+\]$/.test(s) || /^ExpoPushToken\[[^\]]+\]$/.test(s); }; /** * Get unique, trimmed strings from an array. * @param arr * @returns {any[]} */ const uniqStrings = (arr) => [ ...new Set( arr .filter(Boolean) .map((x) => String(x).trim()) .filter(Boolean) ) ]; /** * Normalize users.fcmtokens (jsonb) into an array of Expo push tokens. * * New expected shape (example): * { * "ExponentPushToken[dksJAdLUTofdEk7P59thue]": { * "platform": "ios", * "timestamp": 1767397802709, * "pushTokenString": "ExponentPushToken[dksJAdLUTofdEk7P59thue]" * } * } * * Also supports older/alternate shapes: * - string: "ExponentPushToken[...]" * - array: ["ExponentPushToken[...]", ...] * - object: token keys OR values containing token-like fields * @param fcmtokens * @returns {string[]|*[]} */ const normalizeTokens = (fcmtokens) => { if (!fcmtokens) return []; if (typeof fcmtokens === "string") { const s = fcmtokens.trim(); return isExpoPushToken(s) ? [s] : []; } if (Array.isArray(fcmtokens)) { return uniqStrings(fcmtokens).filter(isExpoPushToken); } if (typeof fcmtokens === "object") { const keys = Object.keys(fcmtokens || {}); const vals = Object.values(fcmtokens || {}); const fromKeys = keys.filter(isExpoPushToken); const fromValues = vals .map((v) => { if (!v) return null; // Some shapes store token as a string value directly if (typeof v === "string") return v; if (typeof v === "object") { // Your new shape uses pushTokenString return v.pushTokenString || v.token || v.expoPushToken || null; } return null; }) .filter(Boolean) .map(String); return uniqStrings([...fromKeys, ...fromValues]).filter(isExpoPushToken); } return []; }; /** * Remove specified tokens from the stored fcmtokens jsonb while preserving the original shape. * @param fcmtokens * @param tokensToRemove * @returns {*} */ const removeTokensFromFcmtokens = (fcmtokens, tokensToRemove) => { const remove = new Set((tokensToRemove || []).map((t) => String(t).trim()).filter(Boolean)); if (!remove.size) return fcmtokens; if (!fcmtokens) return fcmtokens; if (typeof fcmtokens === "string") { const s = fcmtokens.trim(); return remove.has(s) ? null : fcmtokens; } if (Array.isArray(fcmtokens)) { const next = fcmtokens.filter((t) => !remove.has(String(t).trim())); return next.length ? next : []; } if (typeof fcmtokens === "object") { const next = {}; for (const [k, v] of Object.entries(fcmtokens)) { const keyIsToken = isExpoPushToken(k) && remove.has(k); let valueToken = null; if (typeof v === "string") valueToken = v; else if (v && typeof v === "object") valueToken = v.pushTokenString || v.token || v.expoPushToken || null; const valueIsToken = valueToken && remove.has(String(valueToken).trim()); if (keyIsToken || valueIsToken) continue; next[k] = v; } return Object.keys(next).length ? next : {}; } return fcmtokens; }; /** * Safely parse JSON response. * @param res * @returns {Promise<*|null>} */ const safeJson = async (res) => { try { return await res.json(); } catch { return null; } }; /** * Send Expo push notifications. * Returns invalid tokens that should be removed (e.g., DeviceNotRegistered). * * @param {Array} messages Expo messages array * @param {Object} logger * @returns {Promise<{invalidTokens: string[], ticketIds: string[]}>} */ const sendExpoPush = async ({ messages, logger }) => { if (!messages?.length) return { invalidTokens: [], ticketIds: [] }; const invalidTokens = new Set(); const ticketIds = []; for (const batch of chunk(messages, EXPO_MAX_MESSAGES_PER_REQUEST)) { const res = await fetch(EXPO_PUSH_ENDPOINT, { method: "POST", headers: { accept: "application/json", "content-type": "application/json" }, body: JSON.stringify(batch) }); const payload = await safeJson(res); if (!res.ok) { logger?.log?.("expo-push-http-error", "ERROR", "notifications", "api", { status: res.status, statusText: res.statusText, payload }); throw new Error(`Expo push HTTP error: ${res.status} ${res.statusText}`); } const tickets = Array.isArray(payload?.data) ? payload.data : payload?.data ? [payload.data] : []; if (!tickets.length) { logger?.log?.("expo-push-bad-response", "ERROR", "notifications", "api", { payload }); continue; } // Expo returns tickets in the same order as messages in the request batch for (let i = 0; i < tickets.length; i++) { const t = tickets[i]; const msg = batch[i]; const token = typeof msg?.to === "string" ? msg.to : null; if (t?.status === "ok" && t?.id) ticketIds.push(String(t.id)); if (t?.status === "error") { const errCode = t?.details?.error; const msgText = String(t?.message || ""); const shouldDelete = errCode === "DeviceNotRegistered" || /not a registered push notification recipient/i.test(msgText); if (shouldDelete && token && isExpoPushToken(token)) { invalidTokens.add(token); } logger?.log?.("expo-push-ticket-error", "ERROR", "notifications", "api", { token, ticket: t }); } } } return { invalidTokens: [...invalidTokens], ticketIds }; }; /** * Build a summary string for push notification body. * @param count * @param jobRoNumber * @param bodyShopName * @returns {`${string} ${string} for ${string|string}${string|string}`} */ const buildPushSummary = ({ count, jobRoNumber, bodyShopName }) => { const updates = count === 1 ? "update" : "updates"; const ro = jobRoNumber ? `RO ${jobRoNumber}` : "a job"; const shop = bodyShopName ? ` at ${bodyShopName}` : ""; return `${count} ${updates} for ${ro}${shop}`; }; /** * Loads the push notification queues and workers (Expo push). * @param pubClient * @param logger * @returns {Promise} */ const loadFcmQueue = async ({ pubClient, logger }) => { if (!fcmAddQueue || !fcmConsolidateQueue) { const prefix = getBullMQPrefix(); const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev"; devDebugLogger(`Initializing Expo Push Queues with prefix: ${prefix}`); fcmAddQueue = new Queue("fcmAdd", { prefix, connection: pubClient, defaultJobOptions: { removeOnComplete: true, removeOnFail: true } }); fcmConsolidateQueue = new Queue("fcmConsolidate", { prefix, connection: pubClient, defaultJobOptions: { removeOnComplete: true, removeOnFail: true } }); fcmAddWorker = new Worker( "fcmAdd", async (job) => { const { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients } = job.data; devDebugLogger(`Adding push notifications for jobId ${jobId}`); const recipientsSetKey = `fcm:${devKey}:recipients:${jobId}`; // set of user emails const metaKey = `fcm:${devKey}:meta:${jobId}`; const redisKeyPrefix = `fcm:${devKey}:notifications:${jobId}`; // per-user list keys // Store job-level metadata (always keep latest values) await pubClient.hset(metaKey, "jobRoNumber", jobRoNumber || ""); await pubClient.hset(metaKey, "bodyShopId", bodyShopId || ""); await pubClient.hset(metaKey, "bodyShopName", bodyShopName || ""); await pubClient.expire(metaKey, seconds(NOTIFICATION_EXPIRATION)); for (const r of recipients || []) { const user = r?.user; const associationId = r?.associationId; if (!user) continue; const userKey = `${redisKeyPrefix}:${user}`; const payload = JSON.stringify({ body: body || "", scenarioKey: scenarioKey || "", key: key || "", variables: variables || {}, associationId: associationId ? String(associationId) : null, ts: Date.now() }); await pubClient.rpush(userKey, payload); await pubClient.expire(userKey, seconds(NOTIFICATION_EXPIRATION)); await pubClient.sadd(recipientsSetKey, user); await pubClient.expire(recipientsSetKey, seconds(NOTIFICATION_EXPIRATION)); } const consolidateKey = `fcm:${devKey}:consolidate:${jobId}`; const flagSet = await pubClient.setnx(consolidateKey, "pending"); if (flagSet) { await fcmConsolidateQueue.add( "consolidate-fcm", { jobId }, { jobId: `consolidate-${jobId}`, delay: FCM_CONSOLIDATION_DELAY, attempts: 3, backoff: BACKOFF_DELAY } ); await pubClient.expire(consolidateKey, seconds(CONSOLIDATION_KEY_EXPIRATION)); devDebugLogger(`Scheduled consolidation for jobId ${jobId}`); } else { devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`); } }, { prefix, connection: pubClient, concurrency: 5 } ); fcmConsolidateWorker = new Worker( "fcmConsolidate", async (job) => { const { jobId } = job.data; const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev"; const lockKey = `lock:${devKey}:fcmConsolidate:${jobId}`; const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", seconds(LOCK_EXPIRATION)); if (!lockAcquired) { devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`); return; } try { const recipientsSet = `fcm:${devKey}:recipients:${jobId}`; const userEmails = await pubClient.smembers(recipientsSet); if (!userEmails?.length) { devDebugLogger(`No recipients found for jobId ${jobId}`); await pubClient.del(`fcm:${devKey}:consolidate:${jobId}`); return; } // Load meta const metaKey = `fcm:${devKey}:meta:${jobId}`; const meta = await pubClient.hgetall(metaKey); const jobRoNumber = meta?.jobRoNumber || ""; const bodyShopId = meta?.bodyShopId || ""; const bodyShopName = meta?.bodyShopName || ""; // Fetch tokens for all recipients (1 DB round-trip) const usersResp = await gqlClient.request(GET_USERS_FCM_TOKENS_BY_EMAILS, { emails: userEmails }); // Map: email -> { raw, tokens } const tokenMap = new Map( (usersResp?.users || []).map((u) => [ String(u.email), { raw: u.fcmtokens, tokens: normalizeTokens(u.fcmtokens) } ]) ); for (const userEmail of userEmails) { const userKey = `fcm:${devKey}:notifications:${jobId}:${userEmail}`; const raw = await pubClient.lrange(userKey, 0, -1); if (!raw?.length) { await pubClient.del(userKey); continue; } const parsed = raw .map((x) => { try { return JSON.parse(x); } catch { return null; } }) .filter(Boolean); const count = parsed.length; const notificationBody = buildPushSummary({ count, jobRoNumber, bodyShopName }); // associationId should be stable for a user in a job’s bodyshop; take first non-null const firstWithAssociation = parsed.find((p) => p?.associationId != null); const associationId = firstWithAssociation?.associationId != null ? String(firstWithAssociation.associationId) : ""; const tokenInfo = tokenMap.get(String(userEmail)) || { raw: null, tokens: [] }; const tokens = tokenInfo.tokens || []; if (!tokens.length) { devDebugLogger(`No Expo push tokens for ${userEmail}; skipping push for jobId ${jobId}`); await pubClient.del(userKey); continue; } // Build 1 message per device token const messages = tokens.map((token) => ({ to: token, title: "ImEX Online", body: notificationBody, priority: "high", data: { type: "job-notification", jobId: String(jobId), jobRoNumber: String(jobRoNumber || ""), bodyShopId: String(bodyShopId || ""), bodyShopName: String(bodyShopName || ""), associationId: String(associationId || ""), userEmail: String(userEmail), count: String(count) } })); const { invalidTokens } = await sendExpoPush({ messages, logger }); // Opportunistic cleanup: remove invalid tokens from users.fcmtokens if (invalidTokens?.length) { try { const nextFcmtokens = removeTokensFromFcmtokens(tokenInfo.raw, invalidTokens); await gqlClient.request(UPDATE_USER_FCM_TOKENS_BY_EMAIL, { email: String(userEmail), fcmtokens: nextFcmtokens }); devDebugLogger(`Cleaned ${invalidTokens.length} invalid Expo tokens for ${userEmail}`); } catch (e) { logger?.log?.("expo-push-token-cleanup-failed", "ERROR", "notifications", "api", { userEmail: String(userEmail), message: e?.message, stack: e?.stack }); // Do not throw: cleanup failure should not retry the whole consolidation and risk duplicate pushes. } } devDebugLogger(`Sent Expo push to ${userEmail} for jobId ${jobId} (${count} updates)`); await pubClient.del(userKey); } await pubClient.del(recipientsSet); await pubClient.del(metaKey); await pubClient.del(`fcm:${devKey}:consolidate:${jobId}`); } catch (err) { logger.log("fcm-queue-consolidation-error", "ERROR", "notifications", "api", { message: err?.message, stack: err?.stack }); throw err; } finally { await pubClient.del(lockKey); } }, { prefix, connection: pubClient, concurrency: 1, limiter: { max: 1, duration: RATE_LIMITER_DURATION } } ); fcmAddWorker.on("failed", (job, err) => logger.log("fcm-add-failed", "ERROR", "notifications", "api", { message: err?.message, stack: err?.stack }) ); fcmConsolidateWorker.on("failed", (job, err) => logger.log("fcm-consolidate-failed", "ERROR", "notifications", "api", { message: err?.message, stack: err?.stack }) ); const shutdown = async () => { devDebugLogger("Closing push queue workers..."); await Promise.all([fcmAddWorker.close(), fcmConsolidateWorker.close()]); devDebugLogger("Push queue workers closed"); }; registerCleanupTask(shutdown); } return fcmAddQueue; }; /** * Get the add queue. * @returns {*} */ const getQueue = () => { if (!fcmAddQueue) throw new Error("FCM add queue not initialized. Ensure loadFcmQueue is called during bootstrap."); return fcmAddQueue; }; /** * Dispatch push notifications to the add queue. * @param fcmsToDispatch * @returns {Promise} */ const dispatchFcmsToQueue = async ({ fcmsToDispatch }) => { const queue = getQueue(); for (const fcm of fcmsToDispatch) { const { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients } = fcm; if (!jobId || !recipients?.length) continue; await queue.add( "add-fcm-notification", { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients }, { jobId: `${jobId}-${Date.now()}` } ); } }; module.exports = { loadFcmQueue, getQueue, dispatchFcmsToQueue };