From adb15a47485f14fd1e0fc687415a7ccac6b843c3 Mon Sep 17 00:00:00 2001 From: Dave Richer Date: Tue, 18 Feb 2025 12:57:54 -0500 Subject: [PATCH] feature/IO-3096-GlobalNotifications - Checkpoint, Builders --- server.js | 8 +- server/email/sendemail.js | 2 +- server/notifications/eventParser.js | 17 ---- server/notifications/queues/appQueue.js | 15 ++- server/notifications/queues/emailQueue.js | 113 ++++++++++++++++------ server/notifications/scenarioBuilders.js | 75 +++++++------- server/notifications/scenarioParser.js | 57 +++-------- 7 files changed, 149 insertions(+), 138 deletions(-) diff --git a/server.js b/server.js index 714b0b506..81f6771c2 100644 --- a/server.js +++ b/server.js @@ -31,8 +31,8 @@ const { redisSocketEvents } = require("./server/web-sockets/redisSocketEvents"); const { ElastiCacheClient, DescribeCacheClustersCommand } = require("@aws-sdk/client-elasticache"); const { InstanceRegion } = require("./server/utils/instanceMgr"); const StartStatusReporter = require("./server/utils/statusReporter"); -const loadEmailQueue = require("./server/notifications/queues/emailQueue"); -const loadAppQueue = require("./server/notifications/queues/appQueue"); +const { loadEmailQueue } = require("./server/notifications/queues/emailQueue"); +const { loadAppQueue } = require("./server/notifications/queues/appQueue"); const cleanupTasks = []; let isShuttingDown = false; @@ -297,8 +297,8 @@ const loadQueues = async ({ pubClient, logger, redisHelpers }) => { // Assuming loadEmailQueue and loadAppQueue return Promises const [notificationsEmailsQueue, notificationsAppQueue] = await Promise.all([ - loadEmailQueue()(queueSettings), - loadAppQueue()(queueSettings) + loadEmailQueue(queueSettings), + loadAppQueue(queueSettings) ]); // Add error listeners or other setup for queues if needed diff --git a/server/email/sendemail.js b/server/email/sendemail.js index 7bc258dce..97637fd2d 100644 --- a/server/email/sendemail.js +++ b/server/email/sendemail.js @@ -92,7 +92,7 @@ const sendTaskEmail = async ({ to, subject, type = "text", html, text, attachmen }, (err, info) => { // (message, type, user, record, meta - logger.log("server-email", err ? "error" : "debug", null, null, { message: err || info }); + logger.log("server-email", err ? "error" : "debug", null, null, { errorMessage: err?.message }); } ); } catch (error) { diff --git a/server/notifications/eventParser.js b/server/notifications/eventParser.js index 654006bbc..4cd0d30fd 100644 --- a/server/notifications/eventParser.js +++ b/server/notifications/eventParser.js @@ -1,22 +1,5 @@ /** * Parses an event by comparing old and new data to determine which fields have changed. - * - * @async - * @function eventParser - * @param {Object} params - The parameters for parsing the event. - * @param {Object} params.oldData - The previous state of the data. If not provided, the data is considered new. - * @param {Object} params.newData - The new state of the data. - * @param {string} params.trigger - The trigger that caused the event. - * @param {string} params.table - The name of the table where the event occurred. - * @param {string} [params.jobIdField] - The field name or key path (e.g., "req.body.event.new.jobid") used to extract the job ID. - * @returns {Promise} An object containing: - * - {string[]} changedFieldNames - An array of field names that have changed. - * - {Object} changedFields - An object mapping changed field names to an object with `old` and `new` values. - * - {boolean} isNew - Indicates if the event is for new data (i.e., no oldData exists). - * - {Object} data - The new data. - * - {string} trigger - The event trigger. - * - {string} table - The table name. - * - {string|null} jobId - The extracted job ID, if available. */ const eventParser = async ({ oldData, newData, trigger, table, jobIdField }) => { const isNew = !oldData; diff --git a/server/notifications/queues/appQueue.js b/server/notifications/queues/appQueue.js index c0752e37b..0f9ca3584 100644 --- a/server/notifications/queues/appQueue.js +++ b/server/notifications/queues/appQueue.js @@ -5,12 +5,19 @@ let appQueue; const loadAppQueue = async ({ pubClient, logger, redisHelpers }) => { if (!appQueue) { logger.logger.info("Initializing Notifications App Queue"); - - appQueue = await new Queue("notificationsApp", { connection: pubClient, prefix: "{BULLMQ}" }); + appQueue = new Queue("notificationsApp", { + connection: pubClient, + prefix: "{BULLMQ}" + }); } return appQueue; }; -const getQueue = () => (!appQueue ? loadAppQueue : appQueue); +const getQueue = () => { + if (!appQueue) { + throw new Error("App queue not initialized. Ensure loadAppQueue is called during bootstrap."); + } + return appQueue; +}; -module.exports = getQueue; +module.exports = { loadAppQueue, getQueue }; diff --git a/server/notifications/queues/emailQueue.js b/server/notifications/queues/emailQueue.js index 521a2112a..b93975891 100644 --- a/server/notifications/queues/emailQueue.js +++ b/server/notifications/queues/emailQueue.js @@ -1,40 +1,99 @@ const { Queue, Worker } = require("bullmq"); +const { sendTaskEmail } = require("../../email/sendemail"); let emailQueue; +let worker; const loadEmailQueue = async ({ pubClient, logger, redisHelpers }) => { if (!emailQueue) { logger.logger.info("Initializing Notifications Email Queue"); - emailQueue = await new Queue("notificationsEmails", { connection: pubClient, prefix: "{BULLMQ}" }); - } + emailQueue = new Queue("notificationsEmails", { + connection: pubClient, + prefix: "{BULLMQ}", + defaultJobOptions: { + attempts: 3, + backoff: { + type: "exponential", + delay: 1000 + } + } + }); - // TODO: Test code for worker - // const worker = new Worker( - // "notificationsEmails", - // async (job) => { - // console.log("Processing job", job.id, "with data", job.data); - // // Simulate some work - // await new Promise((resolve) => setTimeout(resolve, 2000)); // Wait for 2 seconds - // console.log("Job processed"); - // }, - // { connection: pubClient, prefix: "{BULLMQ}" } - // ); - // - // worker.on("completed", (job) => { - // console.log(`Job ${job.id} completed!`); - // // Optionally, close the worker after it's done - // worker.close().then(() => { - // console.log("Worker closed"); - // }); - // }); - // - // worker.on("error", (err) => { - // console.error("Error in worker:", err); - // }); + // Initialize the worker during queue setup + worker = new Worker( + "notificationsEmails", + async (job) => { + const { subject, body, recipients } = job.data; + logger.logger.debug(`Processing email job ${job.id} for ${recipients.length} recipients`); + + await sendTaskEmail({ + to: recipients.map((r) => r.user), + subject, + type: "text", + text: body + }); + + logger.logger.debug(`Email job ${job.id} processed successfully`); + }, + { + connection: pubClient, + prefix: "{BULLMQ}", + concurrency: 2, // Reduced for multi-node setup; adjust based on load + limiter: { + max: 10, // Max 10 jobs per minute per worker + duration: 60 * 1000 // 1 minute + } + } + ); + + // Worker event handlers + worker.on("completed", (job) => { + logger.logger.debug(`Job ${job.id} completed`); + }); + + worker.on("failed", (job, err) => { + logger.logger.error(`Job ${job.id} failed: ${err.message}`, { error: err }); + }); + + worker.on("error", (err) => { + logger.logger.error("Worker error:", { error: err }); + }); + + // Graceful shutdown handling + const shutdown = async () => { + if (worker) { + logger.logger.info("Closing email queue worker..."); + await worker.close(); + logger.logger.info("Email queue worker closed"); + } + }; + + process.on("SIGTERM", shutdown); + process.on("SIGINT", shutdown); + } return emailQueue; }; -const getQueue = () => (!emailQueue ? loadEmailQueue : emailQueue); +const getQueue = () => { + if (!emailQueue) { + throw new Error("Email queue not initialized. Ensure loadEmailQueue is called during bootstrap."); + } + return emailQueue; +}; -module.exports = getQueue; +const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => { + const emailQueue = getQueue(); + + for (const email of emailsToDispatch) { + const { subject, body, recipients } = email; + await emailQueue.add("send-email", { + subject, + body, + recipients + }); // Job options moved to defaultJobOptions in Queue + logger.logger.debug(`Added email to queue: ${subject} for ${recipients.length} recipients`); + } +}; + +module.exports = { loadEmailQueue, getQueue, dispatchEmailsToQueue }; diff --git a/server/notifications/scenarioBuilders.js b/server/notifications/scenarioBuilders.js index bf4d18f6a..d4648f261 100644 --- a/server/notifications/scenarioBuilders.js +++ b/server/notifications/scenarioBuilders.js @@ -1,6 +1,5 @@ const { getJobAssignmentType } = require("./stringHelpers"); -// Helper function to populate watchers for app, fcm, and email channels const populateWatchers = (data, result) => { data.scenarioWatchers.forEach((recipients) => { const { user, app, fcm, email } = recipients; @@ -12,9 +11,9 @@ const populateWatchers = (data, result) => { const alternateTransportChangedBuilder = (data) => { const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.alternateTransportChanged", variables: { alternateTransport: data.data.alt_transport, @@ -36,9 +35,9 @@ const alternateTransportChangedBuilder = (data) => { const billPostedHandler = (data) => { const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.billPosted", variables: { clmTotal: data.data.clm_total @@ -59,9 +58,9 @@ const billPostedHandler = (data) => { const criticalPartsStatusChangedBuilder = (data) => { const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.criticalPartsStatusChanged", variables: { queuedForParts: data.data.queued_for_parts, @@ -84,9 +83,9 @@ const criticalPartsStatusChangedBuilder = (data) => { const intakeDeliveryChecklistCompletedBuilder = (data) => { const checklistType = data.changedFields.intakechecklist ? "intake" : "delivery"; const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.checklistCompleted", variables: { checklistType, @@ -108,14 +107,12 @@ const intakeDeliveryChecklistCompletedBuilder = (data) => { const jobAssignedToMeBuilder = (data) => { const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.assigned", variables: { - type: data.scenarioFields?.[0], - jobId: data.jobId, - bodyShopName: data.bodyShopName + type: data.scenarioFields?.[0] }, recipients: [] }, @@ -133,14 +130,11 @@ const jobAssignedToMeBuilder = (data) => { const jobsAddedToProductionBuilder = (data) => { const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.addedToProduction", - variables: { - inProduction: data.data.inproduction, - oldInProduction: data.changedFields.inproduction?.old - }, + variables: {}, recipients: [] }, email: { @@ -158,9 +152,9 @@ const jobsAddedToProductionBuilder = (data) => { // Verified const jobStatusChangeBuilder = (data) => { const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.statusChanged", variables: { status: data.data.status, @@ -182,9 +176,9 @@ const jobStatusChangeBuilder = (data) => { const newMediaAddedReassignedBuilder = (data) => { const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.newMediaAdded", variables: {}, recipients: [] @@ -204,9 +198,9 @@ const newMediaAddedReassignedBuilder = (data) => { // Verified const newNoteAddedBuilder = (data) => { const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.newNoteAdded", variables: { text: data.data.text @@ -227,9 +221,9 @@ const newNoteAddedBuilder = (data) => { const newTimeTicketPostedBuilder = (data) => { const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.newTimeTicketPosted", variables: {}, recipients: [] @@ -248,9 +242,9 @@ const newTimeTicketPostedBuilder = (data) => { const partMarkedBackOrderedBuilder = (data) => { const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.partBackOrdered", variables: { queuedForParts: data.data.queued_for_parts, @@ -272,9 +266,9 @@ const partMarkedBackOrderedBuilder = (data) => { const paymentCollectedCompletedBuilder = (data) => { const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.paymentCollected", variables: { clmTotal: data.data.clm_total @@ -295,9 +289,9 @@ const paymentCollectedCompletedBuilder = (data) => { const scheduledDatesChangedBuilder = (data) => { const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.scheduledDatesChanged", variables: { scheduledIn: data.data.scheduled_in, @@ -323,9 +317,9 @@ const scheduledDatesChangedBuilder = (data) => { const supplementImportedBuilder = (data) => { const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.supplementImported", variables: { suppAmt: data.data.cieca_ttl?.data?.supp_amt @@ -346,11 +340,14 @@ const supplementImportedBuilder = (data) => { const tasksUpdatedCreatedBuilder = (data) => { const result = { - jobId: data.jobId, - bodyShopName: data.bodyShopName, app: { + jobId: data.jobId, + bodyShopId: data.bodyShopId, key: "notifications.job.tasksUpdated", - variables: {}, + variables: { + type: data.isNew ? "created" : "updated", + roNumber: data.jobRoNumber + }, recipients: [] }, email: { diff --git a/server/notifications/scenarioParser.js b/server/notifications/scenarioParser.js index 7ed7eb0ea..cb9f0fc2e 100644 --- a/server/notifications/scenarioParser.js +++ b/server/notifications/scenarioParser.js @@ -9,50 +9,22 @@ const { client: gqlClient } = require("../graphql-client/graphql-client"); const queries = require("../graphql-client/queries"); const { isEmpty, isFunction } = require("lodash"); const { getMatchingScenarios } = require("./scenarioMapperr"); -const emailQueue = require("./queues/emailQueue"); const consoleDir = require("../utils/consoleDir"); +const { dispatchEmailsToQueue } = require("./queues/emailQueue"); /** * Parses an event and determines matching scenarios for notifications. * Queries job watchers and notification settings before triggering scenario builders. - * - *

This function performs the following steps: - *

    - *
  1. Parse event data to extract necessary details using {@link eventParser}.
  2. - *
  3. Query job watchers for the given job ID using a GraphQL client.
  4. - *
  5. Retrieve body shop information from the job.
  6. - *
  7. Determine matching scenarios based on event data.
  8. - *
  9. Query notification settings for job watchers.
  10. - *
  11. Filter scenario watchers based on enabled notification methods.
  12. - *
  13. Trigger scenario builders for matching scenarios with eligible watchers.
  14. - *
- * - * @async - * @function scenarioParser - * @param {Object} req - The request object containing event data. - * Expected properties: - *
- *   {
- *     body: {
- *       event: { data: { new: Object, old: Object } },
- *       trigger: Object,
- *       table: string
- *     }
- *   }
- *   
- * @param {string} jobIdField - The field used to identify the job ID. - * @returns {Promise} A promise that resolves when the scenarios have been processed. - * @throws {Error} Throws an error if required request fields are missing or if body shop data is not found. */ const scenarioParser = async (req, jobIdField) => { const { event, trigger, table } = req.body; + const { logger } = req; if (!event?.data || !trigger || !table) { throw new Error("Missing required request fields: event data, trigger, or table."); } // Step 1: Parse event data to extract necessary details. - // console.log(`1`); const eventData = await eventParser({ newData: event.data.new, oldData: event.data.old, @@ -62,7 +34,6 @@ const scenarioParser = async (req, jobIdField) => { }); // Step 2: Query job watchers for the given job ID. - // console.log(`2`); const watcherData = await gqlClient.request(queries.GET_JOB_WATCHERS, { jobid: eventData.jobId }); @@ -79,7 +50,6 @@ const scenarioParser = async (req, jobIdField) => { } // Step 3: Retrieve body shop information from the job. - // console.log(`3`); const bodyShopId = watcherData?.job?.bodyshop?.id; const bodyShopName = watcherData?.job?.bodyshop?.shopname; const jobRoNumber = watcherData?.job?.ro_number; @@ -90,7 +60,6 @@ const scenarioParser = async (req, jobIdField) => { } // Step 4: Determine matching scenarios based on event data. - // console.log(`4`); const matchingScenarios = getMatchingScenarios({ ...eventData, jobWatchers, @@ -111,7 +80,6 @@ const scenarioParser = async (req, jobIdField) => { }; // Step 5: Query notification settings for job watchers. - // console.log(`5`); const associationsData = await gqlClient.request(queries.GET_NOTIFICATION_ASSOCIATIONS, { emails: jobWatchers.map((x) => x.email), shopid: bodyShopId @@ -122,7 +90,6 @@ const scenarioParser = async (req, jobIdField) => { } // Step 6: Filter scenario watchers based on enabled notification methods. - // console.log(`6`); finalScenarioData.matchingScenarios = finalScenarioData.matchingScenarios.map((scenario) => ({ ...scenario, scenarioWatchers: associationsData.associations @@ -152,8 +119,6 @@ const scenarioParser = async (req, jobIdField) => { } // Step 7: Trigger scenario builders for matching scenarios with eligible watchers. - // console.log(`7`); - const scenariosToDispatch = []; for (const scenario of finalScenarioData.matchingScenarios) { @@ -177,8 +142,6 @@ const scenarioParser = async (req, jobIdField) => { } // Step 8: Filter scenario fields to only include changed fields. - // console.log(`8`); - const filteredScenarioFields = scenario.fields?.filter((field) => eventData.changedFieldNames.includes(field)) || []; @@ -208,16 +171,18 @@ const scenarioParser = async (req, jobIdField) => { } // Step 9: Dispatch Email Notifications to the Email Notification Queue - // console.log(`8`); - - const emailsToDispatch = scenariosToDispatch.map((scenario) => scenario?.email); + dispatchEmailsToQueue({ + emailsToDispatch: scenariosToDispatch.map((scenario) => scenario?.email), + logger + }).catch((e) => + logger.log("Something went wrong dispatching emails to the Email Notification Queue", "error", "queue", null, { + message: e?.message + }) + ); // Step 10: Dispatch App Notifications to the App Notification Queue const appsToDispatch = scenariosToDispatch.map((scenario) => scenario?.app); - - consoleDir({ emailsToDispatch, appsToDispatch }); - // TODO: Test Code for Queues - // emailQueue().add("test", { data: "test" }); + consoleDir({ appsToDispatch }); }; module.exports = scenarioParser;