feature/IO-3096-GlobalNotifications - Checkpoint, Builders

This commit is contained in:
Dave Richer
2025-02-18 12:57:54 -05:00
parent c214ed1dfb
commit adb15a4748
7 changed files with 149 additions and 138 deletions

View File

@@ -31,8 +31,8 @@ const { redisSocketEvents } = require("./server/web-sockets/redisSocketEvents");
const { ElastiCacheClient, DescribeCacheClustersCommand } = require("@aws-sdk/client-elasticache");
const { InstanceRegion } = require("./server/utils/instanceMgr");
const StartStatusReporter = require("./server/utils/statusReporter");
const loadEmailQueue = require("./server/notifications/queues/emailQueue");
const loadAppQueue = require("./server/notifications/queues/appQueue");
const { loadEmailQueue } = require("./server/notifications/queues/emailQueue");
const { loadAppQueue } = require("./server/notifications/queues/appQueue");
const cleanupTasks = [];
let isShuttingDown = false;
@@ -297,8 +297,8 @@ const loadQueues = async ({ pubClient, logger, redisHelpers }) => {
// Assuming loadEmailQueue and loadAppQueue return Promises
const [notificationsEmailsQueue, notificationsAppQueue] = await Promise.all([
loadEmailQueue()(queueSettings),
loadAppQueue()(queueSettings)
loadEmailQueue(queueSettings),
loadAppQueue(queueSettings)
]);
// Add error listeners or other setup for queues if needed

View File

@@ -92,7 +92,7 @@ const sendTaskEmail = async ({ to, subject, type = "text", html, text, attachmen
},
(err, info) => {
// (message, type, user, record, meta
logger.log("server-email", err ? "error" : "debug", null, null, { message: err || info });
logger.log("server-email", err ? "error" : "debug", null, null, { errorMessage: err?.message });
}
);
} catch (error) {

View File

@@ -1,22 +1,5 @@
/**
* Parses an event by comparing old and new data to determine which fields have changed.
*
* @async
* @function eventParser
* @param {Object} params - The parameters for parsing the event.
* @param {Object} params.oldData - The previous state of the data. If not provided, the data is considered new.
* @param {Object} params.newData - The new state of the data.
* @param {string} params.trigger - The trigger that caused the event.
* @param {string} params.table - The name of the table where the event occurred.
* @param {string} [params.jobIdField] - The field name or key path (e.g., "req.body.event.new.jobid") used to extract the job ID.
* @returns {Promise<Object>} An object containing:
* - {string[]} changedFieldNames - An array of field names that have changed.
* - {Object} changedFields - An object mapping changed field names to an object with `old` and `new` values.
* - {boolean} isNew - Indicates if the event is for new data (i.e., no oldData exists).
* - {Object} data - The new data.
* - {string} trigger - The event trigger.
* - {string} table - The table name.
* - {string|null} jobId - The extracted job ID, if available.
*/
const eventParser = async ({ oldData, newData, trigger, table, jobIdField }) => {
const isNew = !oldData;

View File

@@ -5,12 +5,19 @@ let appQueue;
const loadAppQueue = async ({ pubClient, logger, redisHelpers }) => {
if (!appQueue) {
logger.logger.info("Initializing Notifications App Queue");
appQueue = await new Queue("notificationsApp", { connection: pubClient, prefix: "{BULLMQ}" });
appQueue = new Queue("notificationsApp", {
connection: pubClient,
prefix: "{BULLMQ}"
});
}
return appQueue;
};
const getQueue = () => (!appQueue ? loadAppQueue : appQueue);
const getQueue = () => {
if (!appQueue) {
throw new Error("App queue not initialized. Ensure loadAppQueue is called during bootstrap.");
}
return appQueue;
};
module.exports = getQueue;
module.exports = { loadAppQueue, getQueue };

View File

@@ -1,40 +1,99 @@
const { Queue, Worker } = require("bullmq");
const { sendTaskEmail } = require("../../email/sendemail");
let emailQueue;
let worker;
const loadEmailQueue = async ({ pubClient, logger, redisHelpers }) => {
if (!emailQueue) {
logger.logger.info("Initializing Notifications Email Queue");
emailQueue = await new Queue("notificationsEmails", { connection: pubClient, prefix: "{BULLMQ}" });
emailQueue = new Queue("notificationsEmails", {
connection: pubClient,
prefix: "{BULLMQ}",
defaultJobOptions: {
attempts: 3,
backoff: {
type: "exponential",
delay: 1000
}
}
});
// TODO: Test code for worker
// const worker = new Worker(
// "notificationsEmails",
// async (job) => {
// console.log("Processing job", job.id, "with data", job.data);
// // Simulate some work
// await new Promise((resolve) => setTimeout(resolve, 2000)); // Wait for 2 seconds
// console.log("Job processed");
// },
// { connection: pubClient, prefix: "{BULLMQ}" }
// );
//
// worker.on("completed", (job) => {
// console.log(`Job ${job.id} completed!`);
// // Optionally, close the worker after it's done
// worker.close().then(() => {
// console.log("Worker closed");
// });
// });
//
// worker.on("error", (err) => {
// console.error("Error in worker:", err);
// });
// Initialize the worker during queue setup
worker = new Worker(
"notificationsEmails",
async (job) => {
const { subject, body, recipients } = job.data;
logger.logger.debug(`Processing email job ${job.id} for ${recipients.length} recipients`);
await sendTaskEmail({
to: recipients.map((r) => r.user),
subject,
type: "text",
text: body
});
logger.logger.debug(`Email job ${job.id} processed successfully`);
},
{
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 2, // Reduced for multi-node setup; adjust based on load
limiter: {
max: 10, // Max 10 jobs per minute per worker
duration: 60 * 1000 // 1 minute
}
}
);
// Worker event handlers
worker.on("completed", (job) => {
logger.logger.debug(`Job ${job.id} completed`);
});
worker.on("failed", (job, err) => {
logger.logger.error(`Job ${job.id} failed: ${err.message}`, { error: err });
});
worker.on("error", (err) => {
logger.logger.error("Worker error:", { error: err });
});
// Graceful shutdown handling
const shutdown = async () => {
if (worker) {
logger.logger.info("Closing email queue worker...");
await worker.close();
logger.logger.info("Email queue worker closed");
}
};
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
}
return emailQueue;
};
const getQueue = () => (!emailQueue ? loadEmailQueue : emailQueue);
const getQueue = () => {
if (!emailQueue) {
throw new Error("Email queue not initialized. Ensure loadEmailQueue is called during bootstrap.");
}
return emailQueue;
};
module.exports = getQueue;
const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => {
const emailQueue = getQueue();
for (const email of emailsToDispatch) {
const { subject, body, recipients } = email;
await emailQueue.add("send-email", {
subject,
body,
recipients
}); // Job options moved to defaultJobOptions in Queue
logger.logger.debug(`Added email to queue: ${subject} for ${recipients.length} recipients`);
}
};
module.exports = { loadEmailQueue, getQueue, dispatchEmailsToQueue };

View File

@@ -1,6 +1,5 @@
const { getJobAssignmentType } = require("./stringHelpers");
// Helper function to populate watchers for app, fcm, and email channels
const populateWatchers = (data, result) => {
data.scenarioWatchers.forEach((recipients) => {
const { user, app, fcm, email } = recipients;
@@ -12,9 +11,9 @@ const populateWatchers = (data, result) => {
const alternateTransportChangedBuilder = (data) => {
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.alternateTransportChanged",
variables: {
alternateTransport: data.data.alt_transport,
@@ -36,9 +35,9 @@ const alternateTransportChangedBuilder = (data) => {
const billPostedHandler = (data) => {
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.billPosted",
variables: {
clmTotal: data.data.clm_total
@@ -59,9 +58,9 @@ const billPostedHandler = (data) => {
const criticalPartsStatusChangedBuilder = (data) => {
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.criticalPartsStatusChanged",
variables: {
queuedForParts: data.data.queued_for_parts,
@@ -84,9 +83,9 @@ const criticalPartsStatusChangedBuilder = (data) => {
const intakeDeliveryChecklistCompletedBuilder = (data) => {
const checklistType = data.changedFields.intakechecklist ? "intake" : "delivery";
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.checklistCompleted",
variables: {
checklistType,
@@ -108,14 +107,12 @@ const intakeDeliveryChecklistCompletedBuilder = (data) => {
const jobAssignedToMeBuilder = (data) => {
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.assigned",
variables: {
type: data.scenarioFields?.[0],
jobId: data.jobId,
bodyShopName: data.bodyShopName
type: data.scenarioFields?.[0]
},
recipients: []
},
@@ -133,14 +130,11 @@ const jobAssignedToMeBuilder = (data) => {
const jobsAddedToProductionBuilder = (data) => {
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.addedToProduction",
variables: {
inProduction: data.data.inproduction,
oldInProduction: data.changedFields.inproduction?.old
},
variables: {},
recipients: []
},
email: {
@@ -158,9 +152,9 @@ const jobsAddedToProductionBuilder = (data) => {
// Verified
const jobStatusChangeBuilder = (data) => {
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.statusChanged",
variables: {
status: data.data.status,
@@ -182,9 +176,9 @@ const jobStatusChangeBuilder = (data) => {
const newMediaAddedReassignedBuilder = (data) => {
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.newMediaAdded",
variables: {},
recipients: []
@@ -204,9 +198,9 @@ const newMediaAddedReassignedBuilder = (data) => {
// Verified
const newNoteAddedBuilder = (data) => {
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.newNoteAdded",
variables: {
text: data.data.text
@@ -227,9 +221,9 @@ const newNoteAddedBuilder = (data) => {
const newTimeTicketPostedBuilder = (data) => {
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.newTimeTicketPosted",
variables: {},
recipients: []
@@ -248,9 +242,9 @@ const newTimeTicketPostedBuilder = (data) => {
const partMarkedBackOrderedBuilder = (data) => {
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.partBackOrdered",
variables: {
queuedForParts: data.data.queued_for_parts,
@@ -272,9 +266,9 @@ const partMarkedBackOrderedBuilder = (data) => {
const paymentCollectedCompletedBuilder = (data) => {
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.paymentCollected",
variables: {
clmTotal: data.data.clm_total
@@ -295,9 +289,9 @@ const paymentCollectedCompletedBuilder = (data) => {
const scheduledDatesChangedBuilder = (data) => {
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.scheduledDatesChanged",
variables: {
scheduledIn: data.data.scheduled_in,
@@ -323,9 +317,9 @@ const scheduledDatesChangedBuilder = (data) => {
const supplementImportedBuilder = (data) => {
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.supplementImported",
variables: {
suppAmt: data.data.cieca_ttl?.data?.supp_amt
@@ -346,11 +340,14 @@ const supplementImportedBuilder = (data) => {
const tasksUpdatedCreatedBuilder = (data) => {
const result = {
jobId: data.jobId,
bodyShopName: data.bodyShopName,
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
key: "notifications.job.tasksUpdated",
variables: {},
variables: {
type: data.isNew ? "created" : "updated",
roNumber: data.jobRoNumber
},
recipients: []
},
email: {

View File

@@ -9,50 +9,22 @@ const { client: gqlClient } = require("../graphql-client/graphql-client");
const queries = require("../graphql-client/queries");
const { isEmpty, isFunction } = require("lodash");
const { getMatchingScenarios } = require("./scenarioMapperr");
const emailQueue = require("./queues/emailQueue");
const consoleDir = require("../utils/consoleDir");
const { dispatchEmailsToQueue } = require("./queues/emailQueue");
/**
* Parses an event and determines matching scenarios for notifications.
* Queries job watchers and notification settings before triggering scenario builders.
*
* <p>This function performs the following steps:
* <ol>
* <li>Parse event data to extract necessary details using {@link eventParser}.</li>
* <li>Query job watchers for the given job ID using a GraphQL client.</li>
* <li>Retrieve body shop information from the job.</li>
* <li>Determine matching scenarios based on event data.</li>
* <li>Query notification settings for job watchers.</li>
* <li>Filter scenario watchers based on enabled notification methods.</li>
* <li>Trigger scenario builders for matching scenarios with eligible watchers.</li>
* </ol>
*
* @async
* @function scenarioParser
* @param {Object} req - The request object containing event data.
* Expected properties:
* <pre>
* {
* body: {
* event: { data: { new: Object, old: Object } },
* trigger: Object,
* table: string
* }
* }
* </pre>
* @param {string} jobIdField - The field used to identify the job ID.
* @returns {Promise<void>} A promise that resolves when the scenarios have been processed.
* @throws {Error} Throws an error if required request fields are missing or if body shop data is not found.
*/
const scenarioParser = async (req, jobIdField) => {
const { event, trigger, table } = req.body;
const { logger } = req;
if (!event?.data || !trigger || !table) {
throw new Error("Missing required request fields: event data, trigger, or table.");
}
// Step 1: Parse event data to extract necessary details.
// console.log(`1`);
const eventData = await eventParser({
newData: event.data.new,
oldData: event.data.old,
@@ -62,7 +34,6 @@ const scenarioParser = async (req, jobIdField) => {
});
// Step 2: Query job watchers for the given job ID.
// console.log(`2`);
const watcherData = await gqlClient.request(queries.GET_JOB_WATCHERS, {
jobid: eventData.jobId
});
@@ -79,7 +50,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 3: Retrieve body shop information from the job.
// console.log(`3`);
const bodyShopId = watcherData?.job?.bodyshop?.id;
const bodyShopName = watcherData?.job?.bodyshop?.shopname;
const jobRoNumber = watcherData?.job?.ro_number;
@@ -90,7 +60,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 4: Determine matching scenarios based on event data.
// console.log(`4`);
const matchingScenarios = getMatchingScenarios({
...eventData,
jobWatchers,
@@ -111,7 +80,6 @@ const scenarioParser = async (req, jobIdField) => {
};
// Step 5: Query notification settings for job watchers.
// console.log(`5`);
const associationsData = await gqlClient.request(queries.GET_NOTIFICATION_ASSOCIATIONS, {
emails: jobWatchers.map((x) => x.email),
shopid: bodyShopId
@@ -122,7 +90,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 6: Filter scenario watchers based on enabled notification methods.
// console.log(`6`);
finalScenarioData.matchingScenarios = finalScenarioData.matchingScenarios.map((scenario) => ({
...scenario,
scenarioWatchers: associationsData.associations
@@ -152,8 +119,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 7: Trigger scenario builders for matching scenarios with eligible watchers.
// console.log(`7`);
const scenariosToDispatch = [];
for (const scenario of finalScenarioData.matchingScenarios) {
@@ -177,8 +142,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 8: Filter scenario fields to only include changed fields.
// console.log(`8`);
const filteredScenarioFields =
scenario.fields?.filter((field) => eventData.changedFieldNames.includes(field)) || [];
@@ -208,16 +171,18 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 9: Dispatch Email Notifications to the Email Notification Queue
// console.log(`8`);
const emailsToDispatch = scenariosToDispatch.map((scenario) => scenario?.email);
dispatchEmailsToQueue({
emailsToDispatch: scenariosToDispatch.map((scenario) => scenario?.email),
logger
}).catch((e) =>
logger.log("Something went wrong dispatching emails to the Email Notification Queue", "error", "queue", null, {
message: e?.message
})
);
// Step 10: Dispatch App Notifications to the App Notification Queue
const appsToDispatch = scenariosToDispatch.map((scenario) => scenario?.app);
consoleDir({ emailsToDispatch, appsToDispatch });
// TODO: Test Code for Queues
// emailQueue().add("test", { data: "test" });
consoleDir({ appsToDispatch });
};
module.exports = scenarioParser;