feature/IO-3492-FCM-Queue-For-Notifications: Finalize

This commit is contained in:
Dave
2026-01-05 18:42:40 -05:00
parent 4cdc15f70b
commit 00bf5977ae
5 changed files with 170 additions and 35 deletions

View File

@@ -3196,3 +3196,11 @@ exports.GET_USERS_FCM_TOKENS_BY_EMAILS = /* GraphQL */ `
}
}
`;
exports.UPDATE_USER_FCM_TOKENS_BY_EMAIL = /* GraphQL */ `
mutation UPDATE_USER_FCM_TOKENS_BY_EMAIL($email: String!, $fcmtokens: jsonb) {
update_users(where: { email: { _eq: $email } }, _set: { fcmtokens: $fcmtokens }) {
affected_rows
}
}
`;

View File

@@ -168,6 +168,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
// Collect notifications by recipientKey
const notificationsByRecipient = new Map(); // rk => parsed notifications array
const listKeysToDelete = []; // delete only after successful insert+emit
for (const rk of recipientKeys) {
const [user, bodyShopId] = rk.split(":");
@@ -188,14 +189,17 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
if (parsed.length) {
notificationsByRecipient.set(rk, parsed);
}
// Cleanup list key after reading
await pubClient.del(lk);
// IMPORTANT: do NOT delete list yet; only delete after successful insert+emit
listKeysToDelete.push(lk);
}
}
if (!notificationsByRecipient.size) {
devDebugLogger(`No notifications found in lists for jobId ${jobId}, nothing to insert/emit.`);
if (listKeysToDelete.length) {
await pubClient.del(...listKeysToDelete);
}
await pubClient.del(rkSet);
await pubClient.del(assocHash);
await pubClient.del(consolidateFlagKey(jobId));

View File

@@ -27,6 +27,16 @@ let emailConsolidateQueue;
let emailAddWorker;
let emailConsolidateWorker;
const seconds = (ms) => Math.max(1, Math.ceil(ms / 1000));
const escapeHtml = (s = "") =>
String(s)
.replace(/&/g, "&")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;")
.replace(/"/g, "&quot;")
.replace(/'/g, "&#39;");
/**
* Initializes the email notification queues and workers.
*
@@ -65,19 +75,21 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
const redisKeyPrefix = `email:${devKey}:notifications:${jobId}`;
for (const recipient of recipients) {
for (const recipient of recipients || []) {
const { user, firstName, lastName } = recipient;
if (!user) continue;
const userKey = `${redisKeyPrefix}:${user}`;
await pubClient.rpush(userKey, body);
await pubClient.expire(userKey, NOTIFICATION_EXPIRATION / 1000);
await pubClient.expire(userKey, seconds(NOTIFICATION_EXPIRATION));
const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${user}`;
await pubClient.hsetnx(detailsKey, "firstName", firstName || "");
await pubClient.hsetnx(detailsKey, "lastName", lastName || "");
await pubClient.hsetnx(detailsKey, "bodyShopTimezone", bodyShopTimezone);
await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000);
const tzValue = bodyShopTimezone || "UTC";
await pubClient.hsetnx(detailsKey, "bodyShopTimezone", tzValue);
await pubClient.expire(detailsKey, seconds(NOTIFICATION_EXPIRATION));
const recipientsSetKey = `email:${devKey}:recipients:${jobId}`;
await pubClient.sadd(recipientsSetKey, user);
await pubClient.expire(recipientsSetKey, NOTIFICATION_EXPIRATION / 1000);
await pubClient.expire(recipientsSetKey, seconds(NOTIFICATION_EXPIRATION));
devDebugLogger(`Stored message for ${user} under ${userKey}: ${body}`);
}
@@ -95,7 +107,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
}
);
devDebugLogger(`Scheduled email consolidation for jobId ${jobId}`);
await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000);
await pubClient.expire(consolidateKey, seconds(CONSOLIDATION_KEY_EXPIRATION));
} else {
devDebugLogger(`Email consolidation already scheduled for jobId ${jobId}`);
}
@@ -115,7 +127,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
devDebugLogger(`Consolidating emails for jobId ${jobId}`);
const lockKey = `lock:${devKey}:emailConsolidate:${jobId}`;
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", seconds(LOCK_EXPIRATION));
if (lockAcquired) {
try {
const recipientsSet = `email:${devKey}:recipients:${jobId}`;
@@ -141,7 +153,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
<table class="row" style="border-spacing: 0; border-collapse: collapse; padding-top: 0; padding-right: 0; padding-bottom: 0; padding-left: 0; vertical-align: top; text-align: left; font-family: 'Montserrat', 'Montserrat Alternates', sans-serif; padding: 0; width: 100%; position: relative; display: table;"><tbody style="font-family: 'Montserrat', 'Montserrat Alternates', sans-serif; display: table-row-group;"><tr style="padding-top: 0; padding-right: 0; padding-bottom: 0; padding-left: 0; vertical-align: top; text-align: left; font-family: 'Montserrat', 'Montserrat Alternates', sans-serif;">
<th class="small-12 large-12 columns first last" style="word-wrap: break-word; -webkit-hyphens: auto; -moz-hyphens: auto; hyphens: auto; vertical-align: top; color: #0a0a0a; font-weight: normal; padding-top: 0; text-align: left; font-family: 'Montserrat', 'Montserrat Alternates', sans-serif; font-size: 15px; line-height: 1.2; margin: 0 auto; Margin: 0 auto; padding-bottom: 16px; width: 734px; padding-left: 8px; padding-right: 8px; border-collapse: collapse;"><table style="border-spacing: 0; border-collapse: collapse; padding-top: 0; padding-right: 0; padding-bottom: 0; padding-left: 0; vertical-align: top; text-align: left; font-family: 'Montserrat', 'Montserrat Alternates', sans-serif; width: 100%;"><tr style="padding-top: 0; padding-right: 0; padding-bottom: 0; padding-left: 0; vertical-align: top; text-align: left; font-family: 'Montserrat', 'Montserrat Alternates', sans-serif;"><td style="word-wrap: break-word; vertical-align: top; color: #0a0a0a; font-weight: normal; padding-top: 0; padding-right: 0; padding-bottom: 0; padding-left: 0; margin: 0; Margin: 0; text-align: left; font-family: 'Montserrat', 'Montserrat Alternates', sans-serif; font-size: 15px; word-break: keep-all; -moz-hyphens: none; -ms-hyphens: none; -webkit-hyphens: none; hyphens: none; line-height: 1.2; border-collapse: collapse;">
<ul style="font-family: 'Montserrat', 'Montserrat Alternates', sans-serif; margin: 1%; padding-left: 30px;">
${messages.map((msg) => `<li style="font-family: 'Montserrat', 'Montserrat Alternates', sans-serif; font-size: 90%;">${msg}</li>`).join("")}
${messages.map((msg) => `<li style="font-family: 'Montserrat', 'Montserrat Alternates', sans-serif; font-size: 90%;">${escapeHtml(msg)}</li>`).join("")}
</ul>
</td></tr></table></th>
</tr><tbody></table>

View File

@@ -7,7 +7,7 @@ const getBullMQPrefix = require("../../utils/getBullMQPrefix");
const devDebugLogger = require("../../utils/devDebugLogger");
const { client: gqlClient } = require("../../graphql-client/graphql-client");
const { GET_USERS_FCM_TOKENS_BY_EMAILS } = require("../../graphql-client/queries");
const { GET_USERS_FCM_TOKENS_BY_EMAILS, UPDATE_USER_FCM_TOKENS_BY_EMAIL } = require("../../graphql-client/queries");
const FCM_CONSOLIDATION_DELAY_IN_MINS = (() => {
const envValue = process.env?.FCM_CONSOLIDATION_DELAY_IN_MINS;
@@ -19,7 +19,13 @@ const FCM_CONSOLIDATION_DELAY = FCM_CONSOLIDATION_DELAY_IN_MINS * 60000;
// pegged constants (pattern matches your other queues)
const CONSOLIDATION_KEY_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5;
const LOCK_EXPIRATION = FCM_CONSOLIDATION_DELAY * 0.25;
// IMPORTANT: lock must outlive a full consolidation run to avoid duplicate sends.
const LOCK_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5;
// Keep Bull backoff separate from lock TTL to avoid unexpected long retries.
const BACKOFF_DELAY = Math.max(1000, Math.floor(FCM_CONSOLIDATION_DELAY * 0.25));
const RATE_LIMITER_DURATION = FCM_CONSOLIDATION_DELAY * 0.1;
const NOTIFICATION_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5;
@@ -137,6 +143,47 @@ const normalizeTokens = (fcmtokens) => {
return [];
};
/**
* Remove specified tokens from the stored fcmtokens jsonb while preserving the original shape.
* @param fcmtokens
* @param tokensToRemove
* @returns {*}
*/
const removeTokensFromFcmtokens = (fcmtokens, tokensToRemove) => {
const remove = new Set((tokensToRemove || []).map((t) => String(t).trim()).filter(Boolean));
if (!remove.size) return fcmtokens;
if (!fcmtokens) return fcmtokens;
if (typeof fcmtokens === "string") {
const s = fcmtokens.trim();
return remove.has(s) ? null : fcmtokens;
}
if (Array.isArray(fcmtokens)) {
const next = fcmtokens.filter((t) => !remove.has(String(t).trim()));
return next.length ? next : [];
}
if (typeof fcmtokens === "object") {
const next = {};
for (const [k, v] of Object.entries(fcmtokens)) {
const keyIsToken = isExpoPushToken(k) && remove.has(k);
let valueToken = null;
if (typeof v === "string") valueToken = v;
else if (v && typeof v === "object") valueToken = v.pushTokenString || v.token || v.expoPushToken || null;
const valueIsToken = valueToken && remove.has(String(valueToken).trim());
if (keyIsToken || valueIsToken) continue;
next[k] = v;
}
return Object.keys(next).length ? next : {};
}
return fcmtokens;
};
/**
* Safely parse JSON response.
* @param res
@@ -151,12 +198,18 @@ const safeJson = async (res) => {
};
/**
* Send Expo push notifications
* Send Expo push notifications.
* Returns invalid tokens that should be removed (e.g., DeviceNotRegistered).
*
* @param {Array<Object>} messages Expo messages array
* @param {Object} logger
* @returns {Promise<{invalidTokens: string[], ticketIds: string[]}>}
*/
const sendExpoPush = async ({ messages, logger }) => {
if (!messages?.length) return;
if (!messages?.length) return { invalidTokens: [], ticketIds: [] };
const invalidTokens = new Set();
const ticketIds = [];
for (const batch of chunk(messages, EXPO_MAX_MESSAGES_PER_REQUEST)) {
const res = await fetch(EXPO_PUSH_ENDPOINT, {
@@ -179,15 +232,43 @@ const sendExpoPush = async ({ messages, logger }) => {
throw new Error(`Expo push HTTP error: ${res.status} ${res.statusText}`);
}
const data = payload?.data;
if (Array.isArray(data)) {
const errors = data.filter((t) => t?.status === "error");
if (errors.length) {
logger?.log?.("expo-push-ticket-errors", "ERROR", "notifications", "api", { errors });
const tickets = Array.isArray(payload?.data) ? payload.data : payload?.data ? [payload.data] : [];
if (!tickets.length) {
logger?.log?.("expo-push-bad-response", "ERROR", "notifications", "api", { payload });
continue;
}
// Expo returns tickets in the same order as messages in the request batch
for (let i = 0; i < tickets.length; i++) {
const t = tickets[i];
const msg = batch[i];
const token = typeof msg?.to === "string" ? msg.to : null;
if (t?.status === "ok" && t?.id) ticketIds.push(String(t.id));
if (t?.status === "error") {
const errCode = t?.details?.error;
const msgText = String(t?.message || "");
const shouldDelete =
errCode === "DeviceNotRegistered" || /not a registered push notification recipient/i.test(msgText);
if (shouldDelete && token && isExpoPushToken(token)) {
invalidTokens.add(token);
}
logger?.log?.("expo-push-ticket-error", "ERROR", "notifications", "api", {
token,
ticket: t
});
}
}
}
return { invalidTokens: [...invalidTokens], ticketIds };
};
/**
* Build a summary string for push notification body.
* @param count
@@ -239,10 +320,10 @@ const loadFcmQueue = async ({ pubClient, logger }) => {
const metaKey = `fcm:${devKey}:meta:${jobId}`;
const redisKeyPrefix = `fcm:${devKey}:notifications:${jobId}`; // per-user list keys
// store job-level metadata once
await pubClient.hsetnx(metaKey, "jobRoNumber", jobRoNumber || "");
await pubClient.hsetnx(metaKey, "bodyShopId", bodyShopId || "");
await pubClient.hsetnx(metaKey, "bodyShopName", bodyShopName || "");
// Store job-level metadata (always keep latest values)
await pubClient.hset(metaKey, "jobRoNumber", jobRoNumber || "");
await pubClient.hset(metaKey, "bodyShopId", bodyShopId || "");
await pubClient.hset(metaKey, "bodyShopName", bodyShopName || "");
await pubClient.expire(metaKey, seconds(NOTIFICATION_EXPIRATION));
for (const r of recipients || []) {
@@ -279,7 +360,7 @@ const loadFcmQueue = async ({ pubClient, logger }) => {
jobId: `consolidate-${jobId}`,
delay: FCM_CONSOLIDATION_DELAY,
attempts: 3,
backoff: LOCK_EXPIRATION
backoff: BACKOFF_DELAY
}
);
@@ -325,8 +406,13 @@ const loadFcmQueue = async ({ pubClient, logger }) => {
// Fetch tokens for all recipients (1 DB round-trip)
const usersResp = await gqlClient.request(GET_USERS_FCM_TOKENS_BY_EMAILS, { emails: userEmails });
// Map: email -> { raw, tokens }
const tokenMap = new Map(
(usersResp?.users || []).map((u) => [String(u.email), normalizeTokens(u.fcmtokens)])
(usersResp?.users || []).map((u) => [
String(u.email),
{ raw: u.fcmtokens, tokens: normalizeTokens(u.fcmtokens) }
])
);
for (const userEmail of userEmails) {
@@ -352,12 +438,12 @@ const loadFcmQueue = async ({ pubClient, logger }) => {
const notificationBody = buildPushSummary({ count, jobRoNumber, bodyShopName });
// associationId should be stable for a user in a jobs bodyshop; take first non-null
const firstWithAssociation = parsed.find((p) => p?.associationId != null);
const associationId =
parsed.find((p) => p?.associationId)?.associationId != null
? String(parsed.find((p) => p?.associationId)?.associationId)
: "";
firstWithAssociation?.associationId != null ? String(firstWithAssociation.associationId) : "";
const tokens = tokenMap.get(String(userEmail)) || [];
const tokenInfo = tokenMap.get(String(userEmail)) || { raw: null, tokens: [] };
const tokens = tokenInfo.tokens || [];
if (!tokens.length) {
devDebugLogger(`No Expo push tokens for ${userEmail}; skipping push for jobId ${jobId}`);
@@ -383,7 +469,28 @@ const loadFcmQueue = async ({ pubClient, logger }) => {
}
}));
await sendExpoPush({ messages, logger });
const { invalidTokens } = await sendExpoPush({ messages, logger });
// Opportunistic cleanup: remove invalid tokens from users.fcmtokens
if (invalidTokens?.length) {
try {
const nextFcmtokens = removeTokensFromFcmtokens(tokenInfo.raw, invalidTokens);
await gqlClient.request(UPDATE_USER_FCM_TOKENS_BY_EMAIL, {
email: String(userEmail),
fcmtokens: nextFcmtokens
});
devDebugLogger(`Cleaned ${invalidTokens.length} invalid Expo tokens for ${userEmail}`);
} catch (e) {
logger?.log?.("expo-push-token-cleanup-failed", "ERROR", "notifications", "api", {
userEmail: String(userEmail),
message: e?.message,
stack: e?.stack
});
// Do not throw: cleanup failure should not retry the whole consolidation and risk duplicate pushes.
}
}
devDebugLogger(`Sent Expo push to ${userEmail} for jobId ${jobId} (${count} updates)`);