From 29f7144e7202729e76a16063f494e3e99ab34ee7 Mon Sep 17 00:00:00 2001 From: Dave Richer Date: Wed, 19 Feb 2025 16:10:53 -0500 Subject: [PATCH] feature/IO-3096-GlobalNotifications - Email Queue now batches per job per user --- server/notifications/queues/emailQueue.js | 250 +++++++++++++++------- server/notifications/scenarioBuilders.js | 98 ++++++--- 2 files changed, 233 insertions(+), 115 deletions(-) diff --git a/server/notifications/queues/emailQueue.js b/server/notifications/queues/emailQueue.js index 6ca97d3c8..516991b1b 100644 --- a/server/notifications/queues/emailQueue.js +++ b/server/notifications/queues/emailQueue.js @@ -1,132 +1,216 @@ const { Queue, Worker } = require("bullmq"); const { sendTaskEmail } = require("../../email/sendemail"); -let emailQueue; -let worker; - -// Consolidate the same way the App Queue Does. +let emailAddQueue; +let emailConsolidateQueue; +let emailAddWorker; +let emailConsolidateWorker; /** - * Initializes the email queue and worker for sending notifications via email. + * Initializes the email notification queues and workers. * * @param {Object} options - Configuration options for queue initialization. * @param {Object} options.pubClient - Redis client instance for queue communication. * @param {Object} options.logger - Logger instance for logging events and debugging. - * @returns {Queue} The initialized `emailQueue` instance for dispatching emails. + * @returns {Queue} The initialized `emailAddQueue` instance for dispatching notifications. */ const loadEmailQueue = async ({ pubClient, logger }) => { - // Only initialize if queue doesn't already exist - if (!emailQueue) { - logger.logger.info("Initializing Notifications Email Queue"); + if (!emailAddQueue || !emailConsolidateQueue) { + logger.logger.info("Initializing Email Notification Queues"); - // Create queue for email notifications - emailQueue = new Queue("notificationsEmails", { + // Queue for adding email notifications + emailAddQueue = new Queue("emailAdd", { connection: pubClient, - prefix: "{BULLMQ}", // Namespace prefix for BullMQ in Redis - defaultJobOptions: { - attempts: 3, // Retry failed jobs up to 3 times - backoff: { - type: "exponential", // Exponential backoff strategy - delay: 1000 // Initial delay of 1 second - } - } + prefix: "{BULLMQ}", + defaultJobOptions: { removeOnComplete: true, removeOnFail: true } }); - // Worker to process jobs from the emailQueue - worker = new Worker( - "notificationsEmails", + // Queue for consolidating and sending emails + emailConsolidateQueue = new Queue("emailConsolidate", { + connection: pubClient, + prefix: "{BULLMQ}", + defaultJobOptions: { removeOnComplete: true, removeOnFail: true } + }); + + // Worker to process adding notifications + emailAddWorker = new Worker( + "emailAdd", async (job) => { - const { subject, body, recipient } = job.data; - logger.logger.debug(`Processing email job ${job.id} for recipient ${recipient}`); + const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data; // Receive bodyShopName + logger.logger.info(`Adding email notifications for jobId ${jobId}`); - // Send email to a single recipient - await sendTaskEmail({ - to: recipient, // Single email address - subject, - type: "text", - text: body - }); + const redisKeyPrefix = `email:notifications:${jobId}`; + for (const recipient of recipients) { + const { user } = recipient; + const userKey = `${redisKeyPrefix}:${user}`; + await pubClient.rpush(userKey, body); + const detailsKey = `email:recipientDetails:${jobId}:${user}`; + await pubClient.hsetnx(detailsKey, "firstName", recipient.firstName || ""); + await pubClient.hsetnx(detailsKey, "lastName", recipient.lastName || ""); + await pubClient.sadd(`email:recipients:${jobId}`, user); + logger.logger.debug(`Stored message for ${user} under ${userKey}: ${body}`); + } - logger.logger.debug(`Email job ${job.id} processed successfully`); + const consolidateKey = `email:consolidate:${jobId}`; + const flagSet = await pubClient.setnx(consolidateKey, "pending"); + if (flagSet) { + // Pass bodyShopName to the consolidation job + await emailConsolidateQueue.add( + "consolidate-emails", + { jobId, jobRoNumber, bodyShopName }, + { jobId: `consolidate:${jobId}`, delay: 30000 } + ); + logger.logger.info(`Scheduled email consolidation for jobId ${jobId}`); + await pubClient.expire(consolidateKey, 300); + } else { + logger.logger.debug(`Email consolidation already scheduled for jobId ${jobId}`); + } }, { connection: pubClient, prefix: "{BULLMQ}", - concurrency: 2, // Process up to 2 jobs concurrently - limiter: { - max: 10, // Maximum of 10 jobs per minute - duration: 60 * 1000 // 1 minute - } + concurrency: 5 } ); - // Worker event handlers - worker.on("completed", (job) => { - logger.logger.debug(`Job ${job.id} completed`); - }); + // Worker to consolidate and send emails + emailConsolidateWorker = new Worker( + "emailConsolidate", + async (job) => { + const { jobId, jobRoNumber, bodyShopName } = job.data; + logger.logger.info(`Consolidating emails for jobId ${jobId}`); - worker.on("failed", (job, err) => { - logger.logger.error(`Job ${job.id} failed: ${err.message}`, { error: err }); - }); - - worker.on("error", (err) => { - logger.logger.error("Worker error:", { error: err }); - }); - - // Graceful shutdown handler for the worker - const shutdown = async () => { - if (worker) { - logger.logger.info("Closing email queue worker..."); - await worker.close(); - logger.logger.info("Email queue worker closed"); + const lockKey = `lock:emailConsolidate:${jobId}`; + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", 10); + if (lockAcquired) { + try { + const recipientsSet = `email:recipients:${jobId}`; + const recipients = await pubClient.smembers(recipientsSet); + for (const recipient of recipients) { + const userKey = `email:notifications:${jobId}:${recipient}`; + const detailsKey = `email:recipientDetails:${jobId}:${recipient}`; + const messages = await pubClient.lrange(userKey, 0, -1); + if (messages.length > 0) { + const details = await pubClient.hgetall(detailsKey); + const firstName = details.firstName || "User"; + const subject = `Updates for job ${jobRoNumber} at ${bodyShopName}`; + const body = [ + '', + "Dear " + firstName + ",", + "", + "There have been updates to job " + jobRoNumber + ":", + "", + "", + "", + "Please check the job for more details.", + "", + "Best regards,", + bodyShopName, + "" + ].join("\n"); + await sendTaskEmail({ + to: recipient, + subject, + type: "html", + html: body + }); + logger.logger.info( + `Sent consolidated email to ${recipient} for jobId ${jobId} with ${messages.length} updates` + ); + await pubClient.del(userKey); + await pubClient.del(detailsKey); + } + } + await pubClient.del(recipientsSet); + await pubClient.del(`email:consolidate:${jobId}`); + } catch (err) { + logger.logger.error(`Email consolidation error for jobId ${jobId}: ${err.message}`, { error: err }); + throw err; + } finally { + await pubClient.del(lockKey); + } + } else { + logger.logger.info(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`); + } + }, + { + connection: pubClient, + prefix: "{BULLMQ}", + concurrency: 1, + limiter: { max: 1, duration: 5000 } } - }; + ); - process.on("SIGTERM", shutdown); // Handle termination signal - process.on("SIGINT", shutdown); // Handle interrupt signal (e.g., Ctrl+C) + // Event handlers for workers + emailAddWorker.on("completed", (job) => logger.logger.info(`Email add job ${job.id} completed`)); + emailConsolidateWorker.on("completed", (job) => logger.logger.info(`Email consolidate job ${job.id} completed`)); + emailAddWorker.on("failed", (job, err) => + logger.logger.error(`Email add job ${job.id} failed: ${err.message}`, { error: err }) + ); + emailConsolidateWorker.on("failed", (job, err) => + logger.logger.error(`Email consolidate job ${job.id} failed: ${err.message}`, { error: err }) + ); + + // Graceful shutdown + const shutdown = async () => { + logger.logger.info("Closing email queue workers..."); + await Promise.all([emailAddWorker.close(), emailConsolidateWorker.close()]); + logger.logger.info("Email queue workers closed"); + }; + process.on("SIGTERM", shutdown); + process.on("SIGINT", shutdown); } - return emailQueue; // Return queue for external use + return emailAddQueue; }; /** - * Retrieves the initialized `emailQueue` instance. + * Retrieves the initialized `emailAddQueue` instance. * - * @returns {Queue} The `emailQueue` instance for sending emails. - * @throws {Error} If `emailQueue` is not initialized (i.e., `loadEmailQueue` wasn’t called). + * @returns {Queue} The `emailAddQueue` instance for adding notifications. + * @throws {Error} If `emailAddQueue` is not initialized. */ const getQueue = () => { - if (!emailQueue) { - throw new Error("Email queue not initialized. Ensure loadEmailQueue is called during bootstrap."); + if (!emailAddQueue) { + throw new Error("Email add queue not initialized. Ensure loadEmailQueue is called during bootstrap."); } - return emailQueue; + return emailAddQueue; }; /** - * Dispatches emails to the `emailQueue` for processing, creating one job per recipient. + * Dispatches email notifications to the `emailAddQueue` for processing. * - * @param {Object} options - Options for dispatching emails. - * @param {Array} options.emailsToDispatch - Array of email objects to dispatch. + * @param {Object} options - Options for dispatching notifications. + * @param {Array} options.emailsToDispatch - Array of email notification objects. * @param {Object} options.logger - Logger instance for logging dispatch events. - * @returns {Promise} Resolves when all email jobs are added to the queue. + * @returns {Promise} Resolves when all notifications are added to the queue. */ const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => { - const emailQueue = getQueue(); + console.dir(emailsToDispatch); + const emailAddQueue = getQueue(); for (const email of emailsToDispatch) { - const { subject, body, recipients } = email; - // Create an array of jobs, one per recipient - const jobs = recipients.map((recipient) => ({ - name: "send-email", - data: { - subject, - body, - recipient: recipient.user // Extract the email address from recipient object - } - })); - // Add all jobs for this email in one operation - await emailQueue.addBulk(jobs); - logger.logger.debug(`Added ${jobs.length} email jobs to queue for subject: ${subject}`); + // Extract bodyShopName along with other fields + const { jobId, jobRoNumber, bodyShopName, body, recipients } = email; + + // Validate required fields, including bodyShopName + if (!jobId || !jobRoNumber || !bodyShopName || !body || !recipients.length) { + logger.logger.warn( + `Skipping email dispatch for jobId ${jobId} due to missing data: ` + + `jobRoNumber=${jobRoNumber}, bodyShopName=${bodyShopName}, body=${body}, recipients=${recipients.length}` + ); + continue; + } + + // Include bodyShopName in the job data + await emailAddQueue.add( + "add-email-notification", + { jobId, jobRoNumber, bodyShopName, body, recipients }, + { jobId: `${jobId}:${Date.now()}` } + ); + logger.logger.info(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`); } }; diff --git a/server/notifications/scenarioBuilders.js b/server/notifications/scenarioBuilders.js index 189c51928..6ad5e00af 100644 --- a/server/notifications/scenarioBuilders.js +++ b/server/notifications/scenarioBuilders.js @@ -8,13 +8,13 @@ const { getJobAssignmentType } = require("./stringHelpers"); */ const populateWatchers = (data, result) => { data.scenarioWatchers.forEach((recipients) => { - const { user, app, fcm, email } = recipients; + const { user, app, fcm, email, firstName, lastName } = recipients; // Add user to app recipients with bodyShopId if app notification is enabled if (app === true) result.app.recipients.push({ user, bodyShopId: data.bodyShopId }); // Add user to FCM recipients if FCM notification is enabled if (fcm === true) result.fcm.recipients.push(user); // Add user to email recipients if email notification is enabled - if (email === true) result.email.recipients.push({ user }); + if (email === true) result.email.recipients.push({ user, firstName, lastName }); }); }; @@ -37,8 +37,11 @@ const alternateTransportChangedBuilder = (data) => { recipients: [] }, email: { - subject: `Alternate transport for ${data?.jobRoNumber} (${data.bodyShopName}) changed to ${data.data.alt_transport || "None"}`, - body: `The alternate transport status has been updated for job ${data?.jobRoNumber} in ${data.bodyShopName}.`, + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopName: data.bodyShopName, + // subject: `Alternate transport for ${data?.jobRoNumber} (${data.bodyShopName}) changed to ${data.data.alt_transport || "None"}`, + body: `The alternate transport status has been updated.`, recipients: [] }, fcm: { recipients: [] } @@ -66,8 +69,11 @@ const billPostedHandler = (data) => { recipients: [] }, email: { - subject: `Bill posted for ${data?.jobRoNumber} (${data.bodyShopName})`, - body: `A bill of $${data.data.clm_total} has been posted for job ${data?.jobRoNumber} in ${data.bodyShopName}.`, + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopName: data.bodyShopName, + // subject: `Bill posted for ${data?.jobRoNumber} (${data.bodyShopName})`, + body: `A bill of $${data.data.clm_total} has been posted.`, recipients: [] }, fcm: { recipients: [] } @@ -96,8 +102,10 @@ const criticalPartsStatusChangedBuilder = (data) => { recipients: [] }, email: { - subject: `Critical parts status for ${data?.jobRoNumber} (${data.bodyShopName}) updated`, - body: `The critical parts status for job ${data?.jobRoNumber} in ${data.bodyShopName} has changed to ${data.data.queued_for_parts ? "queued" : "not queued"}.`, + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopName: data.bodyShopName, + body: `The critical parts status has changed to ${data.data.queued_for_parts ? "queued" : "not queued"}.`, recipients: [] }, fcm: { recipients: [] } @@ -128,8 +136,10 @@ const intakeDeliveryChecklistCompletedBuilder = (data) => { recipients: [] }, email: { - subject: `${checklistType.charAt(0).toUpperCase() + checklistType.slice(1)} checklist completed for ${data?.jobRoNumber} (${data.bodyShopName})`, - body: `The ${checklistType} checklist for job ${data?.jobRoNumber} in ${data.bodyShopName} has been completed.`, + jobRoNumber: data.jobRoNumber, + jobId: data.jobId, + bodyShopName: data.bodyShopName, + body: `The ${checklistType.charAt(0).toUpperCase() + checklistType.slice(1)} checklist has been completed.`, recipients: [] }, fcm: { recipients: [] } @@ -157,8 +167,10 @@ const jobAssignedToMeBuilder = (data) => { recipients: [] }, email: { - subject: `You have been assigned to [${getJobAssignmentType(data.scenarioFields?.[0])}] on ${data?.jobRoNumber} in ${data.bodyShopName}`, - body: `Hello, a new job has been assigned to you in ${data.bodyShopName}.`, + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopName: data.bodyShopName, + body: `You have been assigned to [${getJobAssignmentType(data.scenarioFields?.[0])}]`, recipients: [] }, fcm: { recipients: [] } @@ -184,8 +196,10 @@ const jobsAddedToProductionBuilder = (data) => { recipients: [] }, email: { - subject: `Job ${data?.jobRoNumber} (${data.bodyShopName}) added to production`, - body: `Job ${data?.jobRoNumber} in ${data.bodyShopName} has been added to production.`, + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopName: data.bodyShopName, + body: `Job has been added to production.`, recipients: [] }, fcm: { recipients: [] } @@ -214,8 +228,10 @@ const jobStatusChangeBuilder = (data) => { recipients: [] }, email: { - subject: `The status of ${data?.jobRoNumber} (${data.bodyShopName}) has changed from ${data.changedFields.status.old} to ${data.data.status}`, - body: `...`, // Placeholder indicating email body may need further customization + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopName: data.bodyShopName, + body: `The status has changed from ${data.changedFields.status.old} to ${data.data.status}`, recipients: [] }, fcm: { recipients: [] } @@ -241,8 +257,10 @@ const newMediaAddedReassignedBuilder = (data) => { recipients: [] }, email: { - subject: `New media added to ${data?.jobRoNumber} (${data.bodyShopName})`, - body: `New media has been added to job ${data?.jobRoNumber} in ${data.bodyShopName}.`, + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopName: data.bodyShopName, + body: `New media has been added.`, recipients: [] }, fcm: { recipients: [] } @@ -270,8 +288,10 @@ const newNoteAddedBuilder = (data) => { recipients: [] }, email: { - subject: `New note added to ${data?.jobRoNumber} (${data.bodyShopName})`, - body: `A new note has been added to job ${data?.jobRoNumber} in ${data.bodyShopName}: "${data.data.text}"`, + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopName: data.bodyShopName, + body: `A new note has been added: "${data.data.text}"`, recipients: [] }, fcm: { recipients: [] } @@ -297,8 +317,10 @@ const newTimeTicketPostedBuilder = (data) => { recipients: [] }, email: { - subject: `New time ticket posted for ${data?.jobRoNumber} (${data.bodyShopName})`, - body: `A new time ticket has been posted for job ${data?.jobRoNumber} in ${data.bodyShopName}.`, + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopName: data.bodyShopName, + body: `A new time ticket has been posted.`, recipients: [] }, fcm: { recipients: [] } @@ -327,8 +349,11 @@ const partMarkedBackOrderedBuilder = (data) => { recipients: [] }, email: { - subject: `Part marked back-ordered for ${data?.jobRoNumber} (${data.bodyShopName})`, - body: `A part for job ${data?.jobRoNumber} in ${data.bodyShopName} has been marked as back-ordered.`, + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopName: data.bodyShopName, + // subject: `Part marked back-ordered for ${data?.jobRoNumber} (${data.bodyShopName})`, + body: `A part has been marked as back-ordered.`, recipients: [] }, fcm: { recipients: [] } @@ -356,8 +381,10 @@ const paymentCollectedCompletedBuilder = (data) => { recipients: [] }, email: { - subject: `Payment collected for ${data?.jobRoNumber} (${data.bodyShopName})`, - body: `Payment of $${data.data.clm_total} has been collected for job ${data?.jobRoNumber} in ${data.bodyShopName}.`, + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopName: data.bodyShopName, + body: `Payment of $${data.data.clm_total} has been collected.`, recipients: [] }, fcm: { recipients: [] } @@ -390,8 +417,11 @@ const scheduledDatesChangedBuilder = (data) => { recipients: [] }, email: { - subject: `Scheduled dates updated for ${data?.jobRoNumber} (${data.bodyShopName})`, - body: `Scheduled dates for job ${data?.jobRoNumber} in ${data.bodyShopName} have been updated.`, + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopName: data.bodyShopName, + // subject: `Scheduled dates updated for ${data?.jobRoNumber} (${data.bodyShopName})`, + body: `Scheduled dates have been updated.`, recipients: [] }, fcm: { recipients: [] } @@ -419,8 +449,10 @@ const supplementImportedBuilder = (data) => { recipients: [] }, email: { - subject: `Supplement imported for ${data?.jobRoNumber} (${data.bodyShopName})`, - body: `A supplement of $${data.data.cieca_ttl?.data?.supp_amt || 0} has been imported for job ${data?.jobRoNumber} in ${data.bodyShopName}.`, + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopName: data.bodyShopName, + body: `A supplement of $${data.data.cieca_ttl?.data?.supp_amt || 0} has been imported.`, recipients: [] }, fcm: { recipients: [] } @@ -449,8 +481,10 @@ const tasksUpdatedCreatedBuilder = (data) => { recipients: [] }, email: { - subject: `Tasks ${data.isNew ? "created" : "updated"} for ${data?.jobRoNumber} (${data.bodyShopName})`, - body: `Tasks for job ${data?.jobRoNumber} in ${data.bodyShopName} have been ${data.isNew ? "created" : "updated"}.`, + jobId: data.jobId, + jobRoNumber: data.jobRoNumber, + bodyShopName: data.bodyShopName, + body: `Tasks have been ${data.isNew ? "created" : "updated"}.`, recipients: [] }, fcm: { recipients: [] }