// Notification queue module: BullMQ add/consolidate queues, Redis-backed
// per-job notification batching, Hasura insert, and Socket.io fan-out.
const { Queue, Worker } = require("bullmq");
|
||
const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries");
|
||
const { registerCleanupTask } = require("../../utils/cleanupManager");
|
||
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
|
||
const devDebugLogger = require("../../utils/devDebugLogger");
|
||
const graphQLClient = require("../../graphql-client/graphql-client").client;
|
||
|
||
// Base consolidation delay in minutes, sourced from the environment with a
// default of 3 and a hard floor of 1.
const APP_CONSOLIDATION_DELAY_IN_MINS = (() => {
  const envValue = process.env?.APP_CONSOLIDATION_DELAY_IN_MINS;
  const parsedValue = envValue ? Number.parseInt(envValue, 10) : NaN;
  return Number.isNaN(parsedValue) ? 3 : Math.max(1, parsedValue); // Default to 3, ensure at least 1
})();

// Base time-related constant (in milliseconds) / DO NOT TOUCH
const APP_CONSOLIDATION_DELAY = APP_CONSOLIDATION_DELAY_IN_MINS * 60000; // base timeout (default 3 minutes)

// Derived time-related constants based on APP_CONSOLIDATION_DELAY / DO NOT TOUCH
const NOTIFICATION_STORAGE_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5x base delay
const CONSOLIDATION_FLAG_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5x base delay
const LOCK_EXPIRATION = APP_CONSOLIDATION_DELAY * 0.25; // quarter of base delay
const RATE_LIMITER_DURATION = APP_CONSOLIDATION_DELAY * 0.1; // tenth of base delay
|
||
|
||
// Module-level singletons; both are created lazily by loadAppQueue() on its
// first call and reused on every subsequent call.
let addQueue;
let consolidateQueue;
|
||
|
||
/**
 * Assembles the per-recipient notification payload pieces from a batch of
 * queued notifications.
 *
 * @param {Array<Object>} notifications - Items carrying `body` (string) and optional `variables`.
 * @returns {{scenario_text: string[], fcm_text: (string|null), scenario_meta: Object[]}}
 *   `fcm_text` is the bodies joined by ". " with a trailing period, or null when the
 *   joined text is empty.
 */
const buildNotificationContent = (notifications) => {
  const bodies = [];
  const meta = [];

  // Single pass: collect the text entries and their metadata side by side.
  for (const item of notifications) {
    bodies.push(item.body);
    meta.push(item.variables || {});
  }

  const joined = bodies.join(". ");

  return {
    scenario_text: bodies,
    fcm_text: joined ? `${joined}.` : null,
    scenario_meta: meta
  };
};
|
||
|
||
/**
 * Converts a millisecond duration to whole seconds for Redis TTLs,
 * rounding up and never returning less than 1.
 *
 * @param {number} ms - Duration in milliseconds.
 * @returns {number} Ceiling of ms / 1000, floored at 1.
 */
const seconds = (ms) => {
  const wholeSeconds = Math.ceil(ms / 1000);
  return Math.max(1, wholeSeconds);
};
|
||
|
||
/**
 * Initializes the notification queues and workers for adding and consolidating
 * notifications. Idempotent: queues/workers are created only on the first call
 * (guarded by the module-level `addQueue`/`consolidateQueue` singletons).
 *
 * Flow: the "add" worker buffers each notification per recipient in Redis lists
 * and schedules a single delayed "consolidate" job per jobId; the "consolidate"
 * worker batches the buffered notifications, inserts them via GraphQL, emits
 * them over Socket.io, and cleans up the tracking keys.
 *
 * @param {Object} deps
 * @param {Object} deps.pubClient - ioredis connection used both as the BullMQ connection and for raw Redis commands.
 * @param {Object} deps.logger - Structured logger with a log(tag, level, domain, source, payload) method.
 * @param {Object} deps.redisHelpers - Provides getUserSocketMapping(user) => { [bodyShopId]: { socketIds } }.
 * @param {Object} deps.ioRedis - Socket.io server used to emit "notification" events to individual sockets.
 * @returns {Promise<Object>} The BullMQ `addQueue` producer queue.
 */
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
  if (!addQueue || !consolidateQueue) {
    const prefix = getBullMQPrefix();
    // Namespace all raw Redis keys by environment so prod and dev never collide.
    const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";

    devDebugLogger(`Initializing Notifications Queues with prefix: ${prefix}`);

    // Redis key helpers (per jobId)
    const recipientsSetKey = (jobId) => `app:${devKey}:recipients:${jobId}`; // set of `${user}:${bodyShopId}`
    const recipientAssocHashKey = (jobId) => `app:${devKey}:recipientAssoc:${jobId}`; // hash `${user}:${bodyShopId}` => associationId
    const consolidateFlagKey = (jobId) => `app:${devKey}:consolidate:${jobId}`; // "pending" once a consolidate job is scheduled
    const lockKeyForJob = (jobId) => `lock:${devKey}:consolidate:${jobId}`; // mutex so only one worker consolidates a jobId
    const listKey = ({ jobId, user, bodyShopId }) => `app:${devKey}:notifications:${jobId}:${user}:${bodyShopId}`; // buffered payloads

    // Producer queue fed by dispatchAppsToQueue().
    addQueue = new Queue("notificationsAdd", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    // Delayed queue holding one consolidate job per jobId.
    consolidateQueue = new Queue("notificationsConsolidate", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    const addWorker = new Worker(
      "notificationsAdd",
      async (job) => {
        const { jobId, key, variables, recipients, body, jobRoNumber } = job.data;
        devDebugLogger(`Adding notifications for jobId ${jobId}`);

        const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() };

        // Store notifications atomically (RPUSH) and store recipients in a Redis set
        for (const recipient of recipients || []) {
          const { user, bodyShopId, associationId } = recipient;
          // Both parts of the recipient key are required; skip malformed entries.
          if (!user || !bodyShopId) continue;

          const rk = `${user}:${bodyShopId}`;

          // (1) Store notification payload in a list (atomic append)
          const lk = listKey({ jobId, user, bodyShopId });
          await pubClient.rpush(lk, JSON.stringify(notification));
          await pubClient.expire(lk, seconds(NOTIFICATION_STORAGE_EXPIRATION));

          // (2) Track recipients in a set, and associationId in a hash
          await pubClient.sadd(recipientsSetKey(jobId), rk);
          await pubClient.expire(recipientsSetKey(jobId), seconds(NOTIFICATION_STORAGE_EXPIRATION));

          if (associationId) {
            await pubClient.hset(recipientAssocHashKey(jobId), rk, String(associationId));
          }
          // Refresh the hash TTL even when this recipient had no associationId,
          // so earlier entries do not expire mid-batch.
          await pubClient.expire(recipientAssocHashKey(jobId), seconds(NOTIFICATION_STORAGE_EXPIRATION));
        }

        // Schedule consolidation once per jobId
        // NOTE(review): setnx and the expire below are not atomic — if the
        // process dies between them the flag has no TTL; consider SET NX EX.
        const flagKey = consolidateFlagKey(jobId);
        const flagSet = await pubClient.setnx(flagKey, "pending");
        devDebugLogger(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);

        if (flagSet) {
          await consolidateQueue.add(
            "consolidate-notifications",
            { jobId },
            {
              // Deterministic job id doubles as BullMQ-level dedupe per jobId.
              jobId: `consolidate-${jobId}`,
              delay: APP_CONSOLIDATION_DELAY,
              attempts: 3,
              // Numeric backoff = fixed retry delay in ms (BullMQ shorthand).
              backoff: LOCK_EXPIRATION
            }
          );

          await pubClient.expire(flagKey, seconds(CONSOLIDATION_FLAG_EXPIRATION));
          devDebugLogger(`Scheduled consolidation for jobId ${jobId}`);
        } else {
          devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`);
        }
      },
      {
        prefix,
        connection: pubClient,
        concurrency: 5
      }
    );

    const consolidateWorker = new Worker(
      "notificationsConsolidate",
      async (job) => {
        const { jobId } = job.data;
        devDebugLogger(`Consolidating notifications for jobId ${jobId}`);

        // Acquire a short-lived mutex (SET NX EX) so concurrent workers/retries
        // never consolidate the same jobId twice at once.
        const lockKey = lockKeyForJob(jobId);
        const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", seconds(LOCK_EXPIRATION));
        devDebugLogger(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);

        if (!lockAcquired) {
          devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
          return;
        }

        try {
          const rkSet = recipientsSetKey(jobId);
          const assocHash = recipientAssocHashKey(jobId);

          const recipientKeys = await pubClient.smembers(rkSet);
          if (!recipientKeys?.length) {
            devDebugLogger(`No recipients found for jobId ${jobId}, nothing to consolidate.`);
            // Clear the flag so a later add for this jobId can reschedule.
            await pubClient.del(consolidateFlagKey(jobId));
            return;
          }

          const assocMap = await pubClient.hgetall(assocHash);

          // Collect notifications by recipientKey
          const notificationsByRecipient = new Map(); // rk => parsed notifications array
          const listKeysToDelete = []; // delete only after successful insert+emit

          for (const rk of recipientKeys) {
            // NOTE(review): assumes user/bodyShopId values never contain ":" —
            // split(":") would mis-parse them otherwise; verify upstream.
            const [user, bodyShopId] = rk.split(":");
            const lk = listKey({ jobId, user, bodyShopId });

            const items = await pubClient.lrange(lk, 0, -1);
            if (!items?.length) continue;

            // Drop any entries that fail to parse rather than aborting the batch.
            const parsed = items
              .map((x) => {
                try {
                  return JSON.parse(x);
                } catch {
                  return null;
                }
              })
              .filter(Boolean);

            if (parsed.length) {
              notificationsByRecipient.set(rk, parsed);

              // IMPORTANT: do NOT delete list yet; only delete after successful insert+emit
              listKeysToDelete.push(lk);
            }
          }

          if (!notificationsByRecipient.size) {
            devDebugLogger(`No notifications found in lists for jobId ${jobId}, nothing to insert/emit.`);
            if (listKeysToDelete.length) {
              await pubClient.del(...listKeysToDelete);
            }
            await pubClient.del(rkSet);
            await pubClient.del(assocHash);
            await pubClient.del(consolidateFlagKey(jobId));
            return;
          }

          // Build DB inserts
          const inserts = [];
          const insertMeta = []; // keep rk + associationId to emit after insert

          for (const [rk, notifications] of notificationsByRecipient.entries()) {
            const associationId = assocMap?.[rk];

            // If your DB requires associationid NOT NULL, skip if missing
            if (!associationId) {
              devDebugLogger(`Skipping insert for ${rk} (missing associationId).`);
              continue;
            }

            const { scenario_text, fcm_text, scenario_meta } = buildNotificationContent(notifications);

            inserts.push({
              jobid: jobId,
              associationid: associationId,
              // NOTE: if these are jsonb columns, remove JSON.stringify and pass arrays directly.
              scenario_text: JSON.stringify(scenario_text),
              fcm_text,
              scenario_meta: JSON.stringify(scenario_meta)
            });

            insertMeta.push({ rk, associationId });
          }

          // Map notificationId by associationId from Hasura returning rows
          const idByAssociationId = new Map();

          if (inserts.length > 0) {
            const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, { objects: inserts });

            const returning = insertResponse?.insert_notifications?.returning || [];
            returning.forEach((row) => {
              // Expecting the mutation to return associationid as well as id.
              // If it currently does not return associationid, update it.
              if (row?.associationid) idByAssociationId.set(String(row.associationid), row.id);
            });

            devDebugLogger(
              `Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}`
            );
          }

          // Emit via Socket.io
          // Group by user to reduce mapping lookups
          const uniqueUsers = [...new Set(insertMeta.map(({ rk }) => rk.split(":")[0]))];

          for (const user of uniqueUsers) {
            const userMapping = await redisHelpers.getUserSocketMapping(user);
            const entriesForUser = insertMeta
              .map((m) => ({ ...m, user: m.rk.split(":")[0], bodyShopId: m.rk.split(":")[1] }))
              .filter((m) => m.user === user);

            for (const entry of entriesForUser) {
              const { rk, bodyShopId, associationId } = entry;
              const notifications = notificationsByRecipient.get(rk) || [];
              if (!notifications.length) continue;

              // All notifications in a batch share the job, so take the RO
              // number from the first entry.
              const jobRoNumber = notifications[0]?.jobRoNumber;
              const notificationId = idByAssociationId.get(String(associationId)) || null;

              if (userMapping && userMapping[bodyShopId]?.socketIds) {
                userMapping[bodyShopId].socketIds.forEach((socketId) => {
                  ioRedis.to(socketId).emit("notification", {
                    jobId,
                    jobRoNumber,
                    bodyShopId,
                    notifications,
                    notificationId,
                    associationId
                  });
                });

                devDebugLogger(
                  `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} (notificationId ${notificationId})`
                );
              } else {
                devDebugLogger(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
              }
            }
          }

          // Cleanup recipient tracking keys + consolidation flag
          await pubClient.del(rkSet);
          await pubClient.del(assocHash);
          await pubClient.del(consolidateFlagKey(jobId));
        } catch (err) {
          logger.log("app-queue-consolidation-error", "ERROR", "notifications", "api", {
            message: err?.message,
            stack: err?.stack
          });
          // Rethrow so BullMQ counts the attempt and retries (attempts: 3).
          throw err;
        } finally {
          // Always release the mutex, even on failure, so retries can proceed.
          await pubClient.del(lockKey);
        }
      },
      {
        prefix,
        connection: pubClient,
        concurrency: 1,
        limiter: { max: 1, duration: RATE_LIMITER_DURATION }
      }
    );

    addWorker.on("completed", (job) => devDebugLogger(`Add job ${job.id} completed`));
    consolidateWorker.on("completed", (job) => devDebugLogger(`Consolidate job ${job.id} completed`));

    addWorker.on("failed", (job, err) =>
      logger.log("app-queue-notification-error", "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );

    consolidateWorker.on("failed", (job, err) =>
      logger.log("app-queue-consolidation-failed", "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );

    // Register cleanup task instead of direct process listeners
    const shutdown = async () => {
      devDebugLogger("Closing app queue workers...");
      await Promise.all([addWorker.close(), consolidateWorker.close()]);
      devDebugLogger("App queue workers closed");
    };

    registerCleanupTask(shutdown);
  }

  return addQueue;
};
|
||
|
||
/**
 * Retrieves the initialized `addQueue` instance.
 *
 * @returns {Object} The BullMQ add queue created by loadAppQueue.
 * @throws {Error} When loadAppQueue has not yet initialized the queue.
 */
const getQueue = () => {
  if (addQueue) {
    return addQueue;
  }
  throw new Error("Add queue not initialized. Ensure loadAppQueue is called during bootstrap.");
};
|
||
|
||
/**
 * Dispatches notifications to the `addQueue` for processing.
 *
 * @param {Object} args
 * @param {Array<Object>} args.appsToDispatch - Apps carrying jobId, bodyShopId,
 *   key, variables, recipients, body, and jobRoNumber.
 * @returns {Promise<void>}
 * @throws {Error} If the add queue has not been initialized via loadAppQueue.
 */
const dispatchAppsToQueue = async ({ appsToDispatch }) => {
  const appQueue = getQueue();

  for (const [index, app] of appsToDispatch.entries()) {
    const { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber } = app;

    await appQueue.add(
      "add-notification",
      { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber },
      // Include the loop index so two apps with the same jobId dispatched within
      // the same millisecond get distinct BullMQ job ids (BullMQ silently drops
      // an add whose custom jobId already exists).
      { jobId: `${jobId}-${Date.now()}-${index}` }
    );

    // Guard the log: recipients may be absent, and the add worker already
    // tolerates that (`recipients || []`).
    devDebugLogger(`Added notification to queue for jobId ${jobId} with ${recipients?.length ?? 0} recipients`);
  }
};
|
||
|
||
// Public API: bootstrap (loadAppQueue), accessor (getQueue), producer (dispatchAppsToQueue).
module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue };
|