feature/IO-3096-GlobalNotifications - Email Queue now batches per job per user
This commit is contained in:
@@ -1,132 +1,216 @@
|
||||
const { Queue, Worker } = require("bullmq");
|
||||
const { sendTaskEmail } = require("../../email/sendemail");
|
||||
|
||||
let emailQueue;
|
||||
let worker;
|
||||
|
||||
// Consolidate the same way the App Queue Does.
|
||||
let emailAddQueue;
|
||||
let emailConsolidateQueue;
|
||||
let emailAddWorker;
|
||||
let emailConsolidateWorker;
|
||||
|
||||
/**
|
||||
* Initializes the email queue and worker for sending notifications via email.
|
||||
* Initializes the email notification queues and workers.
|
||||
*
|
||||
* @param {Object} options - Configuration options for queue initialization.
|
||||
* @param {Object} options.pubClient - Redis client instance for queue communication.
|
||||
* @param {Object} options.logger - Logger instance for logging events and debugging.
|
||||
* @returns {Queue} The initialized `emailQueue` instance for dispatching emails.
|
||||
* @returns {Queue} The initialized `emailAddQueue` instance for dispatching notifications.
|
||||
*/
|
||||
const loadEmailQueue = async ({ pubClient, logger }) => {
|
||||
// Only initialize if queue doesn't already exist
|
||||
if (!emailQueue) {
|
||||
logger.logger.info("Initializing Notifications Email Queue");
|
||||
if (!emailAddQueue || !emailConsolidateQueue) {
|
||||
logger.logger.info("Initializing Email Notification Queues");
|
||||
|
||||
// Create queue for email notifications
|
||||
emailQueue = new Queue("notificationsEmails", {
|
||||
// Queue for adding email notifications
|
||||
emailAddQueue = new Queue("emailAdd", {
|
||||
connection: pubClient,
|
||||
prefix: "{BULLMQ}", // Namespace prefix for BullMQ in Redis
|
||||
defaultJobOptions: {
|
||||
attempts: 3, // Retry failed jobs up to 3 times
|
||||
backoff: {
|
||||
type: "exponential", // Exponential backoff strategy
|
||||
delay: 1000 // Initial delay of 1 second
|
||||
}
|
||||
}
|
||||
prefix: "{BULLMQ}",
|
||||
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
|
||||
});
|
||||
|
||||
// Worker to process jobs from the emailQueue
|
||||
worker = new Worker(
|
||||
"notificationsEmails",
|
||||
// Queue for consolidating and sending emails
|
||||
emailConsolidateQueue = new Queue("emailConsolidate", {
|
||||
connection: pubClient,
|
||||
prefix: "{BULLMQ}",
|
||||
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
|
||||
});
|
||||
|
||||
// Worker to process adding notifications
|
||||
emailAddWorker = new Worker(
|
||||
"emailAdd",
|
||||
async (job) => {
|
||||
const { subject, body, recipient } = job.data;
|
||||
logger.logger.debug(`Processing email job ${job.id} for recipient ${recipient}`);
|
||||
const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data; // Receive bodyShopName
|
||||
logger.logger.info(`Adding email notifications for jobId ${jobId}`);
|
||||
|
||||
// Send email to a single recipient
|
||||
await sendTaskEmail({
|
||||
to: recipient, // Single email address
|
||||
subject,
|
||||
type: "text",
|
||||
text: body
|
||||
});
|
||||
const redisKeyPrefix = `email:notifications:${jobId}`;
|
||||
for (const recipient of recipients) {
|
||||
const { user } = recipient;
|
||||
const userKey = `${redisKeyPrefix}:${user}`;
|
||||
await pubClient.rpush(userKey, body);
|
||||
const detailsKey = `email:recipientDetails:${jobId}:${user}`;
|
||||
await pubClient.hsetnx(detailsKey, "firstName", recipient.firstName || "");
|
||||
await pubClient.hsetnx(detailsKey, "lastName", recipient.lastName || "");
|
||||
await pubClient.sadd(`email:recipients:${jobId}`, user);
|
||||
logger.logger.debug(`Stored message for ${user} under ${userKey}: ${body}`);
|
||||
}
|
||||
|
||||
logger.logger.debug(`Email job ${job.id} processed successfully`);
|
||||
const consolidateKey = `email:consolidate:${jobId}`;
|
||||
const flagSet = await pubClient.setnx(consolidateKey, "pending");
|
||||
if (flagSet) {
|
||||
// Pass bodyShopName to the consolidation job
|
||||
await emailConsolidateQueue.add(
|
||||
"consolidate-emails",
|
||||
{ jobId, jobRoNumber, bodyShopName },
|
||||
{ jobId: `consolidate:${jobId}`, delay: 30000 }
|
||||
);
|
||||
logger.logger.info(`Scheduled email consolidation for jobId ${jobId}`);
|
||||
await pubClient.expire(consolidateKey, 300);
|
||||
} else {
|
||||
logger.logger.debug(`Email consolidation already scheduled for jobId ${jobId}`);
|
||||
}
|
||||
},
|
||||
{
|
||||
connection: pubClient,
|
||||
prefix: "{BULLMQ}",
|
||||
concurrency: 2, // Process up to 2 jobs concurrently
|
||||
limiter: {
|
||||
max: 10, // Maximum of 10 jobs per minute
|
||||
duration: 60 * 1000 // 1 minute
|
||||
}
|
||||
concurrency: 5
|
||||
}
|
||||
);
|
||||
|
||||
// Worker event handlers
|
||||
worker.on("completed", (job) => {
|
||||
logger.logger.debug(`Job ${job.id} completed`);
|
||||
});
|
||||
// Worker to consolidate and send emails
|
||||
emailConsolidateWorker = new Worker(
|
||||
"emailConsolidate",
|
||||
async (job) => {
|
||||
const { jobId, jobRoNumber, bodyShopName } = job.data;
|
||||
logger.logger.info(`Consolidating emails for jobId ${jobId}`);
|
||||
|
||||
worker.on("failed", (job, err) => {
|
||||
logger.logger.error(`Job ${job.id} failed: ${err.message}`, { error: err });
|
||||
});
|
||||
|
||||
worker.on("error", (err) => {
|
||||
logger.logger.error("Worker error:", { error: err });
|
||||
});
|
||||
|
||||
// Graceful shutdown handler for the worker
|
||||
const shutdown = async () => {
|
||||
if (worker) {
|
||||
logger.logger.info("Closing email queue worker...");
|
||||
await worker.close();
|
||||
logger.logger.info("Email queue worker closed");
|
||||
const lockKey = `lock:emailConsolidate:${jobId}`;
|
||||
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", 10);
|
||||
if (lockAcquired) {
|
||||
try {
|
||||
const recipientsSet = `email:recipients:${jobId}`;
|
||||
const recipients = await pubClient.smembers(recipientsSet);
|
||||
for (const recipient of recipients) {
|
||||
const userKey = `email:notifications:${jobId}:${recipient}`;
|
||||
const detailsKey = `email:recipientDetails:${jobId}:${recipient}`;
|
||||
const messages = await pubClient.lrange(userKey, 0, -1);
|
||||
if (messages.length > 0) {
|
||||
const details = await pubClient.hgetall(detailsKey);
|
||||
const firstName = details.firstName || "User";
|
||||
const subject = `Updates for job ${jobRoNumber} at ${bodyShopName}`;
|
||||
const body = [
|
||||
'<html lang="en"><body>',
|
||||
"Dear " + firstName + ",",
|
||||
"",
|
||||
"There have been updates to job " + jobRoNumber + ":",
|
||||
"",
|
||||
"<ul>",
|
||||
...messages.map((msg) => " <li>" + msg + "</li>"),
|
||||
"</ul>",
|
||||
"",
|
||||
"Please check the job for more details.",
|
||||
"",
|
||||
"Best regards,",
|
||||
bodyShopName,
|
||||
"</body></html>"
|
||||
].join("\n");
|
||||
await sendTaskEmail({
|
||||
to: recipient,
|
||||
subject,
|
||||
type: "html",
|
||||
html: body
|
||||
});
|
||||
logger.logger.info(
|
||||
`Sent consolidated email to ${recipient} for jobId ${jobId} with ${messages.length} updates`
|
||||
);
|
||||
await pubClient.del(userKey);
|
||||
await pubClient.del(detailsKey);
|
||||
}
|
||||
}
|
||||
await pubClient.del(recipientsSet);
|
||||
await pubClient.del(`email:consolidate:${jobId}`);
|
||||
} catch (err) {
|
||||
logger.logger.error(`Email consolidation error for jobId ${jobId}: ${err.message}`, { error: err });
|
||||
throw err;
|
||||
} finally {
|
||||
await pubClient.del(lockKey);
|
||||
}
|
||||
} else {
|
||||
logger.logger.info(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`);
|
||||
}
|
||||
},
|
||||
{
|
||||
connection: pubClient,
|
||||
prefix: "{BULLMQ}",
|
||||
concurrency: 1,
|
||||
limiter: { max: 1, duration: 5000 }
|
||||
}
|
||||
};
|
||||
);
|
||||
|
||||
process.on("SIGTERM", shutdown); // Handle termination signal
|
||||
process.on("SIGINT", shutdown); // Handle interrupt signal (e.g., Ctrl+C)
|
||||
// Event handlers for workers
|
||||
emailAddWorker.on("completed", (job) => logger.logger.info(`Email add job ${job.id} completed`));
|
||||
emailConsolidateWorker.on("completed", (job) => logger.logger.info(`Email consolidate job ${job.id} completed`));
|
||||
emailAddWorker.on("failed", (job, err) =>
|
||||
logger.logger.error(`Email add job ${job.id} failed: ${err.message}`, { error: err })
|
||||
);
|
||||
emailConsolidateWorker.on("failed", (job, err) =>
|
||||
logger.logger.error(`Email consolidate job ${job.id} failed: ${err.message}`, { error: err })
|
||||
);
|
||||
|
||||
// Graceful shutdown
|
||||
const shutdown = async () => {
|
||||
logger.logger.info("Closing email queue workers...");
|
||||
await Promise.all([emailAddWorker.close(), emailConsolidateWorker.close()]);
|
||||
logger.logger.info("Email queue workers closed");
|
||||
};
|
||||
process.on("SIGTERM", shutdown);
|
||||
process.on("SIGINT", shutdown);
|
||||
}
|
||||
|
||||
return emailQueue; // Return queue for external use
|
||||
return emailAddQueue;
|
||||
};
|
||||
|
||||
/**
|
||||
* Retrieves the initialized `emailQueue` instance.
|
||||
* Retrieves the initialized `emailAddQueue` instance.
|
||||
*
|
||||
* @returns {Queue} The `emailQueue` instance for sending emails.
|
||||
* @throws {Error} If `emailQueue` is not initialized (i.e., `loadEmailQueue` wasn’t called).
|
||||
* @returns {Queue} The `emailAddQueue` instance for adding notifications.
|
||||
* @throws {Error} If `emailAddQueue` is not initialized.
|
||||
*/
|
||||
const getQueue = () => {
|
||||
if (!emailQueue) {
|
||||
throw new Error("Email queue not initialized. Ensure loadEmailQueue is called during bootstrap.");
|
||||
if (!emailAddQueue) {
|
||||
throw new Error("Email add queue not initialized. Ensure loadEmailQueue is called during bootstrap.");
|
||||
}
|
||||
return emailQueue;
|
||||
return emailAddQueue;
|
||||
};
|
||||
|
||||
/**
|
||||
* Dispatches emails to the `emailQueue` for processing, creating one job per recipient.
|
||||
* Dispatches email notifications to the `emailAddQueue` for processing.
|
||||
*
|
||||
* @param {Object} options - Options for dispatching emails.
|
||||
* @param {Array} options.emailsToDispatch - Array of email objects to dispatch.
|
||||
* @param {Object} options - Options for dispatching notifications.
|
||||
* @param {Array} options.emailsToDispatch - Array of email notification objects.
|
||||
* @param {Object} options.logger - Logger instance for logging dispatch events.
|
||||
* @returns {Promise<void>} Resolves when all email jobs are added to the queue.
|
||||
* @returns {Promise<void>} Resolves when all notifications are added to the queue.
|
||||
*/
|
||||
const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => {
|
||||
const emailQueue = getQueue();
|
||||
console.dir(emailsToDispatch);
|
||||
const emailAddQueue = getQueue();
|
||||
|
||||
for (const email of emailsToDispatch) {
|
||||
const { subject, body, recipients } = email;
|
||||
// Create an array of jobs, one per recipient
|
||||
const jobs = recipients.map((recipient) => ({
|
||||
name: "send-email",
|
||||
data: {
|
||||
subject,
|
||||
body,
|
||||
recipient: recipient.user // Extract the email address from recipient object
|
||||
}
|
||||
}));
|
||||
// Add all jobs for this email in one operation
|
||||
await emailQueue.addBulk(jobs);
|
||||
logger.logger.debug(`Added ${jobs.length} email jobs to queue for subject: ${subject}`);
|
||||
// Extract bodyShopName along with other fields
|
||||
const { jobId, jobRoNumber, bodyShopName, body, recipients } = email;
|
||||
|
||||
// Validate required fields, including bodyShopName
|
||||
if (!jobId || !jobRoNumber || !bodyShopName || !body || !recipients.length) {
|
||||
logger.logger.warn(
|
||||
`Skipping email dispatch for jobId ${jobId} due to missing data: ` +
|
||||
`jobRoNumber=${jobRoNumber}, bodyShopName=${bodyShopName}, body=${body}, recipients=${recipients.length}`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Include bodyShopName in the job data
|
||||
await emailAddQueue.add(
|
||||
"add-email-notification",
|
||||
{ jobId, jobRoNumber, bodyShopName, body, recipients },
|
||||
{ jobId: `${jobId}:${Date.now()}` }
|
||||
);
|
||||
logger.logger.info(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user