247 lines
10 KiB
JavaScript
247 lines
10 KiB
JavaScript
const { randomUUID } = require("crypto");

const { Queue, Worker } = require("bullmq");

// Base time-related constant (in milliseconds)
const CONSOLIDATION_DELAY = 60000; // 1 minute (base timeout); also the delay of the scheduled consolidation job

// Derived time-related constants based on CONSOLIDATION_DELAY / DO NOT TOUCH, these are pegged to CONSOLIDATION_DELAY
// All values are in milliseconds; Redis TTLs are converted to seconds (value / 1000) at the call sites.
const NOTIFICATION_STORAGE_EXPIRATION = CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (90s, TTL of the per-user notification keys)
const CONSOLIDATION_FLAG_EXPIRATION = CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (90s, TTL of the "consolidation pending" flag)
const LOCK_EXPIRATION = CONSOLIDATION_DELAY * 0.25; // 15 seconds (consolidation lock TTL, also used as the retry backoff)
const RATE_LIMITER_DURATION = CONSOLIDATION_DELAY * 0.1; // 6 seconds (rate limiter window of the consolidation worker)

// Module-level queue singletons; both stay undefined until loadAppQueue() has run once.
let addQueue;
let consolidateQueue;

/**
 * Initializes the notification queues and workers for adding and consolidating notifications.
 *
 * The first call creates both BullMQ queues, starts one worker per queue, attaches logging
 * listeners and registers SIGTERM/SIGINT handlers that close the workers. Any later call
 * skips initialization and simply returns the already created add queue.
 *
 * @param {Object} options - Configuration options for queue initialization.
 * @param {Object} options.pubClient - Redis client used both as the BullMQ connection and for notification storage.
 * @param {Object} options.logger - Wrapper whose `logger` property is the actual logger (info/debug/warn/error).
 * @param {Object} options.redisHelpers - Helper object; must provide `getUserSocketMapping(user)`.
 * @param {Object} options.ioRedis - Emitter exposing `to(socketId).emit(event, payload)` (presumably the Socket.IO server - confirm).
 * @returns {Promise<Queue>} Resolves with the `addQueue` instance used for dispatching notifications.
 */
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
  // Only initialize if queues don't already exist
  if (!addQueue || !consolidateQueue) {
    logger.logger.info("Initializing Notifications Queues");

    // Create queue for adding notifications
    addQueue = new Queue("notificationsAdd", {
      connection: pubClient,
      prefix: "{BULLMQ}",
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    // Create queue for consolidating notifications
    consolidateQueue = new Queue("notificationsConsolidate", {
      connection: pubClient,
      prefix: "{BULLMQ}",
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    // Worker to process jobs from the addQueue
    const addWorker = new Worker(
      "notificationsAdd",
      async (job) => {
        const { jobId, key, variables, recipients } = job.data;
        logger.logger.info(`Adding notifications for jobId ${jobId}`);

        const redisKeyPrefix = `app:notifications:${jobId}`;
        const notification = { key, variables, timestamp: Date.now() };

        // Store notification for each recipient in Redis.
        // NOTE(review): GET followed by SET is not atomic. With concurrency 5, two add jobs for the
        // same jobId/user can read the same array and overwrite each other's append.
        for (const recipient of recipients) {
          const { user } = recipient;
          const userKey = `${redisKeyPrefix}:${user}`;
          const existingNotifications = await pubClient.get(userKey);
          const notifications = existingNotifications ? JSON.parse(existingNotifications) : [];
          notifications.push(notification);
          // Set with expiration to avoid stale data
          await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000); // Convert to seconds
          logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
        }

        // Create the "consolidation pending" flag and its TTL in one atomic command, so a crash or a
        // scheduling failure can never leave behind a flag without expiration (which would block
        // every future consolidation for this jobId).
        const consolidateKey = `app:consolidate:${jobId}`;
        const flagSet = await pubClient.set(
          consolidateKey,
          "pending",
          "NX",
          "EX",
          CONSOLIDATION_FLAG_EXPIRATION / 1000 // Convert to seconds
        );
        logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);

        if (flagSet) {
          try {
            // Schedule consolidation job with delay and retries.
            // NOTE(review): only the recipients of this first add job are handed to the consolidation
            // job; recipients that appear only in later add jobs are stored but not consolidated.
            await consolidateQueue.add(
              "consolidate-notifications",
              { jobId, recipients },
              {
                jobId: `consolidate:${jobId}`,
                delay: CONSOLIDATION_DELAY,
                attempts: 3, // Retry up to 3 times
                backoff: LOCK_EXPIRATION // Retry delay matches lock expiration (15s)
              }
            );
          } catch (err) {
            // Roll the flag back so the next add job for this jobId can schedule the consolidation.
            await pubClient.del(consolidateKey);
            throw err;
          }
          logger.logger.info(`Scheduled consolidation for jobId ${jobId}`);
        } else {
          logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`);
        }
      },
      {
        connection: pubClient,
        prefix: "{BULLMQ}",
        concurrency: 5
      }
    );

    // Worker to process jobs from the consolidateQueue
    const consolidateWorker = new Worker(
      "notificationsConsolidate",
      async (job) => {
        const { jobId, recipients } = job.data;
        logger.logger.info(`Consolidating notifications for jobId ${jobId}`);

        const redisKeyPrefix = `app:notifications:${jobId}`;
        const lockKey = `lock:consolidate:${jobId}`;
        // Acquire a lock to prevent concurrent consolidation
        const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); // Convert to seconds
        logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);

        if (lockAcquired) {
          try {
            const allNotifications = {};
            const uniqueUsers = [...new Set(recipients.map((r) => r.user))];
            logger.logger.debug(`Unique users for jobId ${jobId}: ${uniqueUsers}`);

            // Retrieve and structure notifications by user and bodyShopId.
            // NOTE(review): the stored notifications are deleted before they are emitted, so a retry
            // after a failure further down finds nothing left to send for these users.
            for (const user of uniqueUsers) {
              const userKey = `${redisKeyPrefix}:${user}`;
              const notifications = await pubClient.get(userKey);
              logger.logger.debug(`Retrieved notifications for ${user}: ${notifications}`);

              if (notifications) {
                const parsedNotifications = JSON.parse(notifications);
                const userRecipients = recipients.filter((r) => r.user === user);
                // The same notification list is attached to every bodyShopId the user is a recipient for.
                for (const { bodyShopId } of userRecipients) {
                  allNotifications[user] = allNotifications[user] || {};
                  allNotifications[user][bodyShopId] = parsedNotifications;
                }
                await pubClient.del(userKey);
                logger.logger.debug(`Deleted Redis key ${userKey}`);
              } else {
                logger.logger.warn(`No notifications found for ${user} under ${userKey}`);
              }
            }

            logger.logger.debug(`Consolidated notifications: ${JSON.stringify(allNotifications)}`);

            // Emit notifications to users via Socket.io
            for (const [user, bodyShopData] of Object.entries(allNotifications)) {
              const userMapping = await redisHelpers.getUserSocketMapping(user);
              logger.logger.debug(`User socket mapping for ${user}: ${JSON.stringify(userMapping)}`);

              for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
                if (userMapping && userMapping[bodyShopId]?.socketIds) {
                  userMapping[bodyShopId].socketIds.forEach((socketId) => {
                    logger.logger.debug(
                      `Emitting to socket ${socketId}: ${JSON.stringify({
                        jobId,
                        bodyShopId,
                        notifications
                      })}`
                    );
                    ioRedis.to(socketId).emit("notification", {
                      jobId,
                      bodyShopId,
                      notifications
                    });
                  });
                  logger.logger.info(
                    `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId}`
                  );
                } else {
                  logger.logger.warn(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
                }
              }
            }

            // Clear the flag so the next add job for this jobId schedules a fresh consolidation.
            await pubClient.del(`app:consolidate:${jobId}`);
          } catch (err) {
            logger.logger.error(`Consolidation error for jobId ${jobId}: ${err.message}`, { error: err });
            throw err; // Trigger retry if attempts remain
          } finally {
            // Always release the lock, whether consolidation succeeded or failed.
            await pubClient.del(lockKey);
          }
        } else {
          logger.logger.info(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
        }
      },
      {
        connection: pubClient,
        prefix: "{BULLMQ}",
        concurrency: 1,
        limiter: { max: 1, duration: RATE_LIMITER_DURATION }
      }
    );

    // Log worker completion events
    addWorker.on("completed", (job) => logger.logger.info(`Add job ${job.id} completed`));
    consolidateWorker.on("completed", (job) => logger.logger.info(`Consolidate job ${job.id} completed`));

    // Log worker failure events with error details.
    // BullMQ may call "failed" listeners without a job object, so the id is read defensively.
    addWorker.on("failed", (job, err) =>
      logger.logger.error(`Add job ${job?.id ?? "unknown"} failed: ${err.message}`, { error: err })
    );

    consolidateWorker.on("failed", (job, err) =>
      logger.logger.error(`Consolidate job ${job?.id ?? "unknown"} failed: ${err.message}`, { error: err })
    );

    // Worker-level errors (e.g. connection problems) are emitted as "error"; BullMQ recommends
    // always attaching a listener so they are logged instead of surfacing as unhandled errors.
    addWorker.on("error", (err) => logger.logger.error(`Add worker error: ${err.message}`, { error: err }));
    consolidateWorker.on("error", (err) =>
      logger.logger.error(`Consolidate worker error: ${err.message}`, { error: err })
    );

    // Graceful shutdown handler for workers
    const shutdown = async () => {
      logger.logger.info("Closing app queue workers...");
      await Promise.all([addWorker.close(), consolidateWorker.close()]);
      logger.logger.info("App queue workers closed");
    };

    process.on("SIGTERM", shutdown);
    process.on("SIGINT", shutdown);
  }

  return addQueue;
};

/**
 * Retrieves the initialized `addQueue` instance.
 *
 * @returns {Queue} The `addQueue` instance for adding notifications.
 * @throws {Error} If `addQueue` is not initialized (i.e., `loadAppQueue` wasn't called).
 */
const getQueue = () => {
  if (addQueue) {
    return addQueue;
  }
  throw new Error("Add queue not initialized. Ensure loadAppQueue is called during bootstrap.");
};

/**
 * Dispatches notifications to the `addQueue` for processing.
 *
 * Notifications are added one at a time, in the order given. Each queue job gets a unique id
 * built from the notification's `jobId`, the current timestamp and a random UUID.
 *
 * @param {Object} options - Options for dispatching notifications.
 * @param {Array} options.appsToDispatch - Array of notification objects
 *   (`{ jobId, bodyShopId, key, variables, recipients }`) to dispatch.
 * @param {Object} options.logger - Wrapper whose `logger` property is used for logging dispatch events.
 * @returns {Promise<void>} Resolves when all notifications are added to the queue.
 * @throws {Error} If the add queue has not been initialized (see `getQueue`).
 */
const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => {
  const appQueue = getQueue();

  for (const app of appsToDispatch) {
    const { jobId, bodyShopId, key, variables, recipients } = app;

    // A timestamp alone is not unique: two notifications for the same jobId added within the same
    // millisecond would share an id, and BullMQ ignores an add whose job id already exists.
    // The random suffix guarantees every notification is actually enqueued.
    const queueJobId = `${jobId}:${Date.now()}-${randomUUID()}`;

    await appQueue.add(
      "add-notification",
      { jobId, bodyShopId, key, variables, recipients },
      { jobId: queueJobId }
    );
    logger.logger.info(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
  }
};

// Public API of this module: queue bootstrap, accessor for the add queue, and the dispatch helper.
module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue };
|