feature/IO-3096-GlobalNotifications - Checkpoint, BULLMQ!

This commit is contained in:
Dave Richer
2025-02-13 16:19:36 -05:00
parent 5cfadf7929
commit df13f257db
13 changed files with 887 additions and 747 deletions

View File

@@ -2721,7 +2721,7 @@ query GET_JOB_WATCHERS($jobid: uuid!) {
}
}
job: jobs_by_pk(id: $jobid) {
id,
id
ro_number
clm_no
bodyshop {

View File

@@ -0,0 +1,16 @@
const { Queue } = require("bullmq");
let appQueue;
const loadAppQueue = async ({ pubClient, logger, redisHelpers }) => {
if (!appQueue) {
logger.logger.info("Initializing Notifications App Queue");
appQueue = await new Queue("notificationsApp", { connection: pubClient, prefix: "{BULLMQ}" });
}
return appQueue;
};
const getQueue = () => (!appQueue ? loadAppQueue : appQueue);
module.exports = getQueue;

View File

@@ -0,0 +1,40 @@
const { Queue, Worker } = require("bullmq");
let emailQueue;
const loadEmailQueue = async ({ pubClient, logger, redisHelpers }) => {
if (!emailQueue) {
logger.logger.info("Initializing Notifications Email Queue");
emailQueue = await new Queue("notificationsEmails", { connection: pubClient, prefix: "{BULLMQ}" });
}
// TODO: Test code for worker
// const worker = new Worker(
// "notificationsEmails",
// async (job) => {
// console.log("Processing job", job.id, "with data", job.data);
// // Simulate some work
// await new Promise((resolve) => setTimeout(resolve, 2000)); // Wait for 2 seconds
// console.log("Job processed");
// },
// { connection: pubClient, prefix: "{BULLMQ}" }
// );
//
// worker.on("completed", (job) => {
// console.log(`Job ${job.id} completed!`);
// // Optionally, close the worker after it's done
// worker.close().then(() => {
// console.log("Worker closed");
// });
// });
//
// worker.on("error", (err) => {
// console.error("Error in worker:", err);
// });
return emailQueue;
};
const getQueue = () => (!emailQueue ? loadEmailQueue : emailQueue);
module.exports = getQueue;

View File

@@ -1,4 +1,5 @@
const consoleDir = require("../utils/consoleDir");
const { getJobAssignmentType } = require("./stringHelpers");
const alternateTransportChangedBuilder = (data) => {
consoleDir(data);
@@ -17,7 +18,23 @@ const intakeDeliveryChecklistCompletedBuilder = (data) => {
};
const jobAssignedToMeBuilder = (data) => {
consoleDir(data);
return {
app: {
key: "notifications.job.assigned",
variables: {
type: data.scenarioFields?.[0],
jobId: data.jobId,
bodyShopName: data.bodyShopName
},
recipients: data.scenarioWatchers.map((watcher) => ({ email: watcher.user, employeeId: watcher.employeeId }))
},
email: {
subject: `You have been assigned to [${getJobAssignmentType(data.scenarioFields?.[0])}] on ${data?.jobRoNumber} in ${data.bodyShopName}`,
body: `Hello, a new job has been assigned to you in ${data.bodyShopName}.`,
recipient: data.scenarioWatchers.map((watcher) => watcher.user)
},
fcm: {}
};
};
const jobsAddedToProductionBuilder = (data) => {

View File

@@ -9,6 +9,7 @@ const { client: gqlClient } = require("../graphql-client/graphql-client");
const queries = require("../graphql-client/queries");
const { isEmpty, isFunction } = require("lodash");
const { getMatchingScenarios } = require("./scenarioMapperr");
const emailQueue = require("./queues/emailQueue");
/**
* Parses an event and determines matching scenarios for notifications.
@@ -80,6 +81,8 @@ const scenarioParser = async (req, jobIdField) => {
// console.log(`3`);
const bodyShopId = watcherData?.job?.bodyshop?.id;
const bodyShopName = watcherData?.job?.bodyshop?.shopname;
const jobRoNumber = watcherData?.job?.ro_number;
const jobClaimNumber = watcherData?.job?.clm_no;
if (!bodyShopId || !bodyShopName) {
throw new Error("No bodyshop data found for this job.");
@@ -149,6 +152,9 @@ const scenarioParser = async (req, jobIdField) => {
// Step 7: Trigger scenario builders for matching scenarios with eligible watchers.
// console.log(`7`);
const scenariosToDispatch = [];
for (const scenario of finalScenarioData.matchingScenarios) {
if (isEmpty(scenario.scenarioWatchers) || !isFunction(scenario.builder)) {
continue;
@@ -171,25 +177,45 @@ const scenarioParser = async (req, jobIdField) => {
// Step 8: Filter scenario fields to only include changed fields.
// console.log(`8`);
const filteredScenarioFields =
scenario.fields?.filter((field) => eventData.changedFieldNames.includes(field)) || [];
scenario.builder({
trigger: finalScenarioData.trigger.name,
bodyShopId: finalScenarioData.bodyShopId,
bodyShopName: finalScenarioData.bodyShopName,
scenarioKey: scenario.key,
scenarioTable: scenario.table,
scenarioFields: filteredScenarioFields,
scenarioBuilder: scenario.builder,
scenarioWatchers: eligibleWatchers,
jobId: finalScenarioData.jobId,
isNew: finalScenarioData.isNew,
changedFieldNames: finalScenarioData.changedFieldNames,
changedFields: finalScenarioData.changedFields,
data: finalScenarioData.data
});
scenariosToDispatch.push(
scenario.builder({
trigger: finalScenarioData.trigger.name,
bodyShopId: finalScenarioData.bodyShopId,
bodyShopName: finalScenarioData.bodyShopName,
scenarioKey: scenario.key,
scenarioTable: scenario.table,
scenarioFields: filteredScenarioFields,
scenarioBuilder: scenario.builder,
scenarioWatchers: eligibleWatchers,
jobId: finalScenarioData.jobId,
jobRoNumber: jobRoNumber,
jobClaimNumber: jobClaimNumber,
isNew: finalScenarioData.isNew,
changedFieldNames: finalScenarioData.changedFieldNames,
changedFields: finalScenarioData.changedFields,
data: finalScenarioData.data
})
);
}
if (isEmpty(scenariosToDispatch)) {
return;
}
// Step 9: Dispatch Email Notifications to the Email Notification Queue
// console.log(`8`);
const emailsToDispatch = scenariosToDispatch.map((scenario) => scenario.email);
// Step 10: Dispatch App Notifications to the App Notification Queue
const appsToDispatch = scenariosToDispatch.map((scenario) => scenario.app);
// TODO: Test Code for Queues
// emailQueue().add("test", { data: "test" });
};
module.exports = scenarioParser;

View File

@@ -0,0 +1,18 @@
const getJobAssignmentType = (data) => {
switch (data) {
case "employee_pre":
return "Prep";
case "employee_body":
return "Body";
case "employee_csr":
return "CSR";
case "employee_refinish":
return "Refinish";
default:
return "";
}
};
module.exports = {
getJobAssignmentType
};