diff --git a/client/src/components/notification-settings/notification-settings-form.component.jsx b/client/src/components/notification-settings/notification-settings-form.component.jsx index 6f29c9bb5..649a0be55 100644 --- a/client/src/components/notification-settings/notification-settings-form.component.jsx +++ b/client/src/components/notification-settings/notification-settings-form.component.jsx @@ -155,8 +155,12 @@ const NotificationSettingsForm = ({ currentUser, bodyshop }) => { ) - }, - { + } + ]; + + // Currently disabled for prod + if (!import.meta.env.PROD) { + columns.push({ title: setIsDirty(true)} />, dataIndex: "fcm", key: "fcm", @@ -166,8 +170,8 @@ const NotificationSettingsForm = ({ currentUser, bodyshop }) => { ) - } - ]; + }); + } const dataSource = notificationScenarios.map((scenario) => ({ key: scenario })); diff --git a/server/graphql-client/queries.js b/server/graphql-client/queries.js index 451ec6bed..c686d50eb 100644 --- a/server/graphql-client/queries.js +++ b/server/graphql-client/queries.js @@ -3196,3 +3196,11 @@ exports.GET_USERS_FCM_TOKENS_BY_EMAILS = /* GraphQL */ ` } } `; + +exports.UPDATE_USER_FCM_TOKENS_BY_EMAIL = /* GraphQL */ ` + mutation UPDATE_USER_FCM_TOKENS_BY_EMAIL($email: String!, $fcmtokens: jsonb) { + update_users(where: { email: { _eq: $email } }, _set: { fcmtokens: $fcmtokens }) { + affected_rows + } + } +`; diff --git a/server/notifications/queues/appQueue.js b/server/notifications/queues/appQueue.js index 9009ad8ed..4cd7777d6 100644 --- a/server/notifications/queues/appQueue.js +++ b/server/notifications/queues/appQueue.js @@ -168,6 +168,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { // Collect notifications by recipientKey const notificationsByRecipient = new Map(); // rk => parsed notifications array + const listKeysToDelete = []; // delete only after successful insert+emit for (const rk of recipientKeys) { const [user, bodyShopId] = rk.split(":"); @@ -188,14 +189,17 @@ const 
loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { if (parsed.length) { notificationsByRecipient.set(rk, parsed); - } - // Cleanup list key after reading - await pubClient.del(lk); + // IMPORTANT: do NOT delete list yet; only delete after successful insert+emit + listKeysToDelete.push(lk); + } } if (!notificationsByRecipient.size) { devDebugLogger(`No notifications found in lists for jobId ${jobId}, nothing to insert/emit.`); + if (listKeysToDelete.length) { + await pubClient.del(...listKeysToDelete); + } await pubClient.del(rkSet); await pubClient.del(assocHash); await pubClient.del(consolidateFlagKey(jobId)); diff --git a/server/notifications/queues/emailQueue.js b/server/notifications/queues/emailQueue.js index 76d368c9f..a38765450 100644 --- a/server/notifications/queues/emailQueue.js +++ b/server/notifications/queues/emailQueue.js @@ -27,6 +27,16 @@ let emailConsolidateQueue; let emailAddWorker; let emailConsolidateWorker; +const seconds = (ms) => Math.max(1, Math.ceil(ms / 1000)); + +const escapeHtml = (s = "") => + String(s) + .replace(/&/g, "&amp;") + .replace(/</g, "&lt;") + .replace(/>/g, "&gt;") + .replace(/"/g, "&quot;") + .replace(/'/g, "&#39;"); + /** * Initializes the email notification queues and workers. 
* @@ -65,19 +75,21 @@ const loadEmailQueue = async ({ pubClient, logger }) => { const redisKeyPrefix = `email:${devKey}:notifications:${jobId}`; - for (const recipient of recipients) { + for (const recipient of recipients || []) { const { user, firstName, lastName } = recipient; + if (!user) continue; const userKey = `${redisKeyPrefix}:${user}`; await pubClient.rpush(userKey, body); - await pubClient.expire(userKey, NOTIFICATION_EXPIRATION / 1000); + await pubClient.expire(userKey, seconds(NOTIFICATION_EXPIRATION)); const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${user}`; await pubClient.hsetnx(detailsKey, "firstName", firstName || ""); await pubClient.hsetnx(detailsKey, "lastName", lastName || ""); - await pubClient.hsetnx(detailsKey, "bodyShopTimezone", bodyShopTimezone); - await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000); + const tzValue = bodyShopTimezone || "UTC"; + await pubClient.hsetnx(detailsKey, "bodyShopTimezone", tzValue); + await pubClient.expire(detailsKey, seconds(NOTIFICATION_EXPIRATION)); const recipientsSetKey = `email:${devKey}:recipients:${jobId}`; await pubClient.sadd(recipientsSetKey, user); - await pubClient.expire(recipientsSetKey, NOTIFICATION_EXPIRATION / 1000); + await pubClient.expire(recipientsSetKey, seconds(NOTIFICATION_EXPIRATION)); devDebugLogger(`Stored message for ${user} under ${userKey}: ${body}`); } @@ -95,7 +107,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { } ); devDebugLogger(`Scheduled email consolidation for jobId ${jobId}`); - await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000); + await pubClient.expire(consolidateKey, seconds(CONSOLIDATION_KEY_EXPIRATION)); } else { devDebugLogger(`Email consolidation already scheduled for jobId ${jobId}`); } @@ -115,7 +127,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { devDebugLogger(`Consolidating emails for jobId ${jobId}`); const lockKey = `lock:${devKey}:emailConsolidate:${jobId}`; - const 
lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", seconds(LOCK_EXPIRATION)); if (lockAcquired) { try { const recipientsSet = `email:${devKey}:recipients:${jobId}`; @@ -141,7 +153,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { - ${messages.map((msg) => `<li>${msg}</li>`).join("")} + ${messages.map((msg) => `<li>${escapeHtml(msg)}</li>`).join("")} diff --git a/server/notifications/queues/fcmQueue.js b/server/notifications/queues/fcmQueue.js index e0c7f0776..4bbd34cac 100644 --- a/server/notifications/queues/fcmQueue.js +++ b/server/notifications/queues/fcmQueue.js @@ -7,7 +7,7 @@ const getBullMQPrefix = require("../../utils/getBullMQPrefix"); const devDebugLogger = require("../../utils/devDebugLogger"); const { client: gqlClient } = require("../../graphql-client/graphql-client"); -const { GET_USERS_FCM_TOKENS_BY_EMAILS } = require("../../graphql-client/queries"); +const { GET_USERS_FCM_TOKENS_BY_EMAILS, UPDATE_USER_FCM_TOKENS_BY_EMAIL } = require("../../graphql-client/queries"); const FCM_CONSOLIDATION_DELAY_IN_MINS = (() => { const envValue = process.env?.FCM_CONSOLIDATION_DELAY_IN_MINS; @@ -19,7 +19,13 @@ const FCM_CONSOLIDATION_DELAY = FCM_CONSOLIDATION_DELAY_IN_MINS * 60000; // pegged constants (pattern matches your other queues) const CONSOLIDATION_KEY_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5; -const LOCK_EXPIRATION = FCM_CONSOLIDATION_DELAY * 0.25; + +// IMPORTANT: lock must outlive a full consolidation run to avoid duplicate sends. +const LOCK_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5; + +// Keep Bull backoff separate from lock TTL to avoid unexpected long retries. 
+const BACKOFF_DELAY = Math.max(1000, Math.floor(FCM_CONSOLIDATION_DELAY * 0.25)); + const RATE_LIMITER_DURATION = FCM_CONSOLIDATION_DELAY * 0.1; const NOTIFICATION_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5; @@ -137,6 +143,47 @@ const normalizeTokens = (fcmtokens) => { return []; }; +/** + * Remove specified tokens from the stored fcmtokens jsonb while preserving the original shape. + * @param fcmtokens + * @param tokensToRemove + * @returns {*} + */ +const removeTokensFromFcmtokens = (fcmtokens, tokensToRemove) => { + const remove = new Set((tokensToRemove || []).map((t) => String(t).trim()).filter(Boolean)); + if (!remove.size) return fcmtokens; + if (!fcmtokens) return fcmtokens; + + if (typeof fcmtokens === "string") { + const s = fcmtokens.trim(); + return remove.has(s) ? null : fcmtokens; + } + + if (Array.isArray(fcmtokens)) { + const next = fcmtokens.filter((t) => !remove.has(String(t).trim())); + return next.length ? next : []; + } + + if (typeof fcmtokens === "object") { + const next = {}; + for (const [k, v] of Object.entries(fcmtokens)) { + const keyIsToken = isExpoPushToken(k) && remove.has(k); + + let valueToken = null; + if (typeof v === "string") valueToken = v; + else if (v && typeof v === "object") valueToken = v.pushTokenString || v.token || v.expoPushToken || null; + + const valueIsToken = valueToken && remove.has(String(valueToken).trim()); + + if (keyIsToken || valueIsToken) continue; + next[k] = v; + } + return Object.keys(next).length ? next : {}; + } + + return fcmtokens; +}; + /** * Safely parse JSON response. * @param res @@ -151,12 +198,18 @@ const safeJson = async (res) => { }; /** - * Send Expo push notifications + * Send Expo push notifications. + * Returns invalid tokens that should be removed (e.g., DeviceNotRegistered). 
+ * * @param {Array} messages Expo messages array * @param {Object} logger + * @returns {Promise<{invalidTokens: string[], ticketIds: string[]}>} */ const sendExpoPush = async ({ messages, logger }) => { - if (!messages?.length) return; + if (!messages?.length) return { invalidTokens: [], ticketIds: [] }; + + const invalidTokens = new Set(); + const ticketIds = []; for (const batch of chunk(messages, EXPO_MAX_MESSAGES_PER_REQUEST)) { const res = await fetch(EXPO_PUSH_ENDPOINT, { @@ -179,15 +232,43 @@ const sendExpoPush = async ({ messages, logger }) => { throw new Error(`Expo push HTTP error: ${res.status} ${res.statusText}`); } - const data = payload?.data; - if (Array.isArray(data)) { - const errors = data.filter((t) => t?.status === "error"); - if (errors.length) { - logger?.log?.("expo-push-ticket-errors", "ERROR", "notifications", "api", { errors }); + const tickets = Array.isArray(payload?.data) ? payload.data : payload?.data ? [payload.data] : []; + + if (!tickets.length) { + logger?.log?.("expo-push-bad-response", "ERROR", "notifications", "api", { payload }); + continue; + } + + // Expo returns tickets in the same order as messages in the request batch + for (let i = 0; i < tickets.length; i++) { + const t = tickets[i]; + const msg = batch[i]; + const token = typeof msg?.to === "string" ? msg.to : null; + + if (t?.status === "ok" && t?.id) ticketIds.push(String(t.id)); + + if (t?.status === "error") { + const errCode = t?.details?.error; + const msgText = String(t?.message || ""); + + const shouldDelete = + errCode === "DeviceNotRegistered" || /not a registered push notification recipient/i.test(msgText); + + if (shouldDelete && token && isExpoPushToken(token)) { + invalidTokens.add(token); + } + + logger?.log?.("expo-push-ticket-error", "ERROR", "notifications", "api", { + token, + ticket: t + }); } } } + + return { invalidTokens: [...invalidTokens], ticketIds }; }; + /** * Build a summary string for push notification body. 
* @param count @@ -239,10 +320,10 @@ const loadFcmQueue = async ({ pubClient, logger }) => { const metaKey = `fcm:${devKey}:meta:${jobId}`; const redisKeyPrefix = `fcm:${devKey}:notifications:${jobId}`; // per-user list keys - // store job-level metadata once - await pubClient.hsetnx(metaKey, "jobRoNumber", jobRoNumber || ""); - await pubClient.hsetnx(metaKey, "bodyShopId", bodyShopId || ""); - await pubClient.hsetnx(metaKey, "bodyShopName", bodyShopName || ""); + // Store job-level metadata (always keep latest values) + await pubClient.hset(metaKey, "jobRoNumber", jobRoNumber || ""); + await pubClient.hset(metaKey, "bodyShopId", bodyShopId || ""); + await pubClient.hset(metaKey, "bodyShopName", bodyShopName || ""); await pubClient.expire(metaKey, seconds(NOTIFICATION_EXPIRATION)); for (const r of recipients || []) { @@ -279,7 +360,7 @@ const loadFcmQueue = async ({ pubClient, logger }) => { jobId: `consolidate-${jobId}`, delay: FCM_CONSOLIDATION_DELAY, attempts: 3, - backoff: LOCK_EXPIRATION + backoff: BACKOFF_DELAY } ); @@ -325,8 +406,13 @@ const loadFcmQueue = async ({ pubClient, logger }) => { // Fetch tokens for all recipients (1 DB round-trip) const usersResp = await gqlClient.request(GET_USERS_FCM_TOKENS_BY_EMAILS, { emails: userEmails }); + + // Map: email -> { raw, tokens } const tokenMap = new Map( - (usersResp?.users || []).map((u) => [String(u.email), normalizeTokens(u.fcmtokens)]) + (usersResp?.users || []).map((u) => [ + String(u.email), + { raw: u.fcmtokens, tokens: normalizeTokens(u.fcmtokens) } + ]) ); for (const userEmail of userEmails) { @@ -352,12 +438,12 @@ const loadFcmQueue = async ({ pubClient, logger }) => { const notificationBody = buildPushSummary({ count, jobRoNumber, bodyShopName }); // associationId should be stable for a user in a job’s bodyshop; take first non-null + const firstWithAssociation = parsed.find((p) => p?.associationId != null); const associationId = - parsed.find((p) => p?.associationId)?.associationId != null - ? 
String(parsed.find((p) => p?.associationId)?.associationId) - : ""; + firstWithAssociation?.associationId != null ? String(firstWithAssociation.associationId) : ""; - const tokens = tokenMap.get(String(userEmail)) || []; + const tokenInfo = tokenMap.get(String(userEmail)) || { raw: null, tokens: [] }; + const tokens = tokenInfo.tokens || []; if (!tokens.length) { devDebugLogger(`No Expo push tokens for ${userEmail}; skipping push for jobId ${jobId}`); @@ -383,7 +469,28 @@ const loadFcmQueue = async ({ pubClient, logger }) => { } })); - await sendExpoPush({ messages, logger }); + const { invalidTokens } = await sendExpoPush({ messages, logger }); + + // Opportunistic cleanup: remove invalid tokens from users.fcmtokens + if (invalidTokens?.length) { + try { + const nextFcmtokens = removeTokensFromFcmtokens(tokenInfo.raw, invalidTokens); + + await gqlClient.request(UPDATE_USER_FCM_TOKENS_BY_EMAIL, { + email: String(userEmail), + fcmtokens: nextFcmtokens + }); + + devDebugLogger(`Cleaned ${invalidTokens.length} invalid Expo tokens for ${userEmail}`); + } catch (e) { + logger?.log?.("expo-push-token-cleanup-failed", "ERROR", "notifications", "api", { + userEmail: String(userEmail), + message: e?.message, + stack: e?.stack + }); + // Do not throw: cleanup failure should not retry the whole consolidation and risk duplicate pushes. + } + } devDebugLogger(`Sent Expo push to ${userEmail} for jobId ${jobId} (${count} updates)`);