Files
bodyshop/server/notifications/queues/fcmQueue.js

287 lines
11 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
const { Queue, Worker } = require("bullmq");
const { registerCleanupTask } = require("../../utils/cleanupManager");
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
const devDebugLogger = require("../../utils/devDebugLogger");
const FCM_CONSOLIDATION_DELAY_IN_MINS = (() => {
const envValue = process.env?.FCM_CONSOLIDATION_DELAY_IN_MINS;
const parsedValue = envValue ? parseInt(envValue, 10) : NaN;
return isNaN(parsedValue) ? 3 : Math.max(1, parsedValue);
})();
const FCM_CONSOLIDATION_DELAY = FCM_CONSOLIDATION_DELAY_IN_MINS * 60000;
// pegged constants (pattern matches your other queues)
const CONSOLIDATION_KEY_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5;
const LOCK_EXPIRATION = FCM_CONSOLIDATION_DELAY * 0.25;
const RATE_LIMITER_DURATION = FCM_CONSOLIDATION_DELAY * 0.1;
const NOTIFICATION_EXPIRATION = FCM_CONSOLIDATION_DELAY * 1.5;
let fcmAddQueue;
let fcmConsolidateQueue;
let fcmAddWorker;
let fcmConsolidateWorker;
// IMPORTANT: do NOT require firebase-handler at module load time.
// firebase-handler does `require(process.env.FIREBASE_ADMINSDK_JSON)` at top-level,
// which will hard-crash environments that dont have Firebase configured.
const hasFirebaseEnv = () => Boolean(process.env.FIREBASE_ADMINSDK_JSON && process.env.FIREBASE_DATABASE_URL);
/**
* Get the Firebase Admin SDK, or null if Firebase is not configured.
* @returns {{app: app, remoteConfig: ((app?: App) => remoteConfig.RemoteConfig) | remoteConfig, firestore: ((app?: App) => FirebaseFirestore.Firestore) | firestore, AppOptions: AppOptions, auth: ((app?: App) => auth.Auth) | auth, securityRules: ((app?: App) => securityRules.SecurityRules) | securityRules, installations: ((app?: App) => installations.Installations) | installations, FirebaseArrayIndexError: FirebaseArrayIndexError, storage: ((app?: App) => storage.Storage) | storage, appCheck: ((app?: App) => appCheck.AppCheck) | appCheck, initializeApp(options?: AppOptions, name?: string): app.App, FirebaseError: FirebaseError, messaging: ((app?: App) => messaging.Messaging) | messaging, projectManagement: ((app?: App) => projectManagement.ProjectManagement) | projectManagement, database: ((app?: App) => database.Database) | database, machineLearning: ((app?: App) => machineLearning.MachineLearning) | machineLearning, instanceId: ((app?: App) => instanceId.InstanceId) | instanceId, SDK_VERSION: string, apps: (app.App | null)[], credential: credential, ServiceAccount: ServiceAccount, GoogleOAuthAccessToken: GoogleOAuthAccessToken}|null}
*/
const getFirebaseAdmin = () => {
if (!hasFirebaseEnv()) return null;
const { admin } = require("../../firebase/firebase-handler");
return admin;
};
/**
* Get the FCM topic name for an association.
* @param associationId
* @returns {`assoc-${string}-notifications`}
*/
const topicForAssociation = (associationId) => `assoc-${associationId}-notifications`;
/**
* Build a summary string for FCM push notification body.
* @param count
* @param jobRoNumber
* @param bodyShopName
* @returns {`${string} ${string} for ${string|string}${string|string}`}
*/
const buildPushSummary = ({ count, jobRoNumber, bodyShopName }) => {
const updates = count === 1 ? "update" : "updates";
const ro = jobRoNumber ? `RO ${jobRoNumber}` : "a job";
const shop = bodyShopName ? ` at ${bodyShopName}` : "";
return `${count} ${updates} for ${ro}${shop}`;
};
/**
* Loads the FCM notification queues and workers.
* @param pubClient
* @param logger
* @returns {Promise<Queue<any, any, string, ExtractDataType<any, any>, ExtractResultType<any, any>, ExtractNameType<any, string>>|null>}
*/
const loadFcmQueue = async ({ pubClient, logger }) => {
if (!hasFirebaseEnv()) {
devDebugLogger("FCM queue not initialized (Firebase env not configured).");
return null;
}
if (!fcmAddQueue || !fcmConsolidateQueue) {
const prefix = getBullMQPrefix();
const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";
devDebugLogger(`Initializing FCM Queues with prefix: ${prefix}`);
fcmAddQueue = new Queue("fcmAdd", {
prefix,
connection: pubClient,
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
fcmConsolidateQueue = new Queue("fcmConsolidate", {
prefix,
connection: pubClient,
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
fcmAddWorker = new Worker(
"fcmAdd",
async (job) => {
const { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients } =
job.data;
devDebugLogger(`Adding FCM notifications for jobId ${jobId}`);
const redisKeyPrefix = `fcm:${devKey}:notifications:${jobId}`;
for (const r of recipients) {
const associationId = r?.associationId;
if (!associationId) continue;
const assocKey = `${redisKeyPrefix}:${associationId}`;
const payload = JSON.stringify({
body: body || "",
scenarioKey: scenarioKey || "",
key: key || "",
variables: variables || {},
ts: Date.now()
});
await pubClient.rpush(assocKey, payload);
await pubClient.expire(assocKey, NOTIFICATION_EXPIRATION / 1000);
const recipientsSetKey = `fcm:${devKey}:recipients:${jobId}`;
await pubClient.sadd(recipientsSetKey, associationId);
await pubClient.expire(recipientsSetKey, NOTIFICATION_EXPIRATION / 1000);
// store some metadata once per jobId
const metaKey = `fcm:${devKey}:meta:${jobId}`;
await pubClient.hsetnx(metaKey, "jobRoNumber", jobRoNumber || "");
await pubClient.hsetnx(metaKey, "bodyShopId", bodyShopId || "");
await pubClient.hsetnx(metaKey, "bodyShopName", bodyShopName || "");
await pubClient.expire(metaKey, NOTIFICATION_EXPIRATION / 1000);
}
const consolidateKey = `fcm:${devKey}:consolidate:${jobId}`;
const flagSet = await pubClient.setnx(consolidateKey, "pending");
if (flagSet) {
await fcmConsolidateQueue.add(
"consolidate-fcm",
{ jobId },
{
jobId: `consolidate-${jobId}`,
delay: FCM_CONSOLIDATION_DELAY,
attempts: 3,
backoff: LOCK_EXPIRATION
}
);
await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000);
devDebugLogger(`Scheduled FCM consolidation for jobId ${jobId}`);
} else {
devDebugLogger(`FCM consolidation already scheduled for jobId ${jobId}`);
}
},
{ prefix, connection: pubClient, concurrency: 5 }
);
fcmConsolidateWorker = new Worker(
"fcmConsolidate",
async (job) => {
const { jobId } = job.data;
const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";
const lockKey = `lock:${devKey}:fcmConsolidate:${jobId}`;
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
if (!lockAcquired) {
devDebugLogger(`Skipped FCM consolidation for jobId ${jobId} - lock held by another worker`);
return;
}
try {
const admin = getFirebaseAdmin();
if (!admin) {
devDebugLogger("FCM consolidation skipped (Firebase not available).");
return;
}
const recipientsSet = `fcm:${devKey}:recipients:${jobId}`;
const associationIds = await pubClient.smembers(recipientsSet);
const metaKey = `fcm:${devKey}:meta:${jobId}`;
const meta = await pubClient.hgetall(metaKey);
const jobRoNumber = meta?.jobRoNumber || "";
const bodyShopId = meta?.bodyShopId || "";
const bodyShopName = meta?.bodyShopName || "";
for (const associationId of associationIds) {
const assocKey = `fcm:${devKey}:notifications:${jobId}:${associationId}`;
const messages = await pubClient.lrange(assocKey, 0, -1);
if (!messages?.length) continue;
const count = messages.length;
const notificationBody = buildPushSummary({ count, jobRoNumber, bodyShopName });
const topic = topicForAssociation(associationId);
// FCM "data" values MUST be strings
await admin.messaging().send({
topic,
notification: {
title: "ImEX Online",
body: notificationBody
},
data: {
type: "job-notification",
jobId: String(jobId),
jobRoNumber: String(jobRoNumber || ""),
bodyShopId: String(bodyShopId || ""),
bodyShopName: String(bodyShopName || ""),
associationId: String(associationId),
count: String(count)
},
android: { priority: "high" },
apns: { headers: { "apns-priority": "10" } }
});
devDebugLogger(`Sent FCM push to topic ${topic} for jobId ${jobId} (${count} updates)`);
await pubClient.del(assocKey);
}
await pubClient.del(recipientsSet);
await pubClient.del(metaKey);
await pubClient.del(`fcm:${devKey}:consolidate:${jobId}`);
} catch (err) {
logger.log("fcm-queue-consolidation-error", "ERROR", "notifications", "api", {
message: err?.message,
stack: err?.stack
});
throw err;
} finally {
await pubClient.del(lockKey);
}
},
{ prefix, connection: pubClient, concurrency: 1, limiter: { max: 1, duration: RATE_LIMITER_DURATION } }
);
fcmAddWorker.on("failed", (job, err) =>
logger.log("fcm-add-failed", "ERROR", "notifications", "api", { message: err?.message, stack: err?.stack })
);
fcmConsolidateWorker.on("failed", (job, err) =>
logger.log("fcm-consolidate-failed", "ERROR", "notifications", "api", {
message: err?.message,
stack: err?.stack
})
);
const shutdown = async () => {
devDebugLogger("Closing FCM queue workers...");
await Promise.all([fcmAddWorker.close(), fcmConsolidateWorker.close()]);
devDebugLogger("FCM queue workers closed");
};
registerCleanupTask(shutdown);
}
return fcmAddQueue;
};
/**
* Get the FCM add queue.
* @returns {*}
*/
const getQueue = () => {
if (!fcmAddQueue) throw new Error("FCM add queue not initialized. Ensure loadFcmQueue is called during bootstrap.");
return fcmAddQueue;
};
/**
* Dispatch FCM notifications to the FCM add queue.
* @param fcmsToDispatch
* @returns {Promise<void>}
*/
const dispatchFcmsToQueue = async ({ fcmsToDispatch }) => {
if (!hasFirebaseEnv()) return;
const queue = getQueue();
for (const fcm of fcmsToDispatch) {
const { jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients } = fcm;
if (!jobId || !recipients?.length) continue;
await queue.add(
"add-fcm-notification",
{ jobId, jobRoNumber, bodyShopId, bodyShopName, scenarioKey, key, variables, body, recipients },
{ jobId: `${jobId}-${Date.now()}` }
);
}
};
module.exports = { loadFcmQueue, getQueue, dispatchFcmsToQueue };