// (removed scraper residue: duplicated "463 lines / 14 KiB / JavaScript" page metadata)
// NOTE: Despite the filename, this implementation targets Expo Push Tokens (ExponentPushToken[...]).
|
||
// It does NOT use Firebase Admin and does NOT require credentials (no EXPO_ACCESS_TOKEN).
|
||
|
||
const { Queue, Worker } = require("bullmq");
|
||
const { registerCleanupTask } = require("../../utils/cleanupManager");
|
||
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
|
||
const devDebugLogger = require("../../utils/devDebugLogger");
|
||
|
||
const { client: gqlClient } = require("../../graphql-client/graphql-client");
|
||
const { GET_USERS_FCM_TOKENS_BY_EMAILS } = require("../../graphql-client/queries");
|
||
|
||
/**
 * Consolidation delay in minutes, read once from the environment at module load.
 * Falls back to 3 minutes when FCM_CONSOLIDATION_DELAY_IN_MINS is unset or not
 * parseable as an integer, and clamps the configured value to a minimum of 1.
 */
const FCM_CONSOLIDATION_DELAY_IN_MINS = (() => {
  const envValue = process.env?.FCM_CONSOLIDATION_DELAY_IN_MINS;
  // Number.parseInt / Number.isNaN instead of the coercing globals; radix 10 explicit.
  const parsedValue = envValue ? Number.parseInt(envValue, 10) : NaN;
  return Number.isNaN(parsedValue) ? 3 : Math.max(1, parsedValue);
})();
|
||
|
||
// Consolidation window in milliseconds (env value is expressed in minutes).
const FCM_CONSOLIDATION_DELAY = FCM_CONSOLIDATION_DELAY_IN_MINS * 60000;

// pegged constants (pattern matches your other queues)
const CONSOLIDATION_KEY_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5; // TTL for the "consolidation scheduled" flag key
const LOCK_EXPIRATION = FCM_CONSOLIDATION_DELAY * 0.25; // consolidate-worker lock TTL; also reused as the retry backoff delay
const RATE_LIMITER_DURATION = FCM_CONSOLIDATION_DELAY * 0.1; // BullMQ limiter window for the consolidate worker
const NOTIFICATION_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5; // TTL for per-user notification lists and recipient sets in Redis

// Expo push HTTP API endpoint and its per-request message batch limit.
const EXPO_PUSH_ENDPOINT = "https://exp.host/--/api/v2/push/send";
const EXPO_MAX_MESSAGES_PER_REQUEST = 100;

// Module-level singletons: created exactly once by loadFcmQueue, shared thereafter.
let fcmAddQueue;
let fcmConsolidateQueue;
let fcmAddWorker;
let fcmConsolidateWorker;
|
||
|
||
/**
 * Convert a millisecond duration to whole seconds, never returning less
 * than 1 (Redis TTLs must be a positive integer number of seconds).
 * @param {number} ms - duration in milliseconds
 * @returns {number} ceiling of ms / 1000, clamped to a minimum of 1
 */
const seconds = (ms) => {
  const wholeSeconds = Math.ceil(ms / 1000);
  return wholeSeconds < 1 ? 1 : wholeSeconds;
};
|
||
|
||
/**
 * Split an array into consecutive sub-arrays of at most `size` elements.
 * The final chunk may be shorter when the length is not a multiple of `size`.
 * @param {Array} arr - source array (not mutated)
 * @param {number} size - maximum chunk length
 * @returns {Array[]} chunks in original order
 */
const chunk = (arr, size) => {
  const pieces = [];
  let start = 0;
  while (start < arr.length) {
    pieces.push(arr.slice(start, start + size));
    start += size;
  }
  return pieces;
};
|
||
|
||
/**
 * Determine whether a value is a well-formed Expo push token string.
 * Accepts both prefixes observed in the wild:
 *   ExponentPushToken[xxxxxxxxxxxxxxxxxxxxxx]
 *   ExpoPushToken[xxxxxxxxxxxxxxxxxxxxxx]
 * @param {*} s - candidate value
 * @returns {boolean} true only for non-empty strings matching a token shape
 */
const isExpoPushToken = (s) => {
  if (typeof s !== "string" || !s) return false;
  return /^(?:Exponent|Expo)PushToken\[[^\]]+\]$/.test(s);
};
|
||
|
||
/**
 * Trim every entry, drop falsy values (both before and after trimming),
 * and de-duplicate while preserving first-seen order.
 * @param {Array} arr - values to normalize (each coerced to a string)
 * @returns {string[]} unique, trimmed, non-empty strings
 */
const uniqStrings = (arr) => {
  const seen = new Set();
  for (const value of arr) {
    if (!value) continue;
    const trimmed = String(value).trim();
    if (trimmed) seen.add(trimmed);
  }
  return [...seen];
};
|
||
|
||
/**
 * Normalize the users.fcmtokens (jsonb) column into a flat array of Expo
 * push tokens.
 *
 * Newest expected shape (token keyed by itself):
 *   {
 *     "ExponentPushToken[dksJAdLUTofdEk7P59thue]": {
 *       "platform": "ios",
 *       "timestamp": 1767397802709,
 *       "pushTokenString": "ExponentPushToken[dksJAdLUTofdEk7P59thue]"
 *     }
 *   }
 *
 * Older/alternate shapes also supported:
 *   - a bare token string
 *   - an array of token strings
 *   - an object whose keys are tokens, or whose values are strings or
 *     objects carrying pushTokenString / token / expoPushToken fields
 * @param {*} fcmtokens - raw column value
 * @returns {string[]} unique, validated Expo push tokens (possibly empty)
 */
const normalizeTokens = (fcmtokens) => {
  if (!fcmtokens) return [];

  if (typeof fcmtokens === "string") {
    const trimmed = fcmtokens.trim();
    return isExpoPushToken(trimmed) ? [trimmed] : [];
  }

  if (Array.isArray(fcmtokens)) {
    return uniqStrings(fcmtokens).filter(isExpoPushToken);
  }

  // Any other non-object (number, boolean, ...) carries no tokens.
  if (typeof fcmtokens !== "object") return [];

  // Object shape: tokens may appear as keys and/or inside the values.
  const candidates = Object.keys(fcmtokens).filter(isExpoPushToken);

  for (const entry of Object.values(fcmtokens)) {
    if (!entry) continue;

    // Some shapes store the token as a plain string value.
    if (typeof entry === "string") {
      candidates.push(entry);
    } else if (typeof entry === "object") {
      // The new shape uses pushTokenString; older ones used token/expoPushToken.
      const nested = entry.pushTokenString || entry.token || entry.expoPushToken || null;
      if (nested) candidates.push(String(nested));
    }
  }

  return uniqStrings(candidates).filter(isExpoPushToken);
};
|
||
|
||
/**
 * Parse a fetch Response body as JSON, returning null instead of throwing
 * when the body is empty or not valid JSON.
 * @param {Response} res - fetch response
 * @returns {Promise<*|null>} parsed payload, or null on parse failure
 */
const safeJson = async (res) => {
  let parsed = null;
  try {
    parsed = await res.json();
  } catch {
    // Non-JSON or empty body — callers treat this as "no payload".
  }
  return parsed;
};
|
||
|
||
/**
 * POST Expo push messages to the Expo push API in batches of at most
 * EXPO_MAX_MESSAGES_PER_REQUEST. An HTTP-level failure aborts the run by
 * throwing; per-ticket errors are only logged so remaining batches still send.
 * @param {Object} params
 * @param {Array<Object>} params.messages - Expo message objects ({ to, title, body, ... })
 * @param {Object} params.logger - structured logger exposing .log(...)
 * @returns {Promise<void>}
 * @throws {Error} when the Expo endpoint responds with a non-2xx status
 */
const sendExpoPush = async ({ messages, logger }) => {
  if (!messages?.length) return;

  const batches = chunk(messages, EXPO_MAX_MESSAGES_PER_REQUEST);

  // Batches are sent sequentially on purpose (gentler on the Expo endpoint).
  for (const batch of batches) {
    const res = await fetch(EXPO_PUSH_ENDPOINT, {
      method: "POST",
      headers: {
        accept: "application/json",
        "content-type": "application/json"
      },
      body: JSON.stringify(batch)
    });

    const payload = await safeJson(res);

    if (!res.ok) {
      logger?.log?.("expo-push-http-error", "ERROR", "notifications", "api", {
        status: res.status,
        statusText: res.statusText,
        payload
      });
      throw new Error(`Expo push HTTP error: ${res.status} ${res.statusText}`);
    }

    // Expo returns one "ticket" per message; surface error tickets without aborting.
    const tickets = payload?.data;
    if (Array.isArray(tickets)) {
      const errorTickets = tickets.filter((ticket) => ticket?.status === "error");
      if (errorTickets.length) {
        logger?.log?.("expo-push-ticket-errors", "ERROR", "notifications", "api", { errors: errorTickets });
      }
    }
  }
};
|
||
/**
 * Compose the one-line push notification body, e.g.
 * "3 updates for RO 123 at Acme Collision".
 * @param {Object} params
 * @param {number} params.count - number of consolidated updates
 * @param {string} [params.jobRoNumber] - repair order number, when known
 * @param {string} [params.bodyShopName] - body shop name, when known
 * @returns {string} human-readable summary line
 */
const buildPushSummary = ({ count, jobRoNumber, bodyShopName }) => {
  let summary = `${count} ${count === 1 ? "update" : "updates"} for `;
  summary += jobRoNumber ? `RO ${jobRoNumber}` : "a job";
  if (bodyShopName) summary += ` at ${bodyShopName}`;
  return summary;
};
|
||
|
||
/**
 * Initialize (once) the Expo push queues and workers, and return the add queue.
 *
 * Pipeline: producers enqueue "fcmAdd" jobs; the add worker buffers each
 * recipient's notifications in Redis and schedules a single delayed
 * "fcmConsolidate" job per jobId. After FCM_CONSOLIDATION_DELAY, the
 * consolidate worker collapses the buffered notifications into one Expo push
 * per user and cleans up all Redis keys.
 *
 * Safe to call multiple times — queues/workers are module-level singletons.
 * @param {Object} params
 * @param {Object} params.pubClient - Redis client (used as BullMQ connection and for buffering);
 *   NOTE(review): the `set(key, val, "NX", "EX", ttl)` / `setnx` / `hsetnx` call style
 *   assumes an ioredis-compatible client — confirm (node-redis v4 uses an options object).
 * @param {Object} params.logger - structured logger exposing .log(...)
 * @returns {Promise<Queue|null>} the singleton "fcmAdd" queue
 */
const loadFcmQueue = async ({ pubClient, logger }) => {
  if (!fcmAddQueue || !fcmConsolidateQueue) {
    const prefix = getBullMQPrefix();
    // Environment discriminator baked into every Redis key so prod/dev never collide.
    const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";

    devDebugLogger(`Initializing Expo Push Queues with prefix: ${prefix}`);

    fcmAddQueue = new Queue("fcmAdd", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    fcmConsolidateQueue = new Queue("fcmConsolidate", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    // Add worker: buffer incoming notifications per user and schedule one
    // consolidation job per jobId.
    fcmAddWorker = new Worker(
      "fcmAdd",
      async (job) => {
        const { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients } =
          job.data;

        devDebugLogger(`Adding push notifications for jobId ${jobId}`);

        const recipientsSetKey = `fcm:${devKey}:recipients:${jobId}`; // set of user emails
        const metaKey = `fcm:${devKey}:meta:${jobId}`;
        const redisKeyPrefix = `fcm:${devKey}:notifications:${jobId}`; // per-user list keys

        // store job-level metadata once (hsetnx keeps the first writer's values)
        await pubClient.hsetnx(metaKey, "jobRoNumber", jobRoNumber || "");
        await pubClient.hsetnx(metaKey, "bodyShopId", bodyShopId || "");
        await pubClient.hsetnx(metaKey, "bodyShopName", bodyShopName || "");
        await pubClient.expire(metaKey, seconds(NOTIFICATION_EXPIRATION));

        for (const r of recipients || []) {
          const user = r?.user;
          const associationId = r?.associationId;

          // Recipients without a user identifier cannot be delivered to — skip.
          if (!user) continue;

          const userKey = `${redisKeyPrefix}:${user}`;
          const payload = JSON.stringify({
            body: body || "",
            scenarioKey: scenarioKey || "",
            key: key || "",
            variables: variables || {},
            associationId: associationId ? String(associationId) : null,
            ts: Date.now()
          });

          // Append to the user's pending-notification list; TTL refreshed on every write.
          await pubClient.rpush(userKey, payload);
          await pubClient.expire(userKey, seconds(NOTIFICATION_EXPIRATION));

          await pubClient.sadd(recipientsSetKey, user);
          await pubClient.expire(recipientsSetKey, seconds(NOTIFICATION_EXPIRATION));
        }

        // setnx acts as a once-per-jobId gate: only the first add job within the
        // window schedules the delayed consolidation.
        const consolidateKey = `fcm:${devKey}:consolidate:${jobId}`;
        const flagSet = await pubClient.setnx(consolidateKey, "pending");

        if (flagSet) {
          await fcmConsolidateQueue.add(
            "consolidate-fcm",
            { jobId },
            {
              // Deterministic BullMQ job id also dedupes concurrent schedulers.
              jobId: `consolidate-${jobId}`,
              delay: FCM_CONSOLIDATION_DELAY,
              attempts: 3,
              // Numeric backoff = fixed retry delay in ms (reuses the lock TTL value).
              backoff: LOCK_EXPIRATION
            }
          );

          await pubClient.expire(consolidateKey, seconds(CONSOLIDATION_KEY_EXPIRATION));
          devDebugLogger(`Scheduled consolidation for jobId ${jobId}`);
        } else {
          devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`);
        }
      },
      { prefix, connection: pubClient, concurrency: 5 }
    );

    // Consolidate worker: collapse the buffered notifications into one Expo
    // push per recipient, then tear down all Redis state for the jobId.
    fcmConsolidateWorker = new Worker(
      "fcmConsolidate",
      async (job) => {
        const { jobId } = job.data;
        // Recomputed locally (same expression as the outer devKey) so the handler
        // does not depend on the closure.
        const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";

        // Short-lived NX lock so overlapping retries/instances don't double-send.
        const lockKey = `lock:${devKey}:fcmConsolidate:${jobId}`;
        const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", seconds(LOCK_EXPIRATION));

        if (!lockAcquired) {
          devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
          return;
        }

        try {
          const recipientsSet = `fcm:${devKey}:recipients:${jobId}`;
          const userEmails = await pubClient.smembers(recipientsSet);

          if (!userEmails?.length) {
            devDebugLogger(`No recipients found for jobId ${jobId}`);
            // Clear the gate so a later burst can schedule consolidation again.
            await pubClient.del(`fcm:${devKey}:consolidate:${jobId}`);
            return;
          }

          // Load meta
          const metaKey = `fcm:${devKey}:meta:${jobId}`;
          const meta = await pubClient.hgetall(metaKey);
          const jobRoNumber = meta?.jobRoNumber || "";
          const bodyShopId = meta?.bodyShopId || "";
          const bodyShopName = meta?.bodyShopName || "";

          // Fetch tokens for all recipients (1 DB round-trip)
          const usersResp = await gqlClient.request(GET_USERS_FCM_TOKENS_BY_EMAILS, { emails: userEmails });
          const tokenMap = new Map(
            (usersResp?.users || []).map((u) => [String(u.email), normalizeTokens(u.fcmtokens)])
          );

          for (const userEmail of userEmails) {
            const userKey = `fcm:${devKey}:notifications:${jobId}:${userEmail}`;
            const raw = await pubClient.lrange(userKey, 0, -1);

            if (!raw?.length) {
              await pubClient.del(userKey);
              continue;
            }

            // Tolerate corrupt list entries: unparseable JSON is dropped silently.
            const parsed = raw
              .map((x) => {
                try {
                  return JSON.parse(x);
                } catch {
                  return null;
                }
              })
              .filter(Boolean);

            const count = parsed.length;
            const notificationBody = buildPushSummary({ count, jobRoNumber, bodyShopName });

            // associationId should be stable for a user in a job's bodyshop; take the
            // first non-null one (the find() runs twice — harmless, just redundant).
            const associationId =
              parsed.find((p) => p?.associationId)?.associationId != null
                ? String(parsed.find((p) => p?.associationId)?.associationId)
                : "";

            const tokens = tokenMap.get(String(userEmail)) || [];

            if (!tokens.length) {
              devDebugLogger(`No Expo push tokens for ${userEmail}; skipping push for jobId ${jobId}`);
              await pubClient.del(userKey);
              continue;
            }

            // Build 1 message per device token; all data values are stringified
            // because push payload data fields are transported as strings.
            const messages = tokens.map((token) => ({
              to: token,
              title: "ImEX Online",
              body: notificationBody,
              priority: "high",
              data: {
                type: "job-notification",
                jobId: String(jobId),
                jobRoNumber: String(jobRoNumber || ""),
                bodyShopId: String(bodyShopId || ""),
                bodyShopName: String(bodyShopName || ""),
                associationId: String(associationId || ""),
                userEmail: String(userEmail),
                count: String(count)
              }
            }));

            await sendExpoPush({ messages, logger });

            devDebugLogger(`Sent Expo push to ${userEmail} for jobId ${jobId} (${count} updates)`);

            // Delete the user's list only after a successful send, so a retry
            // (attempts: 3) can still deliver to users not yet processed.
            await pubClient.del(userKey);
          }

          await pubClient.del(recipientsSet);
          await pubClient.del(metaKey);
          await pubClient.del(`fcm:${devKey}:consolidate:${jobId}`);
        } catch (err) {
          logger.log("fcm-queue-consolidation-error", "ERROR", "notifications", "api", {
            message: err?.message,
            stack: err?.stack
          });
          // Rethrow so BullMQ's attempts/backoff retry machinery kicks in.
          throw err;
        } finally {
          // Always release the lock, even on failure, so retries aren't blocked.
          await pubClient.del(lockKey);
        }
      },
      { prefix, connection: pubClient, concurrency: 1, limiter: { max: 1, duration: RATE_LIMITER_DURATION } }
    );

    fcmAddWorker.on("failed", (job, err) =>
      logger.log("fcm-add-failed", "ERROR", "notifications", "api", { message: err?.message, stack: err?.stack })
    );

    fcmConsolidateWorker.on("failed", (job, err) =>
      logger.log("fcm-consolidate-failed", "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );

    // Graceful shutdown: close both workers when the process is cleaned up.
    const shutdown = async () => {
      devDebugLogger("Closing push queue workers...");
      await Promise.all([fcmAddWorker.close(), fcmConsolidateWorker.close()]);
      devDebugLogger("Push queue workers closed");
    };

    registerCleanupTask(shutdown);
  }

  return fcmAddQueue;
};
|
||
|
||
/**
 * Accessor for the singleton "fcmAdd" queue.
 * @returns {Queue} the initialized add queue
 * @throws {Error} when loadFcmQueue has not been called during bootstrap
 */
const getQueue = () => {
  if (fcmAddQueue) return fcmAddQueue;
  throw new Error("FCM add queue not initialized. Ensure loadFcmQueue is called during bootstrap.");
};
|
||
|
||
/**
 * Enqueue one "add-fcm-notification" job per dispatched payload.
 * Payloads missing a jobId or with no recipients are skipped silently.
 * @param {Object} params
 * @param {Array<Object>} params.fcmsToDispatch - notification payloads
 * @returns {Promise<void>}
 * @throws {Error} when the add queue has not been initialized
 */
const dispatchFcmsToQueue = async ({ fcmsToDispatch }) => {
  const queue = getQueue();

  for (const [index, fcm] of fcmsToDispatch.entries()) {
    const { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients } = fcm;

    if (!jobId || !recipients?.length) continue;

    await queue.add(
      "add-fcm-notification",
      { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients },
      // Include the loop index in the BullMQ job id: `${jobId}-${Date.now()}`
      // alone collides when two payloads for the same jobId are dispatched
      // within the same millisecond, and BullMQ silently drops a job whose
      // id already exists — losing notifications.
      { jobId: `${jobId}-${Date.now()}-${index}` }
    );
  }
};
|
||
|
||
// Public API: bootstrap (loadFcmQueue), accessor (getQueue), producer (dispatchFcmsToQueue).
module.exports = { loadFcmQueue, getQueue, dispatchFcmsToQueue };