298 lines
12 KiB
JavaScript
298 lines
12 KiB
JavaScript
const { Queue, Worker } = require("bullmq");
|
|
const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries");
|
|
const { registerCleanupTask } = require("../../utils/cleanupManager");
|
|
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
|
|
const devDebugLogger = require("../../utils/devDebugLogger");
|
|
const graphQLClient = require("../../graphql-client/graphql-client").client;
|
|
|
|
/**
 * Consolidation window in whole minutes.
 *
 * Read from the APP_CONSOLIDATION_DELAY_IN_MINS environment variable. Falls back to 3 when the
 * variable is unset, empty, or not parseable as an integer, and is never lower than 1.
 */
const APP_CONSOLIDATION_DELAY_IN_MINS = (() => {
  const rawMinutes = process.env?.APP_CONSOLIDATION_DELAY_IN_MINS;
  if (!rawMinutes) {
    return 3;
  }

  const minutes = Number.parseInt(rawMinutes, 10);
  return Number.isNaN(minutes) ? 3 : Math.max(1, minutes);
})();

// Consolidation window in milliseconds; every timing below is derived from it / DO NOT TOUCH
const APP_CONSOLIDATION_DELAY = APP_CONSOLIDATION_DELAY_IN_MINS * 60000;

// Derived timings in milliseconds, expressed as multiples of APP_CONSOLIDATION_DELAY / DO NOT TOUCH
// (the example values assume the 3-minute default)
const NOTIFICATION_STORAGE_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5x base (270s): TTL of stored notifications
const CONSOLIDATION_FLAG_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5x base (270s): TTL of the "pending" flag
const LOCK_EXPIRATION = APP_CONSOLIDATION_DELAY * 0.25; // quarter of base (45s): lock TTL and retry backoff
const RATE_LIMITER_DURATION = APP_CONSOLIDATION_DELAY * 0.1; // tenth of base (18s): consolidate worker limiter window

// Lazily created by loadAppQueue: queue that receives individual notifications
let addQueue;

// Lazily created by loadAppQueue: queue that receives the delayed consolidation jobs
let consolidateQueue;
|
|
|
|
/**
 * Builds the scenario_text, fcm_text, and scenario_meta for a batch of notifications.
 *
 * @param {Array<Object>} notifications - Notification objects; `body` and `variables` are read from each.
 * @returns {Object} An object with:
 *   - `scenario_text`: array holding every notification's `body`, in input order;
 *   - `fcm_text`: the bodies joined with ". " plus a trailing ".", or null when the joined text is empty;
 *   - `scenario_meta`: array holding every notification's `variables` (an empty object when falsy).
 */
const buildNotificationContent = (notifications) => {
  const bodies = [];
  const metadata = [];

  // Single pass that collects both per-notification arrays in input order
  for (const entry of notifications) {
    bodies.push(entry.body);
    metadata.push(entry.variables || {});
  }

  const joinedBodies = bodies.join(". ");
  let pushText = null;
  if (joinedBodies) {
    pushText = `${joinedBodies}.`;
  }

  return {
    scenario_text: bodies,
    fcm_text: pushText,
    scenario_meta: metadata
  };
};
|
|
|
|
/**
 * Initializes the notification queues and workers for adding and consolidating notifications.
 *
 * On the first call this creates the "notificationsAdd" and "notificationsConsolidate" queues, starts
 * one worker for each, wires their "completed"/"failed" handlers, and registers a cleanup task that
 * closes both workers. Later calls skip the setup and return the existing add queue.
 *
 * Add worker: appends the incoming notification to a per-user Redis entry and, for the first
 * notification of a jobId, schedules one delayed consolidation job guarded by a "pending" flag.
 *
 * Consolidate worker: under a Redis lock, reads and deletes the stored per-user notifications,
 * inserts one row per user/bodyShopId through GraphQL, and emits a "notification" event to every
 * socket id mapped to that user and bodyShopId.
 *
 * @param {Object} deps
 * @param {Object} deps.pubClient - Redis client; used as the BullMQ connection and for get/set/del.
 * @param {Object} deps.logger - Logger whose `log` method receives worker errors.
 * @param {Object} deps.redisHelpers - Provides `getUserSocketMapping(user)`.
 * @param {Object} deps.ioRedis - Emitter exposing `to(socketId).emit(event, payload)`.
 * @returns {Promise<Object>} The add queue instance.
 */
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
  if (!addQueue || !consolidateQueue) {
    const prefix = getBullMQPrefix();
    const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";

    devDebugLogger(`Initializing Notifications Queues with prefix: ${prefix}`);

    addQueue = new Queue("notificationsAdd", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    consolidateQueue = new Queue("notificationsConsolidate", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    const addWorker = new Worker(
      "notificationsAdd",
      async (job) => {
        const { jobId, key, variables, recipients, body, jobRoNumber } = job.data;
        devDebugLogger(`Adding notifications for jobId ${jobId}`);

        const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`;
        const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() };

        // Append the notification to each recipient's stored list (JSON array kept under one key)
        for (const recipient of recipients) {
          const { user } = recipient;
          const userKey = `${redisKeyPrefix}:${user}`;
          const existingNotifications = await pubClient.get(userKey);
          const notifications = existingNotifications ? JSON.parse(existingNotifications) : [];
          notifications.push(notification);
          await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000);
          devDebugLogger(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
        }

        // Create the "pending" flag and its TTL in one atomic command. Setting the TTL in a separate
        // step after scheduling would leave a flag that never expires if scheduling fails in between,
        // which would block every later consolidation for this jobId.
        const consolidateKey = `app:${devKey}:consolidate:${jobId}`;
        const flagSet = Boolean(
          await pubClient.set(consolidateKey, "pending", "NX", "EX", CONSOLIDATION_FLAG_EXPIRATION / 1000)
        );
        devDebugLogger(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);

        if (flagSet) {
          await consolidateQueue.add(
            "consolidate-notifications",
            { jobId, recipients },
            {
              jobId: `consolidate:${jobId}`,
              delay: APP_CONSOLIDATION_DELAY,
              attempts: 3,
              backoff: LOCK_EXPIRATION
            }
          );
          devDebugLogger(`Scheduled consolidation for jobId ${jobId}`);
        } else {
          devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`);
        }
      },
      {
        prefix,
        connection: pubClient,
        concurrency: 5
      }
    );

    const consolidateWorker = new Worker(
      "notificationsConsolidate",
      async (job) => {
        const { jobId, recipients } = job.data;
        devDebugLogger(`Consolidating notifications for jobId ${jobId}`);

        const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`;
        const lockKey = `lock:${devKey}:consolidate:${jobId}`;

        const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
        devDebugLogger(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);

        if (lockAcquired) {
          try {
            // Shape: { [user]: { [bodyShopId]: Array<notification> } }
            const allNotifications = {};
            const uniqueUsers = [...new Set(recipients.map((r) => r.user))];
            devDebugLogger(`Unique users for jobId ${jobId}: ${uniqueUsers}`);

            for (const user of uniqueUsers) {
              const userKey = `${redisKeyPrefix}:${user}`;
              const notifications = await pubClient.get(userKey);
              devDebugLogger(`Retrieved notifications for ${user}: ${notifications}`);

              if (notifications) {
                const parsedNotifications = JSON.parse(notifications);
                const userRecipients = recipients.filter((r) => r.user === user);
                for (const { bodyShopId } of userRecipients) {
                  allNotifications[user] = allNotifications[user] || {};
                  allNotifications[user][bodyShopId] = parsedNotifications;
                }
                await pubClient.del(userKey);
                devDebugLogger(`Deleted Redis key ${userKey}`);
              } else {
                devDebugLogger(`No notifications found for ${user} under ${userKey}`);
              }
            }

            devDebugLogger(`Consolidated notifications: ${JSON.stringify(allNotifications)}`);

            // Insert notifications into the database and collect IDs
            const notificationInserts = [];
            // insertKeys[i] is the `${user}:${bodyShopId}` key of notificationInserts[i]
            const insertKeys = [];
            const notificationIdMap = new Map();

            for (const [user, bodyShopData] of Object.entries(allNotifications)) {
              const userRecipients = recipients.filter((r) => r.user === user);
              // NOTE(review): the first recipient entry's associationId is used for every bodyShopId
              // of this user - confirm that associations are per user rather than per shop.
              const associationId = userRecipients[0]?.associationId;

              for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
                const { scenario_text, fcm_text, scenario_meta } = buildNotificationContent(notifications);
                const insertKey = `${user}:${bodyShopId}`;
                notificationInserts.push({
                  jobid: jobId,
                  associationid: associationId,
                  scenario_text: JSON.stringify(scenario_text),
                  fcm_text: fcm_text,
                  scenario_meta: JSON.stringify(scenario_meta)
                });
                insertKeys.push(insertKey);
                notificationIdMap.set(insertKey, null);
              }
            }

            if (notificationInserts.length > 0) {
              const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, {
                objects: notificationInserts
              });
              devDebugLogger(
                `Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}`
              );

              // Returned rows are matched to the inserts by position. Deriving the key from
              // uniqueUsers would break for users without stored notifications and for users
              // that have a different number of bodyShopIds.
              insertResponse.insert_notifications.returning.forEach((row, index) => {
                const insertKey = insertKeys[index];
                if (insertKey !== undefined) {
                  notificationIdMap.set(insertKey, row.id);
                }
              });
            }

            // Emit notifications to users via Socket.io with notification ID
            for (const [user, bodyShopData] of Object.entries(allNotifications)) {
              const userMapping = await redisHelpers.getUserSocketMapping(user);
              const userRecipients = recipients.filter((r) => r.user === user);
              const associationId = userRecipients[0]?.associationId;

              for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
                const notificationId = notificationIdMap.get(`${user}:${bodyShopId}`);
                const jobRoNumber = notifications[0]?.jobRoNumber;

                if (userMapping && userMapping[bodyShopId]?.socketIds) {
                  userMapping[bodyShopId].socketIds.forEach((socketId) => {
                    ioRedis.to(socketId).emit("notification", {
                      jobId,
                      jobRoNumber,
                      bodyShopId,
                      notifications,
                      notificationId,
                      associationId
                    });
                  });
                  devDebugLogger(
                    `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}`
                  );
                } else {
                  devDebugLogger(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
                }
              }
            }

            // Clear the "pending" flag so the next notification for this jobId schedules a new run
            await pubClient.del(`app:${devKey}:consolidate:${jobId}`);
          } catch (err) {
            logger.log(`app-queue-consolidation-error`, "ERROR", "notifications", "api", {
              message: err?.message,
              stack: err?.stack
            });
            // Rethrow so the queue sees the job as failed (it is scheduled with attempts: 3)
            throw err;
          } finally {
            await pubClient.del(lockKey);
          }
        } else {
          devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
        }
      },
      {
        prefix,
        connection: pubClient,
        concurrency: 1,
        limiter: { max: 1, duration: RATE_LIMITER_DURATION }
      }
    );

    addWorker.on("completed", (job) => devDebugLogger(`Add job ${job.id} completed`));
    consolidateWorker.on("completed", (job) => devDebugLogger(`Consolidate job ${job.id} completed`));

    addWorker.on("failed", (job, err) =>
      logger.log(`app-queue-notification-error`, "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );
    consolidateWorker.on("failed", (job, err) =>
      logger.log(`app-queue-consolidation-failed:`, "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );

    // Register cleanup task instead of direct process listeners
    const shutdown = async () => {
      devDebugLogger("Closing app queue workers...");
      await Promise.all([addWorker.close(), consolidateWorker.close()]);
      devDebugLogger("App queue workers closed");
    };

    registerCleanupTask(shutdown);
  }

  return addQueue;
};
|
|
|
|
/**
 * Retrieves the initialized `addQueue` instance.
 *
 * @returns {Object} The add queue created by `loadAppQueue`.
 * @throws {Error} When the add queue has not been created yet.
 */
const getQueue = () => {
  if (addQueue) {
    return addQueue;
  }

  throw new Error("Add queue not initialized. Ensure loadAppQueue is called during bootstrap.");
};
|
|
|
|
// Per-process sequence appended to queue job ids. A millisecond timestamp alone is not unique when
// several notifications for the same jobId are dispatched back to back, and BullMQ ignores an add
// whose job id already exists in the queue, so a colliding notification would be dropped.
let dispatchSequence = 0;

/**
 * Dispatches notifications to the `addQueue` for processing.
 *
 * Each entry is added sequentially as an "add-notification" job whose queue job id is
 * `${jobId}:${timestamp}-${sequence}`.
 *
 * @param {Object} params
 * @param {Array<Object>} params.appsToDispatch - Entries with `jobId`, `bodyShopId`, `key`,
 *   `variables`, `recipients`, `body` and `jobRoNumber`.
 * @returns {Promise<void>}
 * @throws {Error} When the add queue has not been initialized (raised by `getQueue`).
 */
const dispatchAppsToQueue = async ({ appsToDispatch }) => {
  const appQueue = getQueue();

  for (const app of appsToDispatch) {
    const { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber } = app;
    const uniqueSuffix = `${Date.now()}-${dispatchSequence++}`;

    await appQueue.add(
      "add-notification",
      { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber },
      { jobId: `${jobId}:${uniqueSuffix}` }
    );
    devDebugLogger(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
  }
};
|
|
|
|
// Public API: queue/worker bootstrap, accessor for the add queue, and the dispatch helper.
module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue };
|