Files
bodyshop/server/notifications/queues/appQueue.js

373 lines
14 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
const { Queue, Worker } = require("bullmq");
const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries");
const { registerCleanupTask } = require("../../utils/cleanupManager");
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
const devDebugLogger = require("../../utils/devDebugLogger");
const graphQLClient = require("../../graphql-client/graphql-client").client;
// Consolidation delay expressed in whole minutes.
// Read from the APP_CONSOLIDATION_DELAY_IN_MINS environment variable; falls back to 3
// when the variable is unset, empty, or not numeric, and is never lower than 1.
const APP_CONSOLIDATION_DELAY_IN_MINS = (() => {
  const FALLBACK_MINUTES = 3;
  const rawSetting = process.env?.APP_CONSOLIDATION_DELAY_IN_MINS;
  if (!rawSetting) {
    return FALLBACK_MINUTES;
  }
  const minutes = parseInt(rawSetting, 10);
  if (isNaN(minutes)) {
    return FALLBACK_MINUTES;
  }
  // Clamp to a minimum of one minute
  return minutes < 1 ? 1 : minutes;
})();
// Base consolidation delay in milliseconds (configured minutes * 60000) / DO NOT TOUCH
// The "e.g." figures in the comments below assume a 1-minute base; real values scale with the configured minutes.
const APP_CONSOLIDATION_DELAY = APP_CONSOLIDATION_DELAY_IN_MINS * 60000; // e.g. 60000 ms for a 1-minute base
// Derived time-related constants (milliseconds), each a fixed multiple of APP_CONSOLIDATION_DELAY / DO NOT TOUCH
const NOTIFICATION_STORAGE_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5x base (e.g. 90s); TTL for stored notifications and recipient keys
const CONSOLIDATION_FLAG_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5x base (e.g. 90s); TTL for the per-job consolidation flag
const LOCK_EXPIRATION = APP_CONSOLIDATION_DELAY * 0.25; // quarter of base (e.g. 15s); consolidation lock TTL, also used as retry backoff
const RATE_LIMITER_DURATION = APP_CONSOLIDATION_DELAY * 0.1; // tenth of base (e.g. 6s); limiter window of the consolidate worker
// Module-level queue singletons; both are created on the first loadAppQueue call
let addQueue;
let consolidateQueue;
/**
 * Assembles the persisted/pushed content for a group of notifications.
 *
 * @param {Array<Object>} notifications - Notification objects; each contributes its `body` and `variables`.
 * @returns {Object} `scenario_text` (array of bodies), `fcm_text` (bodies joined with ". " plus a
 * trailing period, or null when the joined text is empty) and `scenario_meta` (array of variables
 * objects, `{}` where a notification has none).
 */
const buildNotificationContent = (notifications) => {
  const bodies = notifications.map(({ body }) => body);
  const metadata = notifications.map(({ variables }) => variables || {});
  const joinedBodies = bodies.join(". ");

  // An empty joined string means there is nothing worth pushing
  let pushText = null;
  if (joinedBodies) {
    pushText = `${joinedBodies}.`;
  }

  return {
    scenario_text: bodies,
    fcm_text: pushText,
    scenario_meta: metadata
  };
};
/**
 * Converts a duration in milliseconds to whole seconds, rounding up,
 * with a floor of one second.
 * @param ms duration in milliseconds
 * @returns {number} duration in seconds (at least 1)
 */
const seconds = (ms) => {
  const roundedUp = Math.ceil(ms / 1000);
  return roundedUp < 1 ? 1 : roundedUp;
};
/**
 * Initializes the notification queues and workers for adding and consolidating notifications.
 *
 * On the first call this creates the "notificationsAdd" and "notificationsConsolidate" BullMQ
 * queues plus one worker for each, and registers a cleanup task that closes both workers.
 * Later calls reuse the existing queues.
 *
 * Flow:
 *  - add worker: appends each notification to a per-recipient Redis list, tracks recipients in a
 *    set/hash, and schedules one delayed consolidation job per jobId (guarded by a flag key).
 *  - consolidate worker: under a Redis lock, reads the stored notifications, inserts one row per
 *    recipient through the GraphQL client, emits a "notification" event to the recipient's sockets,
 *    then removes the entries it consumed together with the tracking keys and the flag.
 *
 * @param {Object} deps
 * @param {Object} deps.pubClient - Redis client used both as the BullMQ connection and for direct commands.
 * @param {Object} deps.logger - Logger exposing `log(...)`.
 * @param {Object} deps.redisHelpers - Must expose `getUserSocketMapping(user)`.
 * @param {Object} deps.ioRedis - Emitter exposing `to(socketId).emit(event, payload)`.
 * @returns {Promise<Object>} The add queue instance.
 */
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
  if (!addQueue || !consolidateQueue) {
    const prefix = getBullMQPrefix();
    const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";
    devDebugLogger(`Initializing Notifications Queues with prefix: ${prefix}`);

    // Redis key helpers (per jobId)
    const recipientsSetKey = (jobId) => `app:${devKey}:recipients:${jobId}`; // set of `${user}:${bodyShopId}`
    const recipientAssocHashKey = (jobId) => `app:${devKey}:recipientAssoc:${jobId}`; // hash `${user}:${bodyShopId}` => associationId
    const consolidateFlagKey = (jobId) => `app:${devKey}:consolidate:${jobId}`;
    const lockKeyForJob = (jobId) => `lock:${devKey}:consolidate:${jobId}`;
    const listKey = ({ jobId, user, bodyShopId }) => `app:${devKey}:notifications:${jobId}:${user}:${bodyShopId}`;

    addQueue = new Queue("notificationsAdd", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });
    consolidateQueue = new Queue("notificationsConsolidate", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    const addWorker = new Worker(
      "notificationsAdd",
      async (job) => {
        const { jobId, key, variables, recipients, body, jobRoNumber } = job.data;
        devDebugLogger(`Adding notifications for jobId ${jobId}`);
        const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() };

        // Store notifications atomically (RPUSH) and store recipients in a Redis set
        for (const recipient of recipients || []) {
          const { user, bodyShopId, associationId } = recipient;
          if (!user || !bodyShopId) continue;
          const rk = `${user}:${bodyShopId}`;

          // (1) Store notification payload in a list (atomic append)
          const lk = listKey({ jobId, user, bodyShopId });
          await pubClient.rpush(lk, JSON.stringify(notification));
          await pubClient.expire(lk, seconds(NOTIFICATION_STORAGE_EXPIRATION));

          // (2) Track recipients in a set, and associationId in a hash
          await pubClient.sadd(recipientsSetKey(jobId), rk);
          await pubClient.expire(recipientsSetKey(jobId), seconds(NOTIFICATION_STORAGE_EXPIRATION));
          if (associationId) {
            await pubClient.hset(recipientAssocHashKey(jobId), rk, String(associationId));
          }
          await pubClient.expire(recipientAssocHashKey(jobId), seconds(NOTIFICATION_STORAGE_EXPIRATION));
        }

        // Schedule consolidation once per jobId.
        // The flag is created together with its TTL in a single SET NX EX so that a failure
        // between "set" and "expire" can never leave a flag that blocks scheduling forever.
        const flagKey = consolidateFlagKey(jobId);
        const flagSet = await pubClient.set(
          flagKey,
          "pending",
          "NX",
          "EX",
          seconds(CONSOLIDATION_FLAG_EXPIRATION)
        );
        devDebugLogger(`Consolidation flag set for jobId ${jobId}: ${flagSet ? 1 : 0}`);
        if (flagSet) {
          try {
            await consolidateQueue.add(
              "consolidate-notifications",
              { jobId },
              {
                jobId: `consolidate-${jobId}`,
                delay: APP_CONSOLIDATION_DELAY,
                attempts: 3,
                backoff: LOCK_EXPIRATION
              }
            );
          } catch (err) {
            // Nothing was scheduled: release the flag so the next add job can schedule again
            await pubClient.del(flagKey);
            throw err;
          }
          devDebugLogger(`Scheduled consolidation for jobId ${jobId}`);
        } else {
          devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`);
        }
      },
      {
        prefix,
        connection: pubClient,
        concurrency: 5
      }
    );

    const consolidateWorker = new Worker(
      "notificationsConsolidate",
      async (job) => {
        const { jobId } = job.data;
        devDebugLogger(`Consolidating notifications for jobId ${jobId}`);

        const lockKey = lockKeyForJob(jobId);
        const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", seconds(LOCK_EXPIRATION));
        devDebugLogger(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);
        if (!lockAcquired) {
          devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
          return;
        }

        try {
          const rkSet = recipientsSetKey(jobId);
          const assocHash = recipientAssocHashKey(jobId);

          const recipientKeys = await pubClient.smembers(rkSet);
          if (!recipientKeys?.length) {
            devDebugLogger(`No recipients found for jobId ${jobId}, nothing to consolidate.`);
            await pubClient.del(consolidateFlagKey(jobId));
            return;
          }

          const assocMap = await pubClient.hgetall(assocHash);

          // Collect notifications by recipientKey
          const notificationsByRecipient = new Map(); // rk => parsed notifications array
          // rk => { lk, count }: how many raw list entries were read for that recipient, so that
          // exactly those entries can be trimmed once they have been inserted and emitted.
          const consumedByRecipient = new Map();

          for (const rk of recipientKeys) {
            const [user, bodyShopId] = rk.split(":");
            const lk = listKey({ jobId, user, bodyShopId });
            const items = await pubClient.lrange(lk, 0, -1);
            if (!items?.length) continue;

            const parsed = items
              .map((x) => {
                try {
                  return JSON.parse(x);
                } catch {
                  // Entries that are not valid JSON are dropped from the batch
                  return null;
                }
              })
              .filter(Boolean);

            if (parsed.length) {
              notificationsByRecipient.set(rk, parsed);
              // IMPORTANT: do NOT remove list entries yet; only after successful insert+emit
              consumedByRecipient.set(rk, { lk, count: items.length });
            }
          }

          if (!notificationsByRecipient.size) {
            devDebugLogger(`No notifications found in lists for jobId ${jobId}, nothing to insert/emit.`);
            await pubClient.del(rkSet);
            await pubClient.del(assocHash);
            await pubClient.del(consolidateFlagKey(jobId));
            return;
          }

          // Build DB inserts
          const inserts = [];
          const insertMeta = []; // keep rk + associationId to emit after insert

          for (const [rk, notifications] of notificationsByRecipient.entries()) {
            const associationId = assocMap?.[rk];
            // If your DB requires associationid NOT NULL, skip if missing
            if (!associationId) {
              devDebugLogger(`Skipping insert for ${rk} (missing associationId).`);
              continue;
            }

            const { scenario_text, fcm_text, scenario_meta } = buildNotificationContent(notifications);
            inserts.push({
              jobid: jobId,
              associationid: associationId,
              // NOTE: if these are jsonb columns, remove JSON.stringify and pass arrays directly.
              scenario_text: JSON.stringify(scenario_text),
              fcm_text,
              scenario_meta: JSON.stringify(scenario_meta)
            });
            insertMeta.push({ rk, associationId });
          }

          // Map notificationId by associationId from Hasura returning rows
          const idByAssociationId = new Map();
          if (inserts.length > 0) {
            const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, { objects: inserts });
            const returning = insertResponse?.insert_notifications?.returning || [];
            returning.forEach((row) => {
              // Expecting the mutation to return associationid as well as id.
              if (row?.associationid) idByAssociationId.set(String(row.associationid), row.id);
            });
            // Read defensively: throwing here (after the rows were written) would make the
            // job retry and insert the same rows again.
            devDebugLogger(
              `Inserted ${insertResponse?.insert_notifications?.affected_rows ?? 0} notifications for jobId ${jobId}`
            );
          }

          // Emit via Socket.io
          // Group by user to reduce mapping lookups
          const emitEntries = insertMeta.map((meta) => {
            const [user, bodyShopId] = meta.rk.split(":");
            return { ...meta, user, bodyShopId };
          });
          const uniqueUsers = [...new Set(emitEntries.map((entry) => entry.user))];

          for (const user of uniqueUsers) {
            const userMapping = await redisHelpers.getUserSocketMapping(user);
            const entriesForUser = emitEntries.filter((entry) => entry.user === user);

            for (const entry of entriesForUser) {
              const { rk, bodyShopId, associationId } = entry;
              const notifications = notificationsByRecipient.get(rk) || [];
              if (!notifications.length) continue;

              const jobRoNumber = notifications[0]?.jobRoNumber;
              const notificationId = idByAssociationId.get(String(associationId)) || null;

              if (userMapping && userMapping[bodyShopId]?.socketIds) {
                userMapping[bodyShopId].socketIds.forEach((socketId) => {
                  ioRedis.to(socketId).emit("notification", {
                    jobId,
                    jobRoNumber,
                    bodyShopId,
                    notifications,
                    notificationId,
                    associationId
                  });
                });
                devDebugLogger(
                  `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} (notificationId ${notificationId})`
                );
              } else {
                devDebugLogger(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
              }
            }
          }

          // Remove the entries that were just inserted+emitted. Without this, a later
          // notification for the same job would be appended to the stale list and the old
          // entries would be inserted and emitted a second time. LTRIM drops only the entries
          // read above, so anything appended while this job was running is kept.
          for (const { rk } of insertMeta) {
            const consumed = consumedByRecipient.get(rk);
            if (consumed) {
              await pubClient.ltrim(consumed.lk, consumed.count, -1);
            }
          }

          // Cleanup recipient tracking keys + consolidation flag
          await pubClient.del(rkSet);
          await pubClient.del(assocHash);
          await pubClient.del(consolidateFlagKey(jobId));
        } catch (err) {
          logger.log("app-queue-consolidation-error", "ERROR", "notifications", "api", {
            message: err?.message,
            stack: err?.stack
          });
          throw err;
        } finally {
          // Only reached when the lock was acquired above
          await pubClient.del(lockKey);
        }
      },
      {
        prefix,
        connection: pubClient,
        concurrency: 1,
        limiter: { max: 1, duration: RATE_LIMITER_DURATION }
      }
    );

    addWorker.on("completed", (job) => devDebugLogger(`Add job ${job.id} completed`));
    consolidateWorker.on("completed", (job) => devDebugLogger(`Consolidate job ${job.id} completed`));
    addWorker.on("failed", (job, err) =>
      logger.log("app-queue-notification-error", "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );
    consolidateWorker.on("failed", (job, err) =>
      logger.log("app-queue-consolidation-failed", "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );

    // Register cleanup task instead of direct process listeners
    const shutdown = async () => {
      devDebugLogger("Closing app queue workers...");
      await Promise.all([addWorker.close(), consolidateWorker.close()]);
      devDebugLogger("App queue workers closed");
    };
    registerCleanupTask(shutdown);
  }
  return addQueue;
};
/**
 * Returns the `addQueue` instance created by `loadAppQueue`.
 *
 * @returns {Object} The initialized add queue.
 * @throws {Error} When the queue has not been initialized yet.
 */
const getQueue = () => {
  if (addQueue) {
    return addQueue;
  }
  throw new Error("Add queue not initialized. Ensure loadAppQueue is called during bootstrap.");
};
/**
 * Dispatches notifications to the `addQueue` for processing.
 *
 * Each entry is enqueued as an "add-notification" job, one after another, with a job id
 * of the form `${jobId}-${Date.now()}`.
 *
 * @param {Object} params
 * @param {Array<Object>} params.appsToDispatch - Entries carrying jobId, bodyShopId, key, variables,
 * recipients, body and jobRoNumber.
 * @returns {Promise<void>}
 * @throws {Error} When the add queue has not been initialized (see getQueue).
 */
const dispatchAppsToQueue = async ({ appsToDispatch }) => {
  const appQueue = getQueue();
  for (const app of appsToDispatch) {
    const { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber } = app;
    // NOTE(review): two entries for the same jobId enqueued within the same millisecond would
    // share a job id — confirm how the queue treats duplicate ids.
    await appQueue.add(
      "add-notification",
      { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber },
      { jobId: `${jobId}-${Date.now()}` }
    );
    // `recipients` may be absent; the job is already enqueued at this point, so the log line
    // must not throw and abort the remaining dispatches.
    const recipientCount = recipients?.length ?? 0;
    devDebugLogger(`Added notification to queue for jobId ${jobId} with ${recipientCount} recipients`);
  }
};
module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue };