Files
bodyshop/server/notifications/queues/appQueue.js

336 lines
14 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
const { Queue, Worker } = require("bullmq");
const graphQLClient = require("../../graphql-client/graphql-client").client;
// Consolidation delay in whole minutes, read once at module load from the
// APP_CONSOLIDATION_DELAY_IN_MINS environment variable. Falls back to 1 when the
// variable is unset, empty or not numeric, and is never allowed below 1.
const APP_CONSOLIDATION_DELAY_IN_MINS = (() => {
  const rawDelay = process.env?.APP_CONSOLIDATION_DELAY_IN_MINS;
  if (!rawDelay) {
    return 1;
  }
  const minutes = Number.parseInt(rawDelay, 10);
  if (Number.isNaN(minutes)) {
    return 1;
  }
  return Math.max(1, minutes);
})();
// Base consolidation delay in milliseconds (configured minutes * 60000) / DO NOT TOUCH
const APP_CONSOLIDATION_DELAY = APP_CONSOLIDATION_DELAY_IN_MINS * 60000; // 60000 ms when the delay is the default 1 minute
// Derived time-related constants, all in milliseconds and all scaled from APP_CONSOLIDATION_DELAY / DO NOT TOUCH
// The example durations in the trailing comments only hold when APP_CONSOLIDATION_DELAY_IN_MINS is 1.
const NOTIFICATION_STORAGE_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5x base (90s at 1 minute): TTL of the per-user notification keys
const CONSOLIDATION_FLAG_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5x base (90s at 1 minute): TTL of the "consolidation pending" flag
const LOCK_EXPIRATION = APP_CONSOLIDATION_DELAY * 0.25; // 0.25x base (15s at 1 minute): consolidation lock TTL, also used as retry backoff
const RATE_LIMITER_DURATION = APP_CONSOLIDATION_DELAY * 0.1; // 0.1x base (6s at 1 minute): rate-limit window of the consolidate worker
// Queue singletons; both stay undefined until loadAppQueue creates them.
let addQueue;
let consolidateQueue;
// GraphQL mutation that inserts a batch of notification rows in one request.
// Takes `$objects` (a non-null list of `notifications_insert_input`) and returns the
// number of affected rows plus, for each inserted row, its id, jobid, associationid,
// ui_translation_string, ui_translation_meta and html_body.
const INSERT_NOTIFICATIONS_MUTATION = `
mutation INSERT_NOTIFICATIONS($objects: [notifications_insert_input!]!) {
insert_notifications(objects: $objects) {
affected_rows
returning {
id
jobid
associationid
ui_translation_string
ui_translation_meta
html_body
}
}
}
`;
/**
 * Renders a batch of notifications as a single HTML unordered list, one
 * `<li>` per notification in input order.
 *
 * Each `body` is interpolated verbatim; no HTML escaping is applied here.
 * NOTE(review): presumably bodies are trusted, server-generated markup - confirm with callers.
 *
 * @param {Array<Object>} notifications - Notification objects carrying a `body` field.
 * @returns {string} `<ul>...</ul>` markup; `<ul></ul>` for an empty array.
 */
const buildHtmlBody = (notifications) => {
  const renderedItems = notifications.reduce((html, entry) => `${html}<li>${entry.body}</li>`, "");
  return `<ul>${renderedItems}</ul>`;
};
/**
 * Picks the translation key and variables that describe a batch of notifications.
 *
 * A batch of exactly one notification keeps that notification's own key and
 * variables. Any other batch size is reported under the generic
 * "notifications.job.multipleChanges" key, with every notification's variables
 * listed alongside its original key.
 *
 * @param {Array<Object>} notifications - Notification objects with `key` and `variables`.
 * @returns {Object} An object with `key` and `variables` properties.
 */
const determineKeyAndVariables = (notifications) => {
  if (notifications.length !== 1) {
    // The original key goes first, so a `key` entry inside `variables` still overrides it.
    const perNotification = notifications.map(({ key, variables }) => ({ key, ...variables }));
    return {
      key: "notifications.job.multipleChanges",
      variables: { variables: perNotification }
    };
  }

  const only = notifications[0];
  return { key: only.key, variables: only.variables };
};
/**
 * Initializes the notification queues and workers for adding and consolidating notifications.
 *
 * On the first call this creates two BullMQ queues and their workers:
 *  - "notificationsAdd": appends each incoming notification to a per-user Redis key and,
 *    once per job, schedules a delayed consolidation job.
 *  - "notificationsConsolidate": after the delay, collects the stored notifications,
 *    inserts one row per user/body-shop pair through GraphQL and emits the result to the
 *    user's sockets.
 * Later calls reuse the existing queues and only return `addQueue`.
 *
 * @param {Object} options - Configuration options for queue initialization.
 * @param {Object} options.pubClient - Redis client used for BullMQ and for notification storage.
 * @param {Object} options.logger - Logger wrapper; the instance at `logger.logger` is used.
 * @param {Object} options.redisHelpers - Must provide `getUserSocketMapping(user)`.
 * @param {Object} options.ioRedis - Emitter exposing `to(socketId).emit(event, payload)`.
 * @returns {Promise<Queue>} The initialized `addQueue` instance for dispatching notifications.
 */
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
  if (!addQueue || !consolidateQueue) {
    logger.logger.info("Initializing Notifications Queues");

    addQueue = new Queue("notificationsAdd", {
      connection: pubClient,
      prefix: "{BULLMQ}",
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    consolidateQueue = new Queue("notificationsConsolidate", {
      connection: pubClient,
      prefix: "{BULLMQ}",
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    const addWorker = new Worker(
      "notificationsAdd",
      async (job) => {
        const { jobId, key, variables, recipients, body } = job.data;
        logger.logger.info(`Adding notifications for jobId ${jobId}`);

        const redisKeyPrefix = `app:notifications:${jobId}`;
        const notification = { key, variables, body, timestamp: Date.now() };

        // NOTE(review): this get/modify/set sequence is not atomic. With worker concurrency 5,
        // two add jobs for the same jobId and user can overwrite each other's entry.
        for (const recipient of recipients) {
          const { user } = recipient;
          const userKey = `${redisKeyPrefix}:${user}`;
          const existingNotifications = await pubClient.get(userKey);
          const notifications = existingNotifications ? JSON.parse(existingNotifications) : [];
          notifications.push(notification);
          await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000);
          logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
        }

        // Set the flag and its TTL in one command so a crash or a failed queue add can never
        // leave a flag without expiry, which would block consolidation for this job forever.
        const consolidateKey = `app:consolidate:${jobId}`;
        const flagSet = await pubClient.set(
          consolidateKey,
          "pending",
          "NX",
          "EX",
          CONSOLIDATION_FLAG_EXPIRATION / 1000
        );
        logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);

        if (flagSet) {
          try {
            await consolidateQueue.add(
              "consolidate-notifications",
              { jobId, recipients },
              {
                // BullMQ custom job ids must not contain ":", so "-" is the separator.
                jobId: `consolidate-${jobId}`,
                delay: APP_CONSOLIDATION_DELAY,
                attempts: 3,
                backoff: LOCK_EXPIRATION
              }
            );
          } catch (err) {
            // Scheduling failed: release the flag so the next notification can schedule again.
            await pubClient.del(consolidateKey);
            throw err;
          }
          logger.logger.info(`Scheduled consolidation for jobId ${jobId}`);
        } else {
          logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`);
        }
      },
      {
        connection: pubClient,
        prefix: "{BULLMQ}",
        concurrency: 5
      }
    );

    const consolidateWorker = new Worker(
      "notificationsConsolidate",
      async (job) => {
        const { jobId, recipients } = job.data;
        logger.logger.info(`Consolidating notifications for jobId ${jobId}`);

        const redisKeyPrefix = `app:notifications:${jobId}`;
        const lockKey = `lock:consolidate:${jobId}`;
        const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
        logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);

        if (!lockAcquired) {
          logger.logger.info(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
          return;
        }

        try {
          // Shape: { [user]: { [bodyShopId]: notification[] } }
          const allNotifications = {};
          const uniqueUsers = [...new Set(recipients.map((r) => r.user))];
          logger.logger.debug(`Unique users for jobId ${jobId}: ${uniqueUsers}`);

          // NOTE(review): keys are deleted before the database insert below succeeds, so a
          // retried attempt finds nothing left to consolidate.
          for (const user of uniqueUsers) {
            const userKey = `${redisKeyPrefix}:${user}`;
            const notifications = await pubClient.get(userKey);
            logger.logger.debug(`Retrieved notifications for ${user}: ${notifications}`);
            if (notifications) {
              const parsedNotifications = JSON.parse(notifications);
              const userRecipients = recipients.filter((r) => r.user === user);
              for (const { bodyShopId } of userRecipients) {
                allNotifications[user] = allNotifications[user] || {};
                allNotifications[user][bodyShopId] = parsedNotifications;
              }
              await pubClient.del(userKey);
              logger.logger.debug(`Deleted Redis key ${userKey}`);
            } else {
              logger.logger.warn(`No notifications found for ${user} under ${userKey}`);
            }
          }
          logger.logger.debug(`Consolidated notifications: ${JSON.stringify(allNotifications)}`);

          // Insert notifications into the database and collect IDs
          const notificationInserts = [];
          // insertKeys[i] is the "user:bodyShopId" key of notificationInserts[i]; it is used to
          // match returned rows back to their target without assuming a uniform layout.
          const insertKeys = [];
          const notificationIdMap = new Map();

          for (const [user, bodyShopData] of Object.entries(allNotifications)) {
            const userRecipients = recipients.filter((r) => r.user === user);
            for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
              // Use the recipient entry that belongs to this body shop; object keys are
              // strings, hence the String() comparison. Fall back to the first entry.
              const shopRecipient =
                userRecipients.find((r) => String(r.bodyShopId) === bodyShopId) ?? userRecipients[0];
              const employeeId = shopRecipient?.employeeId;
              const { key, variables } = determineKeyAndVariables(notifications);
              const htmlBody = buildHtmlBody(notifications);
              notificationInserts.push({
                jobid: jobId,
                associationid: employeeId || null,
                ui_translation_string: key,
                ui_translation_meta: JSON.stringify(variables),
                html_body: htmlBody
              });
              const mapKey = `${user}:${bodyShopId}`;
              insertKeys.push(mapKey);
              notificationIdMap.set(mapKey, null);
            }
          }

          if (notificationInserts.length > 0) {
            const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, {
              objects: notificationInserts
            });
            logger.logger.info(
              `Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}`
            );
            // Assumes `returning` preserves the order of `objects` - TODO confirm for the backend in use.
            insertResponse.insert_notifications.returning.forEach((row, index) => {
              const mapKey = insertKeys[index];
              if (mapKey !== undefined) {
                notificationIdMap.set(mapKey, row.id);
              }
            });
          }

          // Emit notifications to users via Socket.io with notification ID
          for (const [user, bodyShopData] of Object.entries(allNotifications)) {
            const userMapping = await redisHelpers.getUserSocketMapping(user);
            logger.logger.debug(`User socket mapping for ${user}: ${JSON.stringify(userMapping)}`);
            for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
              const notificationId = notificationIdMap.get(`${user}:${bodyShopId}`);
              if (userMapping && userMapping[bodyShopId]?.socketIds) {
                userMapping[bodyShopId].socketIds.forEach((socketId) => {
                  logger.logger.debug(
                    `Emitting to socket ${socketId}: ${JSON.stringify({
                      jobId,
                      bodyShopId,
                      notifications,
                      notificationId
                    })}`
                  );
                  ioRedis.to(socketId).emit("notification", {
                    jobId,
                    bodyShopId,
                    notifications,
                    notificationId
                  });
                });
                logger.logger.info(
                  `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}`
                );
              } else {
                logger.logger.warn(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
              }
            }
          }

          // Clear the "pending" flag so the next notification for this job schedules a new run.
          await pubClient.del(`app:consolidate:${jobId}`);
        } catch (err) {
          logger.logger.error(`Consolidation error for jobId ${jobId}: ${err.message}`, { error: err });
          throw err;
        } finally {
          await pubClient.del(lockKey);
        }
      },
      {
        connection: pubClient,
        prefix: "{BULLMQ}",
        concurrency: 1,
        limiter: { max: 1, duration: RATE_LIMITER_DURATION }
      }
    );

    addWorker.on("completed", (job) => logger.logger.info(`Add job ${job.id} completed`));
    consolidateWorker.on("completed", (job) => logger.logger.info(`Consolidate job ${job.id} completed`));
    // BullMQ may invoke "failed" listeners without a job, so the id is read defensively.
    addWorker.on("failed", (job, err) =>
      logger.logger.error(`Add job ${job?.id} failed: ${err.message}`, { error: err })
    );
    consolidateWorker.on("failed", (job, err) =>
      logger.logger.error(`Consolidate job ${job?.id} failed: ${err.message}`, { error: err })
    );

    const shutdown = async () => {
      logger.logger.info("Closing app queue workers...");
      await Promise.all([addWorker.close(), consolidateWorker.close()]);
      logger.logger.info("App queue workers closed");
    };
    process.on("SIGTERM", shutdown);
    process.on("SIGINT", shutdown);
  }

  return addQueue;
};
/**
 * Returns the `addQueue` instance created by `loadAppQueue`.
 *
 * @returns {Queue} The `addQueue` instance for adding notifications.
 * @throws {Error} If `addQueue` is not initialized (i.e., `loadAppQueue` wasn't called).
 */
const getQueue = () => {
  if (addQueue) {
    return addQueue;
  }
  throw new Error("Add queue not initialized. Ensure loadAppQueue is called during bootstrap.");
};
// Process-local sequence appended to queue job ids so that two dispatches for the same
// job within the same millisecond still get distinct ids (BullMQ ignores duplicate ids).
let dispatchSequence = 0;
/**
 * Dispatches notifications to the `addQueue` for processing.
 *
 * Each entry is added as an "add-notification" job, one at a time and in order. The queue
 * job id is `<jobId>-<timestamp>-<sequence>`; "-" is used as separator because BullMQ
 * custom job ids must not contain ":".
 *
 * @param {Object} options - Options for dispatching notifications.
 * @param {Array} [options.appsToDispatch=[]] - Notification objects to dispatch, each with
 *   `jobId`, `bodyShopId`, `key`, `variables`, `recipients` and `body`.
 * @param {Object} options.logger - Logger wrapper; `logger.logger.info` is used.
 * @returns {Promise<void>} Resolves when all notifications are added to the queue.
 * @throws {Error} If the add queue has not been initialized by `loadAppQueue`.
 */
const dispatchAppsToQueue = async ({ appsToDispatch = [], logger }) => {
  const appQueue = getQueue();

  for (const app of appsToDispatch) {
    const { jobId, bodyShopId, key, variables, recipients, body } = app;
    dispatchSequence += 1;

    await appQueue.add(
      "add-notification",
      { jobId, bodyShopId, key, variables, recipients, body },
      { jobId: `${jobId}-${Date.now()}-${dispatchSequence}` }
    );

    // Do not fail after a successful enqueue just because `recipients` is missing.
    const recipientCount = Array.isArray(recipients) ? recipients.length : 0;
    logger.logger.info(`Added notification to queue for jobId ${jobId} with ${recipientCount} recipients`);
  }
};
module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue };