diff --git a/server/notifications/queues/emailQueue.js b/server/notifications/queues/emailQueue.js
index 6ca97d3c8..516991b1b 100644
--- a/server/notifications/queues/emailQueue.js
+++ b/server/notifications/queues/emailQueue.js
@@ -1,132 +1,216 @@
const { Queue, Worker } = require("bullmq");
const { sendTaskEmail } = require("../../email/sendemail");
-let emailQueue;
-let worker;
-
-// Consolidate the same way the App Queue Does.
+let emailAddQueue;
+let emailConsolidateQueue;
+let emailAddWorker;
+let emailConsolidateWorker;
/**
- * Initializes the email queue and worker for sending notifications via email.
+ * Initializes the email notification queues and workers.
*
* @param {Object} options - Configuration options for queue initialization.
* @param {Object} options.pubClient - Redis client instance for queue communication.
* @param {Object} options.logger - Logger instance for logging events and debugging.
- * @returns {Queue} The initialized `emailQueue` instance for dispatching emails.
+ * @returns {Queue} The initialized `emailAddQueue` instance for dispatching notifications.
*/
const loadEmailQueue = async ({ pubClient, logger }) => {
- // Only initialize if queue doesn't already exist
- if (!emailQueue) {
- logger.logger.info("Initializing Notifications Email Queue");
+ if (!emailAddQueue || !emailConsolidateQueue) {
+ logger.logger.info("Initializing Email Notification Queues");
- // Create queue for email notifications
- emailQueue = new Queue("notificationsEmails", {
+ // Queue for adding email notifications
+ emailAddQueue = new Queue("emailAdd", {
connection: pubClient,
- prefix: "{BULLMQ}", // Namespace prefix for BullMQ in Redis
- defaultJobOptions: {
- attempts: 3, // Retry failed jobs up to 3 times
- backoff: {
- type: "exponential", // Exponential backoff strategy
- delay: 1000 // Initial delay of 1 second
- }
- }
+ prefix: "{BULLMQ}",
+ defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
- // Worker to process jobs from the emailQueue
- worker = new Worker(
- "notificationsEmails",
+ // Queue for consolidating and sending emails
+ emailConsolidateQueue = new Queue("emailConsolidate", {
+ connection: pubClient,
+ prefix: "{BULLMQ}",
+ defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
+ });
+
+ // Worker to process adding notifications
+ emailAddWorker = new Worker(
+ "emailAdd",
async (job) => {
- const { subject, body, recipient } = job.data;
- logger.logger.debug(`Processing email job ${job.id} for recipient ${recipient}`);
+ const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data; // Receive bodyShopName
+ logger.logger.info(`Adding email notifications for jobId ${jobId}`);
- // Send email to a single recipient
- await sendTaskEmail({
- to: recipient, // Single email address
- subject,
- type: "text",
- text: body
- });
+ const redisKeyPrefix = `email:notifications:${jobId}`;
+ for (const recipient of recipients) {
+ const { user } = recipient;
+ const userKey = `${redisKeyPrefix}:${user}`;
+ await pubClient.rpush(userKey, body);
+ const detailsKey = `email:recipientDetails:${jobId}:${user}`;
+ await pubClient.hsetnx(detailsKey, "firstName", recipient.firstName || "");
+ await pubClient.hsetnx(detailsKey, "lastName", recipient.lastName || "");
+ await pubClient.sadd(`email:recipients:${jobId}`, user);
+ logger.logger.debug(`Stored message for ${user} under ${userKey}: ${body}`);
+ }
- logger.logger.debug(`Email job ${job.id} processed successfully`);
+ const consolidateKey = `email:consolidate:${jobId}`;
+ const flagSet = await pubClient.setnx(consolidateKey, "pending");
+ if (flagSet) {
+ // Pass bodyShopName to the consolidation job
+ await emailConsolidateQueue.add(
+ "consolidate-emails",
+ { jobId, jobRoNumber, bodyShopName },
+ { jobId: `consolidate:${jobId}`, delay: 30000 }
+ );
+ logger.logger.info(`Scheduled email consolidation for jobId ${jobId}`);
+ await pubClient.expire(consolidateKey, 300);
+ } else {
+ logger.logger.debug(`Email consolidation already scheduled for jobId ${jobId}`);
+ }
},
{
connection: pubClient,
prefix: "{BULLMQ}",
- concurrency: 2, // Process up to 2 jobs concurrently
- limiter: {
- max: 10, // Maximum of 10 jobs per minute
- duration: 60 * 1000 // 1 minute
- }
+ concurrency: 5
}
);
- // Worker event handlers
- worker.on("completed", (job) => {
- logger.logger.debug(`Job ${job.id} completed`);
- });
+ // Worker to consolidate and send emails
+ emailConsolidateWorker = new Worker(
+ "emailConsolidate",
+ async (job) => {
+ const { jobId, jobRoNumber, bodyShopName } = job.data;
+ logger.logger.info(`Consolidating emails for jobId ${jobId}`);
- worker.on("failed", (job, err) => {
- logger.logger.error(`Job ${job.id} failed: ${err.message}`, { error: err });
- });
-
- worker.on("error", (err) => {
- logger.logger.error("Worker error:", { error: err });
- });
-
- // Graceful shutdown handler for the worker
- const shutdown = async () => {
- if (worker) {
- logger.logger.info("Closing email queue worker...");
- await worker.close();
- logger.logger.info("Email queue worker closed");
+ const lockKey = `lock:emailConsolidate:${jobId}`;
+ const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", 10);
+ if (lockAcquired) {
+ try {
+ const recipientsSet = `email:recipients:${jobId}`;
+ const recipients = await pubClient.smembers(recipientsSet);
+ for (const recipient of recipients) {
+ const userKey = `email:notifications:${jobId}:${recipient}`;
+ const detailsKey = `email:recipientDetails:${jobId}:${recipient}`;
+ const messages = await pubClient.lrange(userKey, 0, -1);
+ if (messages.length > 0) {
+ const details = await pubClient.hgetall(detailsKey);
+ const firstName = details.firstName || "User";
+ const subject = `Updates for job ${jobRoNumber} at ${bodyShopName}`;
+ const body = [
+ '
',
+ "Dear " + firstName + ",",
+ "",
+ "There have been updates to job " + jobRoNumber + ":",
+ "",
+ "",
+ ...messages.map((msg) => " - " + msg + "
"),
+ "
",
+ "",
+ "Please check the job for more details.",
+ "",
+ "Best regards,",
+ bodyShopName,
+ ""
+ ].join("\n");
+ await sendTaskEmail({
+ to: recipient,
+ subject,
+ type: "html",
+ html: body
+ });
+ logger.logger.info(
+ `Sent consolidated email to ${recipient} for jobId ${jobId} with ${messages.length} updates`
+ );
+ await pubClient.del(userKey);
+ await pubClient.del(detailsKey);
+ }
+ }
+ await pubClient.del(recipientsSet);
+ await pubClient.del(`email:consolidate:${jobId}`);
+ } catch (err) {
+ logger.logger.error(`Email consolidation error for jobId ${jobId}: ${err.message}`, { error: err });
+ throw err;
+ } finally {
+ await pubClient.del(lockKey);
+ }
+ } else {
+ logger.logger.info(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`);
+ }
+ },
+ {
+ connection: pubClient,
+ prefix: "{BULLMQ}",
+ concurrency: 1,
+ limiter: { max: 1, duration: 5000 }
}
- };
+ );
- process.on("SIGTERM", shutdown); // Handle termination signal
- process.on("SIGINT", shutdown); // Handle interrupt signal (e.g., Ctrl+C)
+ // Event handlers for workers
+ emailAddWorker.on("completed", (job) => logger.logger.info(`Email add job ${job.id} completed`));
+ emailConsolidateWorker.on("completed", (job) => logger.logger.info(`Email consolidate job ${job.id} completed`));
+ emailAddWorker.on("failed", (job, err) =>
+ logger.logger.error(`Email add job ${job.id} failed: ${err.message}`, { error: err })
+ );
+ emailConsolidateWorker.on("failed", (job, err) =>
+ logger.logger.error(`Email consolidate job ${job.id} failed: ${err.message}`, { error: err })
+ );
+
+ // Graceful shutdown
+ const shutdown = async () => {
+ logger.logger.info("Closing email queue workers...");
+ await Promise.all([emailAddWorker.close(), emailConsolidateWorker.close()]);
+ logger.logger.info("Email queue workers closed");
+ };
+ process.on("SIGTERM", shutdown);
+ process.on("SIGINT", shutdown);
}
- return emailQueue; // Return queue for external use
+ return emailAddQueue;
};
/**
- * Retrieves the initialized `emailQueue` instance.
+ * Retrieves the initialized `emailAddQueue` instance.
*
- * @returns {Queue} The `emailQueue` instance for sending emails.
- * @throws {Error} If `emailQueue` is not initialized (i.e., `loadEmailQueue` wasn’t called).
+ * @returns {Queue} The `emailAddQueue` instance for adding notifications.
+ * @throws {Error} If `emailAddQueue` is not initialized.
*/
const getQueue = () => {
- if (!emailQueue) {
- throw new Error("Email queue not initialized. Ensure loadEmailQueue is called during bootstrap.");
+ if (!emailAddQueue) {
+ throw new Error("Email add queue not initialized. Ensure loadEmailQueue is called during bootstrap.");
}
- return emailQueue;
+ return emailAddQueue;
};
/**
- * Dispatches emails to the `emailQueue` for processing, creating one job per recipient.
+ * Dispatches email notifications to the `emailAddQueue` for processing.
*
- * @param {Object} options - Options for dispatching emails.
- * @param {Array} options.emailsToDispatch - Array of email objects to dispatch.
+ * @param {Object} options - Options for dispatching notifications.
+ * @param {Array} options.emailsToDispatch - Array of email notification objects.
* @param {Object} options.logger - Logger instance for logging dispatch events.
- * @returns {Promise} Resolves when all email jobs are added to the queue.
+ * @returns {Promise} Resolves when all notifications are added to the queue.
*/
const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => {
- const emailQueue = getQueue();
+ console.dir(emailsToDispatch);
+ const emailAddQueue = getQueue();
for (const email of emailsToDispatch) {
- const { subject, body, recipients } = email;
- // Create an array of jobs, one per recipient
- const jobs = recipients.map((recipient) => ({
- name: "send-email",
- data: {
- subject,
- body,
- recipient: recipient.user // Extract the email address from recipient object
- }
- }));
- // Add all jobs for this email in one operation
- await emailQueue.addBulk(jobs);
- logger.logger.debug(`Added ${jobs.length} email jobs to queue for subject: ${subject}`);
+ // Extract bodyShopName along with other fields
+ const { jobId, jobRoNumber, bodyShopName, body, recipients } = email;
+
+ // Validate required fields, including bodyShopName
+ if (!jobId || !jobRoNumber || !bodyShopName || !body || !recipients.length) {
+ logger.logger.warn(
+ `Skipping email dispatch for jobId ${jobId} due to missing data: ` +
+ `jobRoNumber=${jobRoNumber}, bodyShopName=${bodyShopName}, body=${body}, recipients=${recipients.length}`
+ );
+ continue;
+ }
+
+ // Include bodyShopName in the job data
+ await emailAddQueue.add(
+ "add-email-notification",
+ { jobId, jobRoNumber, bodyShopName, body, recipients },
+ { jobId: `${jobId}:${Date.now()}` }
+ );
+ logger.logger.info(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
}
};
diff --git a/server/notifications/scenarioBuilders.js b/server/notifications/scenarioBuilders.js
index 189c51928..6ad5e00af 100644
--- a/server/notifications/scenarioBuilders.js
+++ b/server/notifications/scenarioBuilders.js
@@ -8,13 +8,13 @@ const { getJobAssignmentType } = require("./stringHelpers");
*/
const populateWatchers = (data, result) => {
data.scenarioWatchers.forEach((recipients) => {
- const { user, app, fcm, email } = recipients;
+ const { user, app, fcm, email, firstName, lastName } = recipients;
// Add user to app recipients with bodyShopId if app notification is enabled
if (app === true) result.app.recipients.push({ user, bodyShopId: data.bodyShopId });
// Add user to FCM recipients if FCM notification is enabled
if (fcm === true) result.fcm.recipients.push(user);
// Add user to email recipients if email notification is enabled
- if (email === true) result.email.recipients.push({ user });
+ if (email === true) result.email.recipients.push({ user, firstName, lastName });
});
};
@@ -37,8 +37,11 @@ const alternateTransportChangedBuilder = (data) => {
recipients: []
},
email: {
- subject: `Alternate transport for ${data?.jobRoNumber} (${data.bodyShopName}) changed to ${data.data.alt_transport || "None"}`,
- body: `The alternate transport status has been updated for job ${data?.jobRoNumber} in ${data.bodyShopName}.`,
+ jobId: data.jobId,
+ jobRoNumber: data.jobRoNumber,
+ bodyShopName: data.bodyShopName,
+ // subject: `Alternate transport for ${data?.jobRoNumber} (${data.bodyShopName}) changed to ${data.data.alt_transport || "None"}`,
+ body: `The alternate transport status has been updated.`,
recipients: []
},
fcm: { recipients: [] }
@@ -66,8 +69,11 @@ const billPostedHandler = (data) => {
recipients: []
},
email: {
- subject: `Bill posted for ${data?.jobRoNumber} (${data.bodyShopName})`,
- body: `A bill of $${data.data.clm_total} has been posted for job ${data?.jobRoNumber} in ${data.bodyShopName}.`,
+ jobId: data.jobId,
+ jobRoNumber: data.jobRoNumber,
+ bodyShopName: data.bodyShopName,
+ // subject: `Bill posted for ${data?.jobRoNumber} (${data.bodyShopName})`,
+ body: `A bill of $${data.data.clm_total} has been posted.`,
recipients: []
},
fcm: { recipients: [] }
@@ -96,8 +102,10 @@ const criticalPartsStatusChangedBuilder = (data) => {
recipients: []
},
email: {
- subject: `Critical parts status for ${data?.jobRoNumber} (${data.bodyShopName}) updated`,
- body: `The critical parts status for job ${data?.jobRoNumber} in ${data.bodyShopName} has changed to ${data.data.queued_for_parts ? "queued" : "not queued"}.`,
+ jobId: data.jobId,
+ jobRoNumber: data.jobRoNumber,
+ bodyShopName: data.bodyShopName,
+ body: `The critical parts status has changed to ${data.data.queued_for_parts ? "queued" : "not queued"}.`,
recipients: []
},
fcm: { recipients: [] }
@@ -128,8 +136,10 @@ const intakeDeliveryChecklistCompletedBuilder = (data) => {
recipients: []
},
email: {
- subject: `${checklistType.charAt(0).toUpperCase() + checklistType.slice(1)} checklist completed for ${data?.jobRoNumber} (${data.bodyShopName})`,
- body: `The ${checklistType} checklist for job ${data?.jobRoNumber} in ${data.bodyShopName} has been completed.`,
+ jobRoNumber: data.jobRoNumber,
+ jobId: data.jobId,
+ bodyShopName: data.bodyShopName,
+ body: `The ${checklistType.charAt(0).toUpperCase() + checklistType.slice(1)} checklist has been completed.`,
recipients: []
},
fcm: { recipients: [] }
@@ -157,8 +167,10 @@ const jobAssignedToMeBuilder = (data) => {
recipients: []
},
email: {
- subject: `You have been assigned to [${getJobAssignmentType(data.scenarioFields?.[0])}] on ${data?.jobRoNumber} in ${data.bodyShopName}`,
- body: `Hello, a new job has been assigned to you in ${data.bodyShopName}.`,
+ jobId: data.jobId,
+ jobRoNumber: data.jobRoNumber,
+ bodyShopName: data.bodyShopName,
+ body: `You have been assigned to [${getJobAssignmentType(data.scenarioFields?.[0])}]`,
recipients: []
},
fcm: { recipients: [] }
@@ -184,8 +196,10 @@ const jobsAddedToProductionBuilder = (data) => {
recipients: []
},
email: {
- subject: `Job ${data?.jobRoNumber} (${data.bodyShopName}) added to production`,
- body: `Job ${data?.jobRoNumber} in ${data.bodyShopName} has been added to production.`,
+ jobId: data.jobId,
+ jobRoNumber: data.jobRoNumber,
+ bodyShopName: data.bodyShopName,
+ body: `Job has been added to production.`,
recipients: []
},
fcm: { recipients: [] }
@@ -214,8 +228,10 @@ const jobStatusChangeBuilder = (data) => {
recipients: []
},
email: {
- subject: `The status of ${data?.jobRoNumber} (${data.bodyShopName}) has changed from ${data.changedFields.status.old} to ${data.data.status}`,
- body: `...`, // Placeholder indicating email body may need further customization
+ jobId: data.jobId,
+ jobRoNumber: data.jobRoNumber,
+ bodyShopName: data.bodyShopName,
+ body: `The status has changed from ${data.changedFields.status.old} to ${data.data.status}`,
recipients: []
},
fcm: { recipients: [] }
@@ -241,8 +257,10 @@ const newMediaAddedReassignedBuilder = (data) => {
recipients: []
},
email: {
- subject: `New media added to ${data?.jobRoNumber} (${data.bodyShopName})`,
- body: `New media has been added to job ${data?.jobRoNumber} in ${data.bodyShopName}.`,
+ jobId: data.jobId,
+ jobRoNumber: data.jobRoNumber,
+ bodyShopName: data.bodyShopName,
+ body: `New media has been added.`,
recipients: []
},
fcm: { recipients: [] }
@@ -270,8 +288,10 @@ const newNoteAddedBuilder = (data) => {
recipients: []
},
email: {
- subject: `New note added to ${data?.jobRoNumber} (${data.bodyShopName})`,
- body: `A new note has been added to job ${data?.jobRoNumber} in ${data.bodyShopName}: "${data.data.text}"`,
+ jobId: data.jobId,
+ jobRoNumber: data.jobRoNumber,
+ bodyShopName: data.bodyShopName,
+ body: `A new note has been added: "${data.data.text}"`,
recipients: []
},
fcm: { recipients: [] }
@@ -297,8 +317,10 @@ const newTimeTicketPostedBuilder = (data) => {
recipients: []
},
email: {
- subject: `New time ticket posted for ${data?.jobRoNumber} (${data.bodyShopName})`,
- body: `A new time ticket has been posted for job ${data?.jobRoNumber} in ${data.bodyShopName}.`,
+ jobId: data.jobId,
+ jobRoNumber: data.jobRoNumber,
+ bodyShopName: data.bodyShopName,
+ body: `A new time ticket has been posted.`,
recipients: []
},
fcm: { recipients: [] }
@@ -327,8 +349,11 @@ const partMarkedBackOrderedBuilder = (data) => {
recipients: []
},
email: {
- subject: `Part marked back-ordered for ${data?.jobRoNumber} (${data.bodyShopName})`,
- body: `A part for job ${data?.jobRoNumber} in ${data.bodyShopName} has been marked as back-ordered.`,
+ jobId: data.jobId,
+ jobRoNumber: data.jobRoNumber,
+ bodyShopName: data.bodyShopName,
+ // subject: `Part marked back-ordered for ${data?.jobRoNumber} (${data.bodyShopName})`,
+ body: `A part has been marked as back-ordered.`,
recipients: []
},
fcm: { recipients: [] }
@@ -356,8 +381,10 @@ const paymentCollectedCompletedBuilder = (data) => {
recipients: []
},
email: {
- subject: `Payment collected for ${data?.jobRoNumber} (${data.bodyShopName})`,
- body: `Payment of $${data.data.clm_total} has been collected for job ${data?.jobRoNumber} in ${data.bodyShopName}.`,
+ jobId: data.jobId,
+ jobRoNumber: data.jobRoNumber,
+ bodyShopName: data.bodyShopName,
+ body: `Payment of $${data.data.clm_total} has been collected.`,
recipients: []
},
fcm: { recipients: [] }
@@ -390,8 +417,11 @@ const scheduledDatesChangedBuilder = (data) => {
recipients: []
},
email: {
- subject: `Scheduled dates updated for ${data?.jobRoNumber} (${data.bodyShopName})`,
- body: `Scheduled dates for job ${data?.jobRoNumber} in ${data.bodyShopName} have been updated.`,
+ jobId: data.jobId,
+ jobRoNumber: data.jobRoNumber,
+ bodyShopName: data.bodyShopName,
+ // subject: `Scheduled dates updated for ${data?.jobRoNumber} (${data.bodyShopName})`,
+ body: `Scheduled dates have been updated.`,
recipients: []
},
fcm: { recipients: [] }
@@ -419,8 +449,10 @@ const supplementImportedBuilder = (data) => {
recipients: []
},
email: {
- subject: `Supplement imported for ${data?.jobRoNumber} (${data.bodyShopName})`,
- body: `A supplement of $${data.data.cieca_ttl?.data?.supp_amt || 0} has been imported for job ${data?.jobRoNumber} in ${data.bodyShopName}.`,
+ jobId: data.jobId,
+ jobRoNumber: data.jobRoNumber,
+ bodyShopName: data.bodyShopName,
+ body: `A supplement of $${data.data.cieca_ttl?.data?.supp_amt || 0} has been imported.`,
recipients: []
},
fcm: { recipients: [] }
@@ -449,8 +481,10 @@ const tasksUpdatedCreatedBuilder = (data) => {
recipients: []
},
email: {
- subject: `Tasks ${data.isNew ? "created" : "updated"} for ${data?.jobRoNumber} (${data.bodyShopName})`,
- body: `Tasks for job ${data?.jobRoNumber} in ${data.bodyShopName} have been ${data.isNew ? "created" : "updated"}.`,
+ jobId: data.jobId,
+ jobRoNumber: data.jobRoNumber,
+ bodyShopName: data.bodyShopName,
+ body: `Tasks have been ${data.isNew ? "created" : "updated"}.`,
recipients: []
},
fcm: { recipients: [] }