233 lines
9.9 KiB
JavaScript
233 lines
9.9 KiB
JavaScript
const { Queue, Worker } = require("bullmq");
|
|
const { sendTaskEmail } = require("../../email/sendemail");
|
|
const generateEmailTemplate = require("../../email/generateTemplate");
|
|
const { InstanceEndpoints } = require("../../utils/instanceMgr");
|
|
const { registerCleanupTask } = require("../../utils/cleanupManager");
|
|
|
|
const EMAIL_CONSOLIDATION_DELAY_IN_MINS = (() => {
|
|
const envValue = process.env?.APP_CONSOLIDATION_DELAY_IN_MINS;
|
|
const parsedValue = envValue ? parseInt(envValue, 10) : NaN;
|
|
return isNaN(parsedValue) ? 1 : Math.max(1, parsedValue); // Default to 1, ensure at least 1
|
|
})();
|
|
|
|
// Base time-related constant (in milliseconds) / DO NOT TOUCH
|
|
const EMAIL_CONSOLIDATION_DELAY = EMAIL_CONSOLIDATION_DELAY_IN_MINS * 60000; // 1 minute (base timeout)
|
|
|
|
// Derived time-related constants based on EMAIL_CONSOLIDATION_DELAY / DO NOT TOUCH, these are pegged to EMAIL_CONSOLIDATION_DELAY
|
|
const CONSOLIDATION_KEY_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (90s, buffer for consolidation)
|
|
const LOCK_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 0.25; // 15 seconds (quarter of base, for lock duration)
|
|
const RATE_LIMITER_DURATION = EMAIL_CONSOLIDATION_DELAY * 0.1; // 6 seconds (tenth of base, for rate limiting)
|
|
const NOTIFICATION_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (matches consolidation key expiration)
|
|
|
|
let emailAddQueue;
|
|
let emailConsolidateQueue;
|
|
let emailAddWorker;
|
|
let emailConsolidateWorker;
|
|
|
|
/**
|
|
* Initializes the email notification queues and workers.
|
|
*
|
|
* @param {Object} options - Configuration options for queue initialization.
|
|
* @param {Object} options.pubClient - Redis client instance for queue communication.
|
|
* @param {Object} options.logger - Logger instance for logging events and debugging.
|
|
* @returns {Queue} The initialized `emailAddQueue` instance for dispatching notifications.
|
|
*/
|
|
const loadEmailQueue = async ({ pubClient, logger }) => {
|
|
if (!emailAddQueue || !emailConsolidateQueue) {
|
|
logger.logger.info("Initializing Email Notification Queues");
|
|
|
|
// Queue for adding email notifications
|
|
emailAddQueue = new Queue("emailAdd", {
|
|
connection: pubClient,
|
|
prefix: "{BULLMQ}",
|
|
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
|
|
});
|
|
|
|
// Queue for consolidating and sending emails
|
|
emailConsolidateQueue = new Queue("emailConsolidate", {
|
|
connection: pubClient,
|
|
prefix: "{BULLMQ}",
|
|
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
|
|
});
|
|
|
|
// Worker to process adding notifications
|
|
emailAddWorker = new Worker(
|
|
"emailAdd",
|
|
async (job) => {
|
|
const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data;
|
|
logger.logger.info(`Adding email notifications for jobId ${jobId}`);
|
|
|
|
const redisKeyPrefix = `email:notifications:${jobId}`;
|
|
for (const recipient of recipients) {
|
|
const { user, firstName, lastName } = recipient;
|
|
const userKey = `${redisKeyPrefix}:${user}`;
|
|
await pubClient.rpush(userKey, body);
|
|
await pubClient.expire(userKey, NOTIFICATION_EXPIRATION / 1000);
|
|
const detailsKey = `email:recipientDetails:${jobId}:${user}`;
|
|
await pubClient.hsetnx(detailsKey, "firstName", firstName || "");
|
|
await pubClient.hsetnx(detailsKey, "lastName", lastName || "");
|
|
await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000);
|
|
await pubClient.sadd(`email:recipients:${jobId}`, user);
|
|
logger.logger.debug(`Stored message for ${user} under ${userKey}: ${body}`);
|
|
}
|
|
|
|
const consolidateKey = `email:consolidate:${jobId}`;
|
|
const flagSet = await pubClient.setnx(consolidateKey, "pending");
|
|
if (flagSet) {
|
|
await emailConsolidateQueue.add(
|
|
"consolidate-emails",
|
|
{ jobId, jobRoNumber, bodyShopName },
|
|
{
|
|
jobId: `consolidate:${jobId}`,
|
|
delay: EMAIL_CONSOLIDATION_DELAY,
|
|
attempts: 3,
|
|
backoff: LOCK_EXPIRATION
|
|
}
|
|
);
|
|
logger.logger.info(`Scheduled email consolidation for jobId ${jobId}`);
|
|
await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000);
|
|
} else {
|
|
logger.logger.debug(`Email consolidation already scheduled for jobId ${jobId}`);
|
|
}
|
|
},
|
|
{
|
|
connection: pubClient,
|
|
prefix: "{BULLMQ}",
|
|
concurrency: 5
|
|
}
|
|
);
|
|
|
|
// Worker to consolidate and send emails
|
|
emailConsolidateWorker = new Worker(
|
|
"emailConsolidate",
|
|
async (job) => {
|
|
const { jobId, jobRoNumber, bodyShopName } = job.data;
|
|
logger.logger.info(`Consolidating emails for jobId ${jobId}`);
|
|
|
|
const lockKey = `lock:emailConsolidate:${jobId}`;
|
|
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
|
|
if (lockAcquired) {
|
|
try {
|
|
const recipientsSet = `email:recipients:${jobId}`;
|
|
const recipients = await pubClient.smembers(recipientsSet);
|
|
for (const recipient of recipients) {
|
|
const userKey = `email:notifications:${jobId}:${recipient}`;
|
|
const detailsKey = `email:recipientDetails:${jobId}:${recipient}`;
|
|
const messages = await pubClient.lrange(userKey, 0, -1);
|
|
if (messages.length > 0) {
|
|
const details = await pubClient.hgetall(detailsKey);
|
|
const firstName = details.firstName || "User";
|
|
const multipleUpdateString = messages.length > 1 ? "Updates" : "Update";
|
|
const subject = `${multipleUpdateString} for job ${jobRoNumber} at ${bodyShopName}`;
|
|
const emailBody = generateEmailTemplate({
|
|
header: `${multipleUpdateString} for Job ${jobRoNumber}`,
|
|
subHeader: `Dear ${firstName},`,
|
|
body: `
|
|
<p>There have been updates to job ${jobRoNumber} at ${bodyShopName}:</p><br/>
|
|
<ul>
|
|
${messages.map((msg) => `<li>${msg}</li>`).join("")}
|
|
</ul><br/><br/>
|
|
<p><a href="${InstanceEndpoints()}/manage/jobs/${jobId}">Please check the job for more details.</a></p>
|
|
`
|
|
});
|
|
await sendTaskEmail({
|
|
to: recipient,
|
|
subject,
|
|
type: "html",
|
|
html: emailBody
|
|
});
|
|
logger.logger.info(
|
|
`Sent consolidated email to ${recipient} for jobId ${jobId} with ${messages.length} updates`
|
|
);
|
|
await pubClient.del(userKey);
|
|
await pubClient.del(detailsKey);
|
|
}
|
|
}
|
|
await pubClient.del(recipientsSet);
|
|
await pubClient.del(`email:consolidate:${jobId}`);
|
|
} catch (err) {
|
|
logger.logger.error(`Email consolidation error for jobId ${jobId}: ${err.message}`, { error: err });
|
|
throw err;
|
|
} finally {
|
|
await pubClient.del(lockKey);
|
|
}
|
|
} else {
|
|
logger.logger.info(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`);
|
|
}
|
|
},
|
|
{
|
|
connection: pubClient,
|
|
prefix: "{BULLMQ}",
|
|
concurrency: 1,
|
|
limiter: { max: 1, duration: RATE_LIMITER_DURATION }
|
|
}
|
|
);
|
|
|
|
// Event handlers for workers
|
|
emailAddWorker.on("completed", (job) => logger.logger.info(`Email add job ${job.id} completed`));
|
|
emailConsolidateWorker.on("completed", (job) => logger.logger.info(`Email consolidate job ${job.id} completed`));
|
|
emailAddWorker.on("failed", (job, err) =>
|
|
logger.logger.error(`Email add job ${job.id} failed: ${err.message}`, { error: err })
|
|
);
|
|
emailConsolidateWorker.on("failed", (job, err) =>
|
|
logger.logger.error(`Email consolidate job ${job.id} failed: ${err.message}`, { error: err })
|
|
);
|
|
|
|
// Register cleanup task instead of direct process listeners
|
|
const shutdown = async () => {
|
|
logger.logger.info("Closing email queue workers...");
|
|
await Promise.all([emailAddWorker.close(), emailConsolidateWorker.close()]);
|
|
logger.logger.info("Email queue workers closed");
|
|
};
|
|
registerCleanupTask(shutdown);
|
|
}
|
|
|
|
return emailAddQueue;
|
|
};
|
|
|
|
/**
|
|
* Retrieves the initialized `emailAddQueue` instance.
|
|
*
|
|
* @returns {Queue} The `emailAddQueue` instance for adding notifications.
|
|
* @throws {Error} If `emailAddQueue` is not initialized.
|
|
*/
|
|
const getQueue = () => {
|
|
if (!emailAddQueue) {
|
|
throw new Error("Email add queue not initialized. Ensure loadEmailQueue is called during bootstrap.");
|
|
}
|
|
return emailAddQueue;
|
|
};
|
|
|
|
/**
|
|
* Dispatches email notifications to the `emailAddQueue` for processing.
|
|
*
|
|
* @param {Object} options - Options for dispatching notifications.
|
|
* @param {Array} options.emailsToDispatch - Array of email notification objects.
|
|
* @param {Object} options.logger - Logger instance for logging dispatch events.
|
|
* @returns {Promise<void>} Resolves when all notifications are added to the queue.
|
|
*/
|
|
const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => {
|
|
const emailAddQueue = getQueue();
|
|
|
|
for (const email of emailsToDispatch) {
|
|
const { jobId, jobRoNumber, bodyShopName, body, recipients } = email;
|
|
|
|
if (!jobId || !jobRoNumber || !bodyShopName || !body || !recipients.length) {
|
|
logger.logger.warn(
|
|
`Skipping email dispatch for jobId ${jobId} due to missing data: ` +
|
|
`jobRoNumber=${jobRoNumber}, bodyShopName=${bodyShopName}, body=${body}, recipients=${recipients.length}`
|
|
);
|
|
continue;
|
|
}
|
|
|
|
await emailAddQueue.add(
|
|
"add-email-notification",
|
|
{ jobId, jobRoNumber, bodyShopName, body, recipients },
|
|
{ jobId: `${jobId}:${Date.now()}` }
|
|
);
|
|
logger.logger.info(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
|
|
}
|
|
};
|
|
|
|
module.exports = { loadEmailQueue, getQueue, dispatchEmailsToQueue };
|