test-AIO - Merge in GlobalNotifications branch

This commit is contained in:
Dave Richer
2025-03-04 11:35:12 -05:00
98 changed files with 6552 additions and 1825 deletions

View File

@@ -7,7 +7,7 @@ const OAuthClient = require("intuit-oauth");
const client = require("../../graphql-client/graphql-client").client;
const queries = require("../../graphql-client/queries");
const { parse, stringify } = require("querystring");
const InstanceManager = require("../../utils/instanceMgr").default;
const { InstanceEndpoints } = require("../../utils/instanceMgr");
const oauthClient = new OAuthClient({
clientId: process.env.QBO_CLIENT_ID,
@@ -17,16 +17,8 @@ const oauthClient = new OAuthClient({
logging: true
});
let url;
if (process.env.NODE_ENV === "production") {
//TODO:AIO Add in QBO callbacks.
url = InstanceManager({ imex: `https://imex.online`, rome: `https://romeonline.io` });
} else if (process.env.NODE_ENV === "test") {
url = InstanceManager({ imex: `https://test.imex.online`, rome: `https://test.romeonline.io` });
} else {
url = `http://localhost:3000`;
}
//TODO:AIO Add in QBO callbacks.
const url = InstanceEndpoints();
exports.default = async (req, res) => {
const queryString = req.url.split("?").reverse()[0];
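For reference, the environment branching removed above is collapsed into the InstanceEndpoints() helper. Below is a minimal sketch, not the actual utils/instanceMgr implementation, assuming the helper keys off NODE_ENV and an instance flag (the INSTANCE variable here is hypothetical):
// Hypothetical sketch of an InstanceEndpoints helper.
const InstanceEndpoints = () => {
  const isImex = process.env.INSTANCE === "imex"; // instance flag is an assumption
  if (process.env.NODE_ENV === "production") {
    return isImex ? "https://imex.online" : "https://romeonline.io";
  }
  if (process.env.NODE_ENV === "test") {
    return isImex ? "https://test.imex.online" : "https://test.romeonline.io";
  }
  return "http://localhost:3000";
};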

View File

@@ -69,11 +69,14 @@ const sendServerEmail = async ({ subject, text }) => {
}
},
(err, info) => {
logger.log("server-email-failure", err ? "error" : "debug", null, null, { message: err || info });
logger.log("server-email-failure", err ? "error" : "debug", null, null, {
message: err?.message,
stack: err?.stack
});
}
);
} catch (error) {
logger.log("server-email-failure", "error", null, null, { error });
logger.log("server-email-failure", "error", null, null, { message: error?.message, stack: error?.stack });
}
};
@@ -92,11 +95,11 @@ const sendTaskEmail = async ({ to, subject, type = "text", html, text, attachmen
},
(err, info) => {
// (message, type, user, record, meta
logger.log("server-email", err ? "error" : "debug", null, null, { message: err ? err?.message : info });
logger.log("server-email", err ? "error" : "debug", null, null, { message: err?.message, stack: err?.stack });
}
);
} catch (error) {
logger.log("server-email-failure", "error", null, null, { error });
logger.log("server-email-failure", "error", null, null, { message: error?.message, stack: error?.stack });
}
};
@@ -125,7 +128,8 @@ const sendEmail = async (req, res) => {
cc: req.body.cc,
subject: req.body.subject,
templateStrings: req.body.templateStrings,
error
errorMessage: error?.message,
errorStack: error?.stack
});
}
})
@@ -194,7 +198,8 @@ const sendEmail = async (req, res) => {
cc: req.body.cc,
subject: req.body.subject,
templateStrings: req.body.templateStrings,
error: err
errorMessage: err?.message,
errorStack: err?.stack
});
logEmail(req, {
to: req.body.to,
@@ -202,7 +207,7 @@ const sendEmail = async (req, res) => {
subject: req.body.subject,
bodyshopid: req.body.bodyshopid
});
res.status(500).json({ success: false, error: err });
res.status(500).json({ success: false, errorMessage: err?.message, stack: err?.stack });
}
}
);
@@ -270,14 +275,16 @@ ${body.bounce?.bouncedRecipients.map(
},
(err, info) => {
logger.log("sns-error", err ? "error" : "debug", "api", null, {
message: err ? err?.message : info
errorMessage: err?.message,
errorStack: err?.stack
});
}
);
}
} catch (error) {
logger.log("sns-error", "ERROR", "api", null, {
error: JSON.stringify(error)
errorMessage: error?.message,
errorStack: error?.stack
});
}
res.sendStatus(200);

View File

@@ -10,6 +10,7 @@ const generateEmailTemplate = require("./generateTemplate");
const moment = require("moment-timezone");
const { taskEmailQueue } = require("./tasksEmailsQueue");
const mailer = require("./mailer");
const { InstanceEndpoints } = require("../utils/instanceMgr");
// Initialize the Tasks Email Queue
const tasksEmailQueue = taskEmailQueue();
@@ -83,15 +84,8 @@ const formatPriority = (priority) => {
* @param taskId
* @returns {{header, body: string, subHeader: string}}
*/
const getEndpoints = (bodyshop) =>
InstanceManager({
imex: process.env?.NODE_ENV === "test" ? "https://test.imex.online" : "https://imex.online",
rome: process.env?.NODE_ENV === "test" ? "https//test.romeonline.io" : "https://romeonline.io"
});
const generateTemplateArgs = (title, priority, description, dueDate, bodyshop, job, taskId, dateLine, createdBy) => {
const endPoints = getEndpoints(bodyshop);
const endPoints = InstanceEndpoints();
return {
header: title,
subHeader: `Body Shop: ${bodyshop.shopname} | Priority: ${formatPriority(priority)} ${formatDate(dueDate)} | Created By: ${createdBy || "N/A"}`,
@@ -108,9 +102,8 @@ const generateTemplateArgs = (title, priority, description, dueDate, bodyshop, j
* @param html
* @param taskIds
* @param successCallback
* @param requestInstance
*/
const sendMail = (type, to, subject, html, taskIds, successCallback, requestInstance) => {
const sendMail = (type, to, subject, html, taskIds, successCallback) => {
const fromEmails = InstanceManager({
imex: "ImEX Online <noreply@imex.online>",
rome: "Rome Online <noreply@romeonline.io>"
@@ -136,7 +129,7 @@ const sendMail = (type, to, subject, html, taskIds, successCallback, requestInst
};
/**
* Send an email to the assigned user.
* Email the assigned user.
* @param req
* @param res
* @returns {Promise<*>}
@@ -186,7 +179,7 @@ const taskAssignedEmail = async (req, res) => {
};
/**
* Send an email to remind the user of their tasks.
* Email the user a reminder of their tasks.
* @param req
* @param res
* @returns {Promise<*>}
@@ -264,11 +257,6 @@ const tasksRemindEmail = async (req, res) => {
}
// There are multiple emails to send to this author.
else {
const endPoints = InstanceManager({
imex: process.env?.NODE_ENV === "test" ? "https://test.imex.online" : "https://imex.online",
rome: process.env?.NODE_ENV === "test" ? "https//test.romeonline.io" : "https://romeonline.io"
});
const allTasks = groupedTasks[recipient.email];
emailData.subject = `New Tasks Reminder - ${allTasks.length} Tasks require your attention`;
emailData.html = generateEmailTemplate({
@@ -278,7 +266,7 @@ const tasksRemindEmail = async (req, res) => {
body: `<ul>
${allTasks
.map((task) =>
`<li><a href="${endPoints}/manage/tasks/alltasks?taskid=${task.id}">${task.title} - Priority: ${formatPriority(task.priority)} ${task.due_date ? `${formatDate(task.due_date)}` : ""} | Bodyshop: ${task.bodyshop.shopname}</a></li>`.trim()
`<li><a href="${InstanceEndpoints()}/manage/tasks/alltasks?taskid=${task.id}">${task.title} - Priority: ${formatPriority(task.priority)} ${task.due_date ? `${formatDate(task.due_date)}` : ""} | Bodyshop: ${task.bodyshop.shopname}</a></li>`.trim()
)
.join("")}
</ul>`
@@ -338,6 +326,5 @@ const tasksRemindEmail = async (req, res) => {
module.exports = {
taskAssignedEmail,
tasksRemindEmail,
getEndpoints
tasksRemindEmail
};

View File

@@ -2765,3 +2765,55 @@ exports.GET_DOCUMENTS_BY_IDS = `
}
`;
exports.GET_JOB_WATCHERS = `
query GET_JOB_WATCHERS($jobid: uuid!) {
job_watchers(where: { jobid: { _eq: $jobid } }) {
user_email
user {
authid
employee {
id
first_name
last_name
}
}
}
job: jobs_by_pk(id: $jobid) {
id
ro_number
clm_no
bodyshop {
id
shopname
}
}
}
`;
exports.GET_NOTIFICATION_ASSOCIATIONS = `
query GET_NOTIFICATION_ASSOCIATIONS($emails: [String!]!, $shopid: uuid!) {
associations(where: {
useremail: { _in: $emails },
shopid: { _eq: $shopid }
}) {
id
useremail
notification_settings
}
}
`;
exports.INSERT_NOTIFICATIONS_MUTATION = `mutation INSERT_NOTIFICATIONS($objects: [notifications_insert_input!]!) {
insert_notifications(objects: $objects) {
affected_rows
returning {
id
jobid
associationid
scenario_text
fcm_text
scenario_meta
}
}
}`;
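The queue code later in this change calls these documents through the shared graphql client. A hedged sketch of how INSERT_NOTIFICATIONS_MUTATION might be invoked; require paths and field values are placeholders:
const { client } = require("../../graphql-client/graphql-client"); // path is a placeholder
const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries");

const insertExample = async () => {
  const response = await client.request(INSERT_NOTIFICATIONS_MUTATION, {
    objects: [
      {
        jobid: "00000000-0000-0000-0000-000000000000", // placeholder uuid
        associationid: "00000000-0000-0000-0000-000000000001", // placeholder uuid
        scenario_text: JSON.stringify(["The status has changed from Estimate to Repair"]),
        fcm_text: "The status has changed from Estimate to Repair.",
        scenario_meta: JSON.stringify([{ status: "Repair", oldStatus: "Estimate" }])
      }
    ]
  });
  return response.insert_notifications.affected_rows;
};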

View File

@@ -10,12 +10,11 @@ const moment = require("moment");
const logger = require("../utils/logger");
const { sendTaskEmail } = require("../email/sendemail");
const generateEmailTemplate = require("../email/generateTemplate");
const { getEndpoints } = require("../email/tasksEmails");
const domain = process.env.NODE_ENV ? "secure" : "test";
const { SecretsManagerClient, GetSecretValueCommand } = require("@aws-sdk/client-secrets-manager");
const { InstanceRegion } = require("../utils/instanceMgr");
const { InstanceRegion, InstanceEndpoints } = require("../utils/instanceMgr");
const client = new SecretsManagerClient({
region: InstanceRegion()
@@ -443,31 +442,28 @@ exports.postback = async (req, res) => {
});
if (values.origin === "OneLink" && parsedComment.userEmail) {
try {
const endPoints = getEndpoints();
sendTaskEmail({
to: parsedComment.userEmail,
subject: `New Payment(s) Received - RO ${jobs.jobs.map((j) => j.ro_number).join(", ")}`,
type: "html",
html: generateEmailTemplate({
header: "New Payment(s) Received",
subHeader: "",
body: jobs.jobs
.map(
(job) =>
`Reference: <a href="${endPoints}/manage/jobs/${job.id}">${job.ro_number || "N/A"}</a> | ${job.ownr_co_nm ? job.ownr_co_nm : `${job.ownr_fn || ""} ${job.ownr_ln || ""}`.trim()} | ${`${job.v_model_yr || ""} ${job.v_make_desc || ""} ${job.v_model_desc || ""}`.trim()} | $${partialPayments.find((p) => p.jobid === job.id).amount}`
)
.join("<br/>")
})
});
} catch (error) {
sendTaskEmail({
to: parsedComment.userEmail,
subject: `New Payment(s) Received - RO ${jobs.jobs.map((j) => j.ro_number).join(", ")}`,
type: "html",
html: generateEmailTemplate({
header: "New Payment(s) Received",
subHeader: "",
body: jobs.jobs
.map(
(job) =>
`Reference: <a href="${InstanceEndpoints()}/manage/jobs/${job.id}">${job.ro_number || "N/A"}</a> | ${job.ownr_co_nm ? job.ownr_co_nm : `${job.ownr_fn || ""} ${job.ownr_ln || ""}`.trim()} | ${`${job.v_model_yr || ""} ${job.v_make_desc || ""} ${job.v_model_desc || ""}`.trim()} | $${partialPayments.find((p) => p.jobid === job.id).amount}`
)
.join("<br/>")
})
}).catch((error) => {
logger.log("intellipay-postback-email-error", "ERROR", req.user?.email, null, {
message: error.message,
jobs,
paymentResult,
...logResponseMeta
});
}
});
}
res.sendStatus(200);
} else if (values.invoice) {

View File

@@ -0,0 +1,140 @@
/**
* @fileoverview Notification event handlers.
* This module exports functions to handle various notification events.
* Each handler optionally calls the scenarioParser and logs errors if they occur,
* then returns a JSON response with a success message.
*/
const scenarioParser = require("./scenarioParser");
/**
* Processes a notification event by invoking the scenario parser.
* The scenarioParser is intentionally not awaited so that the response is sent immediately.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @param {string} parserPath - The key path to be passed to scenarioParser.
* @param {string} successMessage - The message to return on success.
* @returns {Promise<Object>} A promise that resolves to an Express JSON response.
*/
async function processNotificationEvent(req, res, parserPath, successMessage) {
const { logger } = req;
// Call scenarioParser but don't await it; log any error that occurs.
scenarioParser(req, parserPath).catch((error) => {
logger.log("notifications-error", "error", "notifications", null, { message: error?.message, stack: error?.stack });
});
return res.status(200).json({ message: successMessage });
}
/**
* Handle job change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Promise<Object>} JSON response with a success message.
*/
const handleJobsChange = async (req, res) =>
processNotificationEvent(req, res, "req.body.event.new.id", "Job Notifications Event Handled.");
/**
* Handle bills change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Promise<Object>} JSON response with a success message.
*/
const handleBillsChange = async (req, res) =>
processNotificationEvent(req, res, "req.body.event.new.jobid", "Bills Changed Notification Event Handled.");
/**
* Handle documents change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Promise<Object>} JSON response with a success message.
*/
const handleDocumentsChange = async (req, res) =>
processNotificationEvent(req, res, "req.body.event.new.jobid", "Documents Change Notifications Event Handled.");
/**
* Handle job lines change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Promise<Object>} JSON response with a success message.
*/
const handleJobLinesChange = async (req, res) =>
processNotificationEvent(req, res, "req.body.event.new.jobid", "JobLines Change Notifications Event Handled.");
/**
* Handle notes change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Promise<Object>} JSON response with a success message.
*/
const handleNotesChange = async (req, res) =>
processNotificationEvent(req, res, "req.body.event.new.jobid", "Notes Changed Notification Event Handled.");
/**
* Handle parts dispatch change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Object} JSON response with a success message.
*/
const handlePartsDispatchChange = (req, res) => res.status(200).json({ message: "Parts Dispatch change handled." });
/**
* Handle parts order change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Object} JSON response with a success message.
*/
const handlePartsOrderChange = (req, res) => res.status(200).json({ message: "Parts Order change handled." });
/**
* Handle payments change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Promise<Object>} JSON response with a success message.
*/
const handlePaymentsChange = async (req, res) =>
processNotificationEvent(req, res, "req.body.event.new.jobid", "Payments Changed Notification Event Handled.");
/**
* Handle tasks change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Promise<Object>} JSON response with a success message.
*/
const handleTasksChange = async (req, res) =>
processNotificationEvent(req, res, "req.body.event.new.jobid", "Tasks Notifications Event Handled.");
/**
* Handle time tickets change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Promise<Object>} JSON response with a success message.
*/
const handleTimeTicketsChange = async (req, res) =>
processNotificationEvent(req, res, "req.body.event.new.jobid", "Time Tickets Changed Notification Event Handled.");
module.exports = {
handleJobsChange,
handleBillsChange,
handleDocumentsChange,
handleJobLinesChange,
handleNotesChange,
handlePartsDispatchChange,
handlePartsOrderChange,
handlePaymentsChange,
handleTasksChange,
handleTimeTicketsChange
};
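A hedged sketch of how these handlers might be mounted as Hasura event-trigger endpoints; the router paths and the require path are assumptions, not part of this change:
const express = require("express");
const {
  handleJobsChange,
  handleBillsChange,
  handleTasksChange
} = require("./notificationHandlers"); // path is a placeholder

const router = express.Router();
// Each Hasura event trigger POSTs its payload to one of these endpoints.
router.post("/notifications/jobs", handleJobsChange);
router.post("/notifications/bills", handleBillsChange);
router.post("/notifications/tasks", handleTasksChange);

module.exports = router;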

View File

@@ -1,5 +0,0 @@
const handleJobsChange = (req, res) => {
return res.status(200).json({ message: "Jobs change handled." });
};
module.exports = handleJobsChange;

View File

@@ -1,5 +0,0 @@
const handleBillsChange = (req, res) => {
return res.status(200).json({ message: "Bills change handled." });
};
module.exports = handleBillsChange;

View File

@@ -1,5 +0,0 @@
const handlePartsDispatchChange = (req, res) => {
return res.status(200).json({ message: "Parts Dispatch change handled." });
};
module.exports = handlePartsDispatchChange;

View File

@@ -1,5 +0,0 @@
const handlePartsOrderChange = (req, res) => {
return res.status(200).json({ message: "Parts Order change handled." });
};
module.exports = handlePartsOrderChange;

View File

@@ -1,5 +0,0 @@
const handleTasksChange = (req, res) => {
return res.status(200).json({ message: "Tasks change handled." });
};
module.exports = handleTasksChange;

View File

@@ -1,5 +0,0 @@
const handleTimeTicketsChange = (req, res) => {
return res.status(200).json({ message: "Time Tickets change handled." });
};
module.exports = handleTimeTicketsChange;

View File

@@ -0,0 +1,74 @@
/**
* Parses an event by comparing old and new data to determine which fields have changed.
*
* This function analyzes the differences between previous (`oldData`) and current (`newData`)
* data states to identify changed fields. It determines if the event is a new entry or an update
* and returns details about changed fields, the event type, and associated metadata.
*
* @param {Object} options - Configuration options for parsing the event.
* @param {Object} [options.oldData] - The previous state of the data (undefined for new entries).
* @param {Object} options.newData - The current state of the data.
* @param {string} options.trigger - The type of event trigger (e.g., 'INSERT', 'UPDATE').
* @param {string} options.table - The name of the table associated with the event.
* @param {string} [options.jobId] - The job ID, if already extracted by the caller (optional).
* @returns {Object} An object containing the parsed event details:
* - {Array<string>} changedFieldNames - List of field names that have changed.
* - {Object} changedFields - Map of changed fields with their old and new values.
* - {boolean} isNew - True if the event is a new entry (no oldData provided).
* - {Object} data - The current data state (`newData`).
* - {string} trigger - The event trigger type.
* - {string} table - The table name.
* - {string|null} jobId - The provided jobId or null if not provided.
*/
const eventParser = async ({ oldData, newData, trigger, table, jobId = null }) => {
const isNew = !oldData; // True if no old data exists, indicating a new entry
let changedFields = {};
let changedFieldNames = [];
if (isNew) {
// For new entries, all fields in newData are considered "changed" (from undefined to their value)
changedFields = Object.fromEntries(
Object.entries(newData).map(([key, value]) => [key, { old: undefined, new: value }])
);
changedFieldNames = Object.keys(newData); // All keys are new
} else {
// Compare oldData and newData to detect updates
for (const key in newData) {
if (Object.prototype.hasOwnProperty.call(newData, key)) {
// Check if the field is new or its value has changed
if (
!Object.prototype.hasOwnProperty.call(oldData, key) || // Field didn't exist before
JSON.stringify(oldData[key]) !== JSON.stringify(newData[key]) // Values differ (deep comparison)
) {
changedFields[key] = {
old: oldData[key], // Undefined if field wasn't in oldData
new: newData[key]
};
changedFieldNames.push(key);
}
}
}
// Identify fields removed in newData (present in oldData but absent in newData)
for (const key in oldData) {
if (Object.prototype.hasOwnProperty.call(oldData, key) && !Object.prototype.hasOwnProperty.call(newData, key)) {
changedFields[key] = {
old: oldData[key],
new: null // Mark as removed
};
changedFieldNames.push(key);
}
}
}
return {
changedFieldNames, // Array of fields that changed
changedFields, // Object with old/new values for changed fields
isNew, // Boolean indicating if this is a new entry
data: newData, // Current data state
trigger, // Event trigger (e.g., 'INSERT', 'UPDATE')
table, // Associated table name
jobId // Provided jobId or null
};
};
module.exports = eventParser;
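A worked example of the parser on an UPDATE event; all values are invented for illustration and the require path is a placeholder:
const eventParser = require("./eventParser"); // path is a placeholder

const example = async () => {
  const parsed = await eventParser({
    oldData: { id: "job-1", status: "Estimate", alt_transport: null },
    newData: { id: "job-1", status: "Repair", alt_transport: null },
    trigger: "UPDATE",
    table: "jobs",
    jobId: "job-1"
  });
  // parsed.changedFieldNames -> ["status"]
  // parsed.changedFields     -> { status: { old: "Estimate", new: "Repair" } }
  // parsed.isNew             -> false
  return parsed;
};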

View File

@@ -0,0 +1,290 @@
const { Queue, Worker } = require("bullmq");
const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries");
const { registerCleanupTask } = require("../../utils/cleanupManager");
const graphQLClient = require("../../graphql-client/graphql-client").client;
// Base time-related constant in minutes, sourced from environment variable or defaulting to 1
const APP_CONSOLIDATION_DELAY_IN_MINS = (() => {
const envValue = process.env?.APP_CONSOLIDATION_DELAY_IN_MINS;
const parsedValue = envValue ? parseInt(envValue, 10) : NaN;
return isNaN(parsedValue) ? 1 : Math.max(1, parsedValue); // Default to 1, ensure at least 1
})();
// Base time-related constant (in milliseconds) / DO NOT TOUCH
const APP_CONSOLIDATION_DELAY = APP_CONSOLIDATION_DELAY_IN_MINS * 60000; // base delay in milliseconds (1 minute at the default)
// Derived time-related constants based on APP_CONSOLIDATION_DELAY / DO NOT TOUCH
const NOTIFICATION_STORAGE_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5x base delay (90s at the default)
const CONSOLIDATION_FLAG_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5x base delay (90s at the default)
const LOCK_EXPIRATION = APP_CONSOLIDATION_DELAY * 0.25; // quarter of base delay (15s at the default)
const RATE_LIMITER_DURATION = APP_CONSOLIDATION_DELAY * 0.1; // tenth of base delay (6s at the default)
let addQueue;
let consolidateQueue;
/**
* Builds the scenario_text, fcm_text, and scenario_meta for a batch of notifications.
*
* @param {Array<Object>} notifications - Array of notification objects with 'body' and 'variables'.
* @returns {Object} An object with 'scenario_text', 'fcm_text', and 'scenario_meta'.
*/
const buildNotificationContent = (notifications) => {
const scenarioText = notifications.map((n) => n.body); // Array of text entries
const fcmText = scenarioText.join(". "); // Concatenated text with period separator
const scenarioMeta = notifications.map((n) => n.variables || {}); // Array of metadata objects
return {
scenario_text: scenarioText,
fcm_text: fcmText ? `${fcmText}.` : null, // Add trailing period if non-empty, otherwise null
scenario_meta: scenarioMeta
};
};
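// Example (invented values):
// buildNotificationContent([
//   { body: "The status has changed from Estimate to Repair", variables: { status: "Repair" } },
//   { body: "A Note has been added", variables: {} }
// ])
// returns:
// {
//   scenario_text: ["The status has changed from Estimate to Repair", "A Note has been added"],
//   fcm_text: "The status has changed from Estimate to Repair. A Note has been added.",
//   scenario_meta: [{ status: "Repair" }, {}]
// }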
/**
* Initializes the notification queues and workers for adding and consolidating notifications.
*/
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
if (!addQueue || !consolidateQueue) {
logger.logger.debug("Initializing Notifications Queues");
addQueue = new Queue("notificationsAdd", {
connection: pubClient,
prefix: "{BULLMQ}",
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
consolidateQueue = new Queue("notificationsConsolidate", {
connection: pubClient,
prefix: "{BULLMQ}",
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
const addWorker = new Worker(
"notificationsAdd",
async (job) => {
const { jobId, key, variables, recipients, body, jobRoNumber } = job.data;
logger.logger.debug(`Adding notifications for jobId ${jobId}`);
const redisKeyPrefix = `app:notifications:${jobId}`;
const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() };
for (const recipient of recipients) {
const { user } = recipient;
const userKey = `${redisKeyPrefix}:${user}`;
const existingNotifications = await pubClient.get(userKey);
const notifications = existingNotifications ? JSON.parse(existingNotifications) : [];
notifications.push(notification);
await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000);
logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
}
const consolidateKey = `app:consolidate:${jobId}`;
const flagSet = await pubClient.setnx(consolidateKey, "pending");
logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);
if (flagSet) {
await consolidateQueue.add(
"consolidate-notifications",
{ jobId, recipients },
{
jobId: `consolidate:${jobId}`,
delay: APP_CONSOLIDATION_DELAY,
attempts: 3,
backoff: LOCK_EXPIRATION
}
);
logger.logger.debug(`Scheduled consolidation for jobId ${jobId}`);
await pubClient.expire(consolidateKey, CONSOLIDATION_FLAG_EXPIRATION / 1000);
} else {
logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`);
}
},
{
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 5
}
);
const consolidateWorker = new Worker(
"notificationsConsolidate",
async (job) => {
const { jobId, recipients } = job.data;
logger.logger.debug(`Consolidating notifications for jobId ${jobId}`);
const redisKeyPrefix = `app:notifications:${jobId}`;
const lockKey = `lock:consolidate:${jobId}`;
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);
if (lockAcquired) {
try {
const allNotifications = {};
const uniqueUsers = [...new Set(recipients.map((r) => r.user))];
logger.logger.debug(`Unique users for jobId ${jobId}: ${uniqueUsers}`);
for (const user of uniqueUsers) {
const userKey = `${redisKeyPrefix}:${user}`;
const notifications = await pubClient.get(userKey);
logger.logger.debug(`Retrieved notifications for ${user}: ${notifications}`);
if (notifications) {
const parsedNotifications = JSON.parse(notifications);
const userRecipients = recipients.filter((r) => r.user === user);
for (const { bodyShopId } of userRecipients) {
allNotifications[user] = allNotifications[user] || {};
allNotifications[user][bodyShopId] = parsedNotifications;
}
await pubClient.del(userKey);
logger.logger.debug(`Deleted Redis key ${userKey}`);
} else {
logger.logger.warn(`No notifications found for ${user} under ${userKey}`);
}
}
logger.logger.debug(`Consolidated notifications: ${JSON.stringify(allNotifications)}`);
// Insert notifications into the database and collect IDs
const notificationInserts = [];
const notificationIdMap = new Map();
for (const [user, bodyShopData] of Object.entries(allNotifications)) {
const userRecipients = recipients.filter((r) => r.user === user);
const associationId = userRecipients[0]?.associationId;
for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
const { scenario_text, fcm_text, scenario_meta } = buildNotificationContent(notifications);
notificationInserts.push({
jobid: jobId,
associationid: associationId,
scenario_text: JSON.stringify(scenario_text),
fcm_text: fcm_text,
scenario_meta: JSON.stringify(scenario_meta)
});
notificationIdMap.set(`${user}:${bodyShopId}`, null);
}
}
if (notificationInserts.length > 0) {
const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, {
objects: notificationInserts
});
logger.logger.debug(
`Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}`
);
insertResponse.insert_notifications.returning.forEach((row, index) => {
const user = uniqueUsers[Math.floor(index / Object.keys(allNotifications[uniqueUsers[0]]).length)];
const bodyShopId = Object.keys(allNotifications[user])[
index % Object.keys(allNotifications[user]).length
];
notificationIdMap.set(`${user}:${bodyShopId}`, row.id);
});
}
// Emit notifications to users via Socket.io with notification ID
for (const [user, bodyShopData] of Object.entries(allNotifications)) {
const userMapping = await redisHelpers.getUserSocketMapping(user);
const userRecipients = recipients.filter((r) => r.user === user);
const associationId = userRecipients[0]?.associationId;
for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
const notificationId = notificationIdMap.get(`${user}:${bodyShopId}`);
const jobRoNumber = notifications[0]?.jobRoNumber;
if (userMapping && userMapping[bodyShopId]?.socketIds) {
userMapping[bodyShopId].socketIds.forEach((socketId) => {
ioRedis.to(socketId).emit("notification", {
jobId,
jobRoNumber,
bodyShopId,
notifications,
notificationId,
associationId
});
});
logger.logger.debug(
`Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}`
);
} else {
logger.logger.warn(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
}
}
}
await pubClient.del(`app:consolidate:${jobId}`);
} catch (err) {
logger.log(`app-queue-consolidation-error`, "ERROR", "notifications", "api", {
message: err?.message,
stack: err?.stack
});
throw err;
} finally {
await pubClient.del(lockKey);
}
} else {
logger.logger.debug(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
}
},
{
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 1,
limiter: { max: 1, duration: RATE_LIMITER_DURATION }
}
);
addWorker.on("completed", (job) => logger.logger.debug(`Add job ${job.id} completed`));
consolidateWorker.on("completed", (job) => logger.logger.debug(`Consolidate job ${job.id} completed`));
addWorker.on("failed", (job, err) =>
logger.log(`app-queue-notification-error`, "ERROR", "notifications", "api", {
message: err?.message,
stack: err?.stack
})
);
consolidateWorker.on("failed", (job, err) =>
logger.log(`app-queue-consolidation-failed:`, "ERROR", "notifications", "api", {
message: err?.message,
stack: err?.stack
})
);
// Register cleanup task instead of direct process listeners
const shutdown = async () => {
logger.logger.debug("Closing app queue workers...");
await Promise.all([addWorker.close(), consolidateWorker.close()]);
logger.logger.debug("App queue workers closed");
};
registerCleanupTask(shutdown);
}
return addQueue;
};
/**
* Retrieves the initialized `addQueue` instance.
*/
const getQueue = () => {
if (!addQueue) throw new Error("Add queue not initialized. Ensure loadAppQueue is called during bootstrap.");
return addQueue;
};
/**
* Dispatches notifications to the `addQueue` for processing.
*/
const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => {
const appQueue = getQueue();
for (const app of appsToDispatch) {
const { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber } = app;
await appQueue.add(
"add-notification",
{ jobId, bodyShopId, key, variables, recipients, body, jobRoNumber },
{ jobId: `${jobId}:${Date.now()}` }
);
logger.logger.debug(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
}
};
module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue };
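A hedged usage sketch of this module; the surrounding wiring (pubClient, logger, redisHelpers, ioRedis) is assumed to be created elsewhere during bootstrap and passed in, and the require path is a placeholder:
const { loadAppQueue, dispatchAppsToQueue } = require("./appQueue"); // path is a placeholder

const bootstrapAppNotifications = async (deps) => {
  // deps = { pubClient, logger, redisHelpers, ioRedis }
  await loadAppQueue(deps);
  // Example dispatch with invented values:
  await dispatchAppsToQueue({
    appsToDispatch: [
      {
        jobId: "job-1",
        bodyShopId: "shop-1",
        jobRoNumber: "RO-1001",
        key: "notifications.job.statusChanged",
        body: "The status has changed from Estimate to Repair",
        variables: { status: "Repair", oldStatus: "Estimate" },
        recipients: [{ user: "tech@example.com", bodyShopId: "shop-1", associationId: "assoc-1" }]
      }
    ],
    logger: deps.logger
  });
};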

View File

@@ -0,0 +1,241 @@
const { Queue, Worker } = require("bullmq");
const { sendTaskEmail } = require("../../email/sendemail");
const generateEmailTemplate = require("../../email/generateTemplate");
const { InstanceEndpoints } = require("../../utils/instanceMgr");
const { registerCleanupTask } = require("../../utils/cleanupManager");
// Base time-related constant in minutes; note this currently reads the same
// APP_CONSOLIDATION_DELAY_IN_MINS environment variable used by the app queue.
const EMAIL_CONSOLIDATION_DELAY_IN_MINS = (() => {
const envValue = process.env?.APP_CONSOLIDATION_DELAY_IN_MINS;
const parsedValue = envValue ? parseInt(envValue, 10) : NaN;
return isNaN(parsedValue) ? 1 : Math.max(1, parsedValue); // Default to 1, ensure at least 1
})();
// Base time-related constant (in milliseconds) / DO NOT TOUCH
const EMAIL_CONSOLIDATION_DELAY = EMAIL_CONSOLIDATION_DELAY_IN_MINS * 60000; // base delay in milliseconds (1 minute at the default)
// Derived time-related constants pegged to EMAIL_CONSOLIDATION_DELAY / DO NOT TOUCH
const CONSOLIDATION_KEY_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 1.5; // 1.5x base delay (buffer for consolidation)
const LOCK_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 0.25; // quarter of base delay (lock duration)
const RATE_LIMITER_DURATION = EMAIL_CONSOLIDATION_DELAY * 0.1; // tenth of base delay (rate limiting)
const NOTIFICATION_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 1.5; // 1.5x base delay (matches consolidation key expiration)
let emailAddQueue;
let emailConsolidateQueue;
let emailAddWorker;
let emailConsolidateWorker;
/**
* Initializes the email notification queues and workers.
*
* @param {Object} options - Configuration options for queue initialization.
* @param {Object} options.pubClient - Redis client instance for queue communication.
* @param {Object} options.logger - Logger instance for logging events and debugging.
* @returns {Queue} The initialized `emailAddQueue` instance for dispatching notifications.
*/
const loadEmailQueue = async ({ pubClient, logger }) => {
if (!emailAddQueue || !emailConsolidateQueue) {
logger.logger.debug("Initializing Email Notification Queues");
// Queue for adding email notifications
emailAddQueue = new Queue("emailAdd", {
connection: pubClient,
prefix: "{BULLMQ}",
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
// Queue for consolidating and sending emails
emailConsolidateQueue = new Queue("emailConsolidate", {
connection: pubClient,
prefix: "{BULLMQ}",
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
// Worker to process adding notifications
emailAddWorker = new Worker(
"emailAdd",
async (job) => {
const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data;
logger.logger.debug(`Adding email notifications for jobId ${jobId}`);
const redisKeyPrefix = `email:notifications:${jobId}`;
for (const recipient of recipients) {
const { user, firstName, lastName } = recipient;
const userKey = `${redisKeyPrefix}:${user}`;
await pubClient.rpush(userKey, body);
await pubClient.expire(userKey, NOTIFICATION_EXPIRATION / 1000);
const detailsKey = `email:recipientDetails:${jobId}:${user}`;
await pubClient.hsetnx(detailsKey, "firstName", firstName || "");
await pubClient.hsetnx(detailsKey, "lastName", lastName || "");
await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000);
await pubClient.sadd(`email:recipients:${jobId}`, user);
logger.logger.debug(`Stored message for ${user} under ${userKey}: ${body}`);
}
const consolidateKey = `email:consolidate:${jobId}`;
const flagSet = await pubClient.setnx(consolidateKey, "pending");
if (flagSet) {
await emailConsolidateQueue.add(
"consolidate-emails",
{ jobId, jobRoNumber, bodyShopName },
{
jobId: `consolidate:${jobId}`,
delay: EMAIL_CONSOLIDATION_DELAY,
attempts: 3,
backoff: LOCK_EXPIRATION
}
);
logger.logger.debug(`Scheduled email consolidation for jobId ${jobId}`);
await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000);
} else {
logger.logger.debug(`Email consolidation already scheduled for jobId ${jobId}`);
}
},
{
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 5
}
);
// Worker to consolidate and send emails
emailConsolidateWorker = new Worker(
"emailConsolidate",
async (job) => {
const { jobId, jobRoNumber, bodyShopName } = job.data;
logger.logger.debug(`Consolidating emails for jobId ${jobId}`);
const lockKey = `lock:emailConsolidate:${jobId}`;
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
if (lockAcquired) {
try {
const recipientsSet = `email:recipients:${jobId}`;
const recipients = await pubClient.smembers(recipientsSet);
for (const recipient of recipients) {
const userKey = `email:notifications:${jobId}:${recipient}`;
const detailsKey = `email:recipientDetails:${jobId}:${recipient}`;
const messages = await pubClient.lrange(userKey, 0, -1);
if (messages.length > 0) {
const details = await pubClient.hgetall(detailsKey);
const firstName = details.firstName || "User";
const multipleUpdateString = messages.length > 1 ? "Updates" : "Update";
const subject = `${multipleUpdateString} for job ${jobRoNumber} at ${bodyShopName}`;
const emailBody = generateEmailTemplate({
header: `${multipleUpdateString} for Job ${jobRoNumber}`,
subHeader: `Dear ${firstName},`,
body: `
<p>There have been updates to job ${jobRoNumber} at ${bodyShopName}:</p><br/>
<ul>
${messages.map((msg) => `<li>${msg}</li>`).join("")}
</ul><br/><br/>
<p><a href="${InstanceEndpoints()}/manage/jobs/${jobId}">Please check the job for more details.</a></p>
`
});
await sendTaskEmail({
to: recipient,
subject,
type: "html",
html: emailBody
});
logger.logger.debug(
`Sent consolidated email to ${recipient} for jobId ${jobId} with ${messages.length} updates`
);
await pubClient.del(userKey);
await pubClient.del(detailsKey);
}
}
await pubClient.del(recipientsSet);
await pubClient.del(`email:consolidate:${jobId}`);
} catch (err) {
logger.log(`email-queue-consolidation-error`, "ERROR", "notifications", "api", {
message: err?.message,
stack: err?.stack
});
throw err;
} finally {
await pubClient.del(lockKey);
}
} else {
logger.logger.debug(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`);
}
},
{
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 1,
limiter: { max: 1, duration: RATE_LIMITER_DURATION }
}
);
// Event handlers for workers
emailAddWorker.on("completed", (job) => logger.logger.debug(`Email add job ${job.id} completed`));
emailConsolidateWorker.on("completed", (job) => logger.logger.debug(`Email consolidate job ${job.id} completed`));
emailAddWorker.on("failed", (job, err) =>
logger.log(`add-email-queue-failed`, "ERROR", "notifications", "api", {
message: err?.message,
stack: err?.stack
})
);
emailConsolidateWorker.on("failed", (job, err) =>
logger.log(`email-consolidation-job-failed`, "ERROR", "notifications", "api", {
message: err?.message,
stack: err?.stack
})
);
// Register cleanup task instead of direct process listeners
const shutdown = async () => {
logger.logger.debug("Closing email queue workers...");
await Promise.all([emailAddWorker.close(), emailConsolidateWorker.close()]);
logger.logger.debug("Email queue workers closed");
};
registerCleanupTask(shutdown);
}
return emailAddQueue;
};
/**
* Retrieves the initialized `emailAddQueue` instance.
*
* @returns {Queue} The `emailAddQueue` instance for adding notifications.
* @throws {Error} If `emailAddQueue` is not initialized.
*/
const getQueue = () => {
if (!emailAddQueue) {
throw new Error("Email add queue not initialized. Ensure loadEmailQueue is called during bootstrap.");
}
return emailAddQueue;
};
/**
* Dispatches email notifications to the `emailAddQueue` for processing.
*
* @param {Object} options - Options for dispatching notifications.
* @param {Array} options.emailsToDispatch - Array of email notification objects.
* @param {Object} options.logger - Logger instance for logging dispatch events.
* @returns {Promise<void>} Resolves when all notifications are added to the queue.
*/
const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => {
const emailAddQueue = getQueue();
for (const email of emailsToDispatch) {
const { jobId, jobRoNumber, bodyShopName, body, recipients } = email;
if (!jobId || !jobRoNumber || !bodyShopName || !body || !recipients.length) {
logger.logger.warn(
`Skipping email dispatch for jobId ${jobId} due to missing data: ` +
`jobRoNumber=${jobRoNumber}, bodyShopName=${bodyShopName}, body=${body}, recipients=${recipients.length}`
);
continue;
}
await emailAddQueue.add(
"add-email-notification",
{ jobId, jobRoNumber, bodyShopName, body, recipients },
{ jobId: `${jobId}:${Date.now()}` }
);
logger.logger.debug(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
}
};
module.exports = { loadEmailQueue, getQueue, dispatchEmailsToQueue };
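A similar hedged usage sketch for the email queue; values are invented and the require path is a placeholder:
const { loadEmailQueue, dispatchEmailsToQueue } = require("./emailQueue"); // path is a placeholder

const bootstrapEmailNotifications = async ({ pubClient, logger }) => {
  await loadEmailQueue({ pubClient, logger });
  await dispatchEmailsToQueue({
    emailsToDispatch: [
      {
        jobId: "job-1",
        jobRoNumber: "RO-1001",
        bodyShopName: "Example Body Shop",
        body: "A Note has been added",
        recipients: [{ user: "csr@example.com", firstName: "Sam", lastName: "Lee" }]
      }
    ],
    logger
  });
};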

View File

@@ -0,0 +1,537 @@
const { getJobAssignmentType } = require("./stringHelpers");
/**
* Populates the recipients for app, email, and FCM notifications based on scenario watchers.
*
* @param {Object} data - The data object containing scenarioWatchers and bodyShopId.
* @param {Object} result - The result object to populate with recipients for app, email, and FCM notifications.
*/
const populateWatchers = (data, result) => {
data.scenarioWatchers.forEach((recipients) => {
const { user, app, fcm, email, firstName, lastName, employeeId, associationId } = recipients;
if (app === true)
result.app.recipients.push({
user,
bodyShopId: data.bodyShopId,
employeeId,
associationId
});
if (fcm === true) result.fcm.recipients.push(user);
if (email === true) result.email.recipients.push({ user, firstName, lastName });
});
};
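// Example (invented values): a watcher of
// { user: "tech@example.com", app: true, email: true, fcm: false,
//   firstName: "Sam", lastName: "Lee", employeeId: 7, associationId: "assoc-1" }
// is added to result.app.recipients and result.email.recipients,
// but not to result.fcm.recipients.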
/**
* Builds notification data for changes to alternate transport.
*/
// Verified
const alternateTransportChangedBuilder = (data) => {
const body = `The Alternate Transport status has been updated to ${data?.data?.alt_transport}.`;
const result = {
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
jobRoNumber: data.jobRoNumber,
key: "notifications.job.alternateTransportChanged",
body, // Same as email body
variables: {
alternateTransport: data.changedFields.alt_transport?.new,
oldAlternateTransport: data.changedFields.alt_transport?.old
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for bill posted events.
*/
//verified
const billPostedHandler = (data) => {
const facing = data?.data?.isinhouse ? "In-House" : "External";
const body = `An ${facing} Bill has been posted${data?.data?.is_credit_memo ? " (Credit Memo)" : ""}.`.trim();
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.billPosted",
body,
variables: {
facing,
is_credit_memo: data?.data?.is_credit_memo
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for changes to critical parts status.
*/
// TODO: Needs change
const criticalPartsStatusChangedBuilder = (data) => {
const body = `The critical parts status has changed to ${data.data.queued_for_parts ? "queued" : "not queued"}.`;
const result = {
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
jobRoNumber: data.jobRoNumber,
key: "notifications.job.criticalPartsStatusChanged",
body,
variables: {
queuedForParts: data.data.queued_for_parts,
oldQueuedForParts: data.changedFields.queued_for_parts?.old
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for completed intake or delivery checklists.
*/
// Verified
const intakeDeliveryChecklistCompletedBuilder = (data) => {
const checklistType = data?.changedFields?.intakechecklist ? "Intake" : "Delivery";
const body = `The ${checklistType.charAt(0).toUpperCase() + checklistType.slice(1)} checklist has been completed.`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.checklistCompleted",
body,
variables: {
checklistType,
completed: true
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for job assignment events.
*/
// Verified
const jobAssignedToMeBuilder = (data) => {
const body = `You have been assigned to ${getJobAssignmentType(data.scenarioFields?.[0])}`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.assigned",
body,
variables: {
type: data.scenarioFields?.[0]
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for jobs added to production.
*/
// Verified
const jobsAddedToProductionBuilder = (data) => {
const body = `Has been added to Production.`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.addedToProduction",
body,
variables: {},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for job status changes.
*/
// Verified
const jobStatusChangeBuilder = (data) => {
const body = `The status has changed from ${data.changedFields.status.old} to ${data.changedFields.status.new}`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.statusChanged",
body,
variables: {
status: data.changedFields.status.new,
oldStatus: data.changedFields.status.old
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for new media added or reassigned events.
*/
// Verified
const newMediaAddedReassignedBuilder = (data) => {
// Determine if it's an image or document
const mediaType = data?.data?.type?.startsWith("image") ? "Image" : "Document";
// Determine if it's added or updated
const action = data.isNew ? "added" : "updated";
// Construct the body string
const body = `${mediaType === "Image" ? "An" : "A"} ${mediaType} has been ${action}.`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.newMediaAdded",
body,
variables: {
mediaType,
action
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for new notes added to a job.
*/
const newNoteAddedBuilder = (data) => {
const body = `A Note has been added: "${data.data.text}"`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.newNoteAdded",
body,
variables: {
text: data.data.text
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for new time tickets posted.
*/
const newTimeTicketPostedBuilder = (data) => {
console.dir(data); // debug output of the incoming event data
const type = data?.data?.cost_center;
const body = `An ${type} time ticket has been posted${data?.data?.flat_rate ? " (Flat Rate)" : ""}.`.trim();
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.newTimeTicketPosted",
body,
variables: {
type
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for parts marked as back-ordered.
*/
const partMarkedBackOrderedBuilder = (data) => {
const body = `A part has been marked as back-ordered.`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.partBackOrdered",
body,
variables: {
queuedForParts: data.changedFields.queued_for_parts?.new,
oldQueuedForParts: data.changedFields.queued_for_parts?.old
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for payment collection events.
*/
const paymentCollectedCompletedBuilder = (data) => {
const body = `Payment of $${data.data.clm_total} has been collected.`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.paymentCollected",
body,
variables: {
clmTotal: data.data.clm_total
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for changes to scheduled dates.
*/
const scheduledDatesChangedBuilder = (data) => {
const body = `Scheduled dates have been updated.`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.scheduledDatesChanged",
body,
variables: {
scheduledIn: data.changedFields.scheduled_in?.new,
oldScheduledIn: data.changedFields.scheduled_in?.old,
scheduledCompletion: data.changedFields.scheduled_completion?.new,
oldScheduledCompletion: data.changedFields.scheduled_completion?.old,
scheduledDelivery: data.changedFields.scheduled_delivery?.new,
oldScheduledDelivery: data.changedFields.scheduled_delivery?.old
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for supplement imported events.
*/
const supplementImportedBuilder = (data) => {
const body = `A supplement of $${data.data.cieca_ttl?.data?.supp_amt || 0} has been imported.`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.supplementImported",
body,
variables: {
suppAmt: data.data.cieca_ttl?.data?.supp_amt
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for tasks updated or created.
*/
const tasksUpdatedCreatedBuilder = (data) => {
const body = `Tasks have been ${data.isNew ? "created" : "updated"}.`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: data.isNew ? "notifications.job.taskCreated" : "notifications.job.taskUpdated",
body,
variables: {
isNew: data.isNew,
roNumber: data.jobRoNumber
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
module.exports = {
alternateTransportChangedBuilder,
billPostedHandler,
criticalPartsStatusChangedBuilder,
intakeDeliveryChecklistCompletedBuilder,
jobAssignedToMeBuilder,
jobsAddedToProductionBuilder,
jobStatusChangeBuilder,
newMediaAddedReassignedBuilder,
newNoteAddedBuilder,
newTimeTicketPostedBuilder,
partMarkedBackOrderedBuilder,
paymentCollectedCompletedBuilder,
scheduledDatesChangedBuilder,
supplementImportedBuilder,
tasksUpdatedCreatedBuilder
};
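A hedged example of calling one builder directly; all field values are invented and the require path is a placeholder:
const { jobStatusChangeBuilder } = require("./scenarioBuilders"); // path is a placeholder

const example = jobStatusChangeBuilder({
  jobId: "job-1",
  jobRoNumber: "RO-1001",
  bodyShopId: "shop-1",
  bodyShopName: "Example Body Shop",
  changedFields: { status: { old: "Estimate", new: "Repair" } },
  scenarioWatchers: [
    { user: "tech@example.com", app: true, email: true, fcm: false, firstName: "Sam", lastName: "Lee", employeeId: 7, associationId: "assoc-1" }
  ]
});
// example.app.body -> "The status has changed from Estimate to Repair"
// example.app.recipients and example.email.recipients are filled in by populateWatchers.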

View File

@@ -0,0 +1,234 @@
const {
jobAssignedToMeBuilder,
billPostedHandler,
newNoteAddedBuilder,
scheduledDatesChangedBuilder,
tasksUpdatedCreatedBuilder,
jobStatusChangeBuilder,
jobsAddedToProductionBuilder,
alternateTransportChangedBuilder,
newTimeTicketPostedBuilder,
intakeDeliveryChecklistCompletedBuilder,
paymentCollectedCompletedBuilder,
newMediaAddedReassignedBuilder,
criticalPartsStatusChangedBuilder,
supplementImportedBuilder,
partMarkedBackOrderedBuilder
} = require("./scenarioBuilders");
/**
* An array of notification scenario definitions.
*
* Each scenario object can include the following properties:
* - key {string}: The unique scenario name.
* - table {string}: The table name to check for changes.
* - fields {Array<string>}: Fields to check for changes.
* - matchToUserFields {Array<string>}: Fields used to match scenarios to user data.
* - onNew {boolean|Array<boolean>}: Indicates whether the scenario should be triggered on new data.
* - onlyTrue {Array<string>}: Specifies fields that must be true for the scenario to match.
* - builder {Function}: A function to handle the scenario.
*/
const notificationScenarios = [
{
key: "job-assigned-to-me",
table: "jobs",
fields: ["employee_prep", "employee_body", "employee_csr", "employee_refinish"],
matchToUserFields: ["employee_prep", "employee_body", "employee_csr", "employee_refinish"],
builder: jobAssignedToMeBuilder
},
{
key: "bill-posted",
table: "bills",
builder: billPostedHandler,
onNew: true
},
{
key: "new-note-added",
table: "notes",
builder: newNoteAddedBuilder,
onNew: true
},
{
key: "schedule-dates-changed",
table: "jobs",
fields: ["scheduled_in", "scheduled_completion", "scheduled_delivery"],
builder: scheduledDatesChangedBuilder
},
{
key: "tasks-updated-created",
table: "tasks",
fields: ["updated_at"],
// onNew: true,
builder: tasksUpdatedCreatedBuilder
},
{
key: "job-status-change",
table: "jobs",
fields: ["status"],
builder: jobStatusChangeBuilder
},
{
key: "job-added-to-production",
table: "jobs",
fields: ["inproduction"],
onlyTruthyValues: ["inproduction"],
builder: jobsAddedToProductionBuilder
},
{
key: "alternate-transport-changed",
table: "jobs",
fields: ["alt_transport"],
builder: alternateTransportChangedBuilder
},
{
key: "new-time-ticket-posted",
table: "timetickets",
builder: newTimeTicketPostedBuilder
},
{
// Good test for batching as this will hit multiple scenarios
key: "intake-delivery-checklist-completed",
table: "jobs",
fields: ["intakechecklist", "deliverchecklist"],
builder: intakeDeliveryChecklistCompletedBuilder
},
{
key: "payment-added",
table: "payments",
onNew: true,
builder: paymentCollectedCompletedBuilder
},
{
// MAKE SURE YOU ARE NOT ON AN LMS ENVIRONMENT
key: "new-media-added-reassigned",
table: "documents",
fields: ["jobid"],
builder: newMediaAddedReassignedBuilder
},
{
key: "critical-parts-status-changed",
table: "joblines",
fields: ["critical"],
onlyTrue: ["critical"],
builder: criticalPartsStatusChangedBuilder
},
// -------------- Difficult ---------------
// Holding off on this one for now
{
key: "supplement-imported",
builder: supplementImportedBuilder
// spans multiple tables,
},
// This one may be tricky as the jobid is not directly in the event data (this is probably wrong)
// (should otherwise)
// Status needs to mark meta data 'md_backorderd' for example
// Double check Jobid
{
key: "part-marked-back-ordered",
table: "joblines",
builder: partMarkedBackOrderedBuilder
}
];
/**
* Returns an array of scenarios that match the given event data.
*
* @param {Object} eventData - The parsed event data.
* Expected properties:
* - table: an object with a `name` property (e.g. { name: "tasks", schema: "public" })
* - changedFieldNames: an array of changed field names (e.g. [ "description", "updated_at" ])
* - isNew: boolean indicating whether the record is new or updated
* - data: the new data object (used to check field values)
* - (other properties may be added such as jobWatchers, bodyShopId, etc.)
*
* @returns {Array<Object>} An array of matching scenario objects.
*/
const getMatchingScenarios = (eventData) =>
notificationScenarios.filter((scenario) => {
// If eventData has a table, then only scenarios with a table property that matches should be considered.
if (eventData.table) {
if (!scenario.table || eventData.table.name !== scenario.table) {
return false;
}
}
// Check the onNew flag.
// Allow onNew to be either a boolean or an array of booleans.
if (Object.prototype.hasOwnProperty.call(scenario, "onNew")) {
if (Array.isArray(scenario.onNew)) {
if (!scenario.onNew.includes(eventData.isNew)) return false;
} else {
if (eventData.isNew !== scenario.onNew) return false;
}
}
// If the scenario defines fields, ensure at least one of them is present in changedFieldNames.
if (scenario.fields && scenario.fields.length > 0) {
const hasMatchingField = scenario.fields.some((field) => eventData.changedFieldNames.includes(field));
if (!hasMatchingField) {
return false;
}
}
// OnlyTrue logic:
// If a scenario defines an onlyTrue array, then at least one of those fields must have changed
// and its new value (from eventData.data) must be non-falsey.
if (scenario.onlyTrue && Array.isArray(scenario.onlyTrue) && scenario.onlyTrue.length > 0) {
const hasTruthyChange = scenario.onlyTrue.some(
(field) => eventData.changedFieldNames.includes(field) && Boolean(eventData.data[field])
);
if (!hasTruthyChange) {
return false;
}
}
// OnlyTruthyValues logic:
// If onlyTruthyValues is defined, check that the new values of specified fields (or all changed fields if true)
// are truthy. If an array, only check the listed fields, which must be in scenario.fields.
if (Object.prototype.hasOwnProperty.call(scenario, "onlyTruthyValues")) {
let fieldsToCheck;
if (scenario.onlyTruthyValues === true) {
// If true, check all fields in the scenario that changed
fieldsToCheck = scenario.fields.filter((field) => eventData.changedFieldNames.includes(field));
} else if (Array.isArray(scenario.onlyTruthyValues) && scenario.onlyTruthyValues.length > 0) {
// If an array, check only the specified fields, ensuring they are in scenario.fields
fieldsToCheck = scenario.onlyTruthyValues.filter(
(field) => scenario.fields.includes(field) && eventData.changedFieldNames.includes(field)
);
// If no fields in onlyTruthyValues match the scenario's fields or changed fields, skip this scenario
if (fieldsToCheck.length === 0) {
return false;
}
} else {
// Invalid onlyTruthyValues (not true or a non-empty array), skip this scenario
return false;
}
// Ensure all fields to check have truthy new values
const allTruthy = fieldsToCheck.every((field) => Boolean(eventData.data[field]));
if (!allTruthy) {
return false;
}
}
return true;
});
module.exports = {
notificationScenarios,
getMatchingScenarios
};
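A minimal usage sketch; the event data below follows the shape documented above, but the scenario shown in the trailing comment is illustrative only and may not exist in notificationScenarios:
// Hypothetical event data shaped like the parser produces.
const exampleEventData = {
  table: { name: "tasks", schema: "public" },
  changedFieldNames: ["description", "updated_at"],
  isNew: false,
  data: { description: "Order bumper", updated_at: "2025-03-04T12:00:00Z" }
};
const matched = getMatchingScenarios(exampleEventData);
// Each returned scenario passed the table, onNew, fields, onlyTrue, and
// onlyTruthyValues checks above, e.g. something like
// { key: "task_updated", table: "tasks", onNew: false, fields: ["description"] }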

View File

@@ -0,0 +1,292 @@
/**
* @module scenarioParser
* @description
* This module exports a function that parses an event and triggers notification scenarios based on the event data.
* It integrates with event parsing utilities, GraphQL queries, and notification queues to manage the dispatching
* of notifications via email and app channels. The function processes event data, identifies relevant scenarios,
* queries user notification preferences, and dispatches notifications accordingly.
*/
const eventParser = require("./eventParser");
const { client: gqlClient } = require("../graphql-client/graphql-client");
const queries = require("../graphql-client/queries");
const { isEmpty, isFunction } = require("lodash");
const { getMatchingScenarios } = require("./scenarioMapper");
const { dispatchEmailsToQueue } = require("./queues/emailQueue");
const { dispatchAppsToQueue } = require("./queues/appQueue");
// If true, the user who commits the action will NOT receive notifications; if false, they will.
const FILTER_SELF_FROM_WATCHERS = process.env?.FILTER_SELF_FROM_WATCHERS === "true";
/**
* Parses an event and determines matching scenarios for notifications.
* Queries job watchers and notification settings before triggering scenario builders.
*
* @param {Object} req - The request object containing event data, trigger, table, and logger.
* @param {string} jobIdField - The field path (e.g., "req.body.event.new.id") to extract the job ID.
* @returns {Promise<void>} Resolves when the parsing and notification dispatching process is complete.
* @throws {Error} If required request fields (event data, trigger, or table) or body shop data are missing.
*/
const scenarioParser = async (req, jobIdField) => {
const { event, trigger, table } = req.body;
const { logger } = req;
// Step 1: Validate that we know which user committed the action that fired the parser
// console.log("Step 1");
const hasuraUserId = event?.session_variables?.["x-hasura-user-id"];
// Bail if we don't know who started the scenario
if (!hasuraUserId) {
logger.log("No Hasura user ID found, skipping notification parsing", "info", "notifications");
return;
}
// Validate that required fields are present in the request body
if (!event?.data || !trigger || !table) {
throw new Error("Missing required request fields: event data, trigger, or table.");
}
// Step 2: Extract just the jobId using the provided jobIdField
// console.log("Step 2");
let jobId = null;
if (jobIdField) {
let keyName = jobIdField;
const prefix = "req.body.event.new.";
if (keyName.startsWith(prefix)) {
keyName = keyName.slice(prefix.length);
}
jobId = event.data.new[keyName] || (event.data.old && event.data.old[keyName]) || null;
}
if (!jobId) {
logger.log(`No jobId found using path "${jobIdField}", skipping notification parsing`, "info", "notifications");
return;
}
// Step 3: Query job watchers associated with the job ID using GraphQL
// console.log("Step 3");
const watcherData = await gqlClient.request(queries.GET_JOB_WATCHERS, {
jobid: jobId
});
// Transform watcher data into a simplified format with email and employee details
let jobWatchers = watcherData?.job_watchers?.map((watcher) => ({
email: watcher.user_email,
firstName: watcher?.user?.employee?.first_name,
lastName: watcher?.user?.employee?.last_name,
employeeId: watcher?.user?.employee?.id,
authId: watcher?.user?.authid
}));
if (FILTER_SELF_FROM_WATCHERS) {
jobWatchers = jobWatchers.filter((watcher) => watcher.authId !== hasuraUserId);
}
// Exit early if no job watchers are found for this job
if (isEmpty(jobWatchers)) {
logger.log(`No watchers found for jobId "${jobId}", skipping notification parsing`, "info", "notifications");
return;
}
// Step 5: Perform the full event diff now that we know there are watchers
// console.log("Step 5");
const eventData = await eventParser({
newData: event.data.new,
oldData: event.data.old,
trigger,
table,
jobId
});
// Step 6: Extract body shop information from the job data
// console.log("Step 6");
const bodyShopId = watcherData?.job?.bodyshop?.id;
const bodyShopName = watcherData?.job?.bodyshop?.shopname;
const jobRoNumber = watcherData?.job?.ro_number;
const jobClaimNumber = watcherData?.job?.clm_no;
// Validate that body shop data exists, as it's required for notifications
if (!bodyShopId || !bodyShopName) {
throw new Error("No bodyshop data found for this job.");
}
// Step 7: Identify scenarios that match the event data and job context
// console.log("Step 7");
const matchingScenarios = getMatchingScenarios({
...eventData,
jobWatchers,
bodyShopId,
bodyShopName
});
// Exit early if no matching scenarios are identified
if (isEmpty(matchingScenarios)) {
logger.log(
`No matching scenarios found for jobId "${jobId}", skipping notification dispatch`,
"info",
"notifications"
);
return;
}
// Combine event data with additional context for scenario processing
const finalScenarioData = {
...eventData,
jobWatchers,
bodyShopId,
bodyShopName,
matchingScenarios
};
// Step 8: Query notification settings for the job watchers
// console.log("Step 8");
const associationsData = await gqlClient.request(queries.GET_NOTIFICATION_ASSOCIATIONS, {
emails: jobWatchers.map((x) => x.email),
shopid: bodyShopId
});
// Exit early if no notification associations are found
if (isEmpty(associationsData?.associations)) {
logger.log(
`No notification associations found for jobId "${jobId}", skipping notification dispatch`,
"info",
"notifications"
);
return;
}
// Step 9: Filter scenario watchers based on their enabled notification methods
// console.log("Step 9");
finalScenarioData.matchingScenarios = finalScenarioData.matchingScenarios.map((scenario) => ({
...scenario,
scenarioWatchers: associationsData.associations
.filter((assoc) => {
const settings = assoc.notification_settings && assoc.notification_settings[scenario.key];
// Include only watchers with at least one enabled notification method (app, email, or FCM)
return settings && (settings.app || settings.email || settings.fcm);
})
.map((assoc) => {
const settings = assoc.notification_settings[scenario.key];
const watcherEmail = assoc.useremail;
const matchingWatcher = jobWatchers.find((watcher) => watcher.email === watcherEmail);
// Build watcher object with notification preferences and personal details
return {
user: watcherEmail,
email: settings.email,
app: settings.app,
fcm: settings.fcm,
firstName: matchingWatcher?.firstName,
lastName: matchingWatcher?.lastName,
employeeId: matchingWatcher?.employeeId,
associationId: assoc.id
};
})
}));
// Exit early if no scenarios have eligible watchers after filtering
if (isEmpty(finalScenarioData?.matchingScenarios)) {
logger.log(
`No eligible watchers after filtering for jobId "${jobId}", skipping notification dispatch`,
"info",
"notifications"
);
return;
}
// Step 10: Build and collect scenarios to dispatch notifications for
// console.log("Step 10");
const scenariosToDispatch = [];
for (const scenario of finalScenarioData.matchingScenarios) {
// Skip if no watchers or no builder function is defined for the scenario
if (isEmpty(scenario.scenarioWatchers) || !isFunction(scenario.builder)) {
continue;
}
let eligibleWatchers = scenario.scenarioWatchers;
// Filter watchers to only those assigned to changed fields, if specified
if (scenario.matchToUserFields && scenario.matchToUserFields.length > 0) {
eligibleWatchers = scenario.scenarioWatchers.filter((watcher) =>
scenario.matchToUserFields.some(
(field) => eventData.changedFieldNames.includes(field) && eventData.data[field]?.includes(watcher.employeeId)
)
);
}
// Skip if no watchers remain after filtering
if (isEmpty(eligibleWatchers)) {
continue;
}
// Step 11: Filter scenario fields to include only those that changed
// console.log("Step 11");
const filteredScenarioFields =
scenario.fields?.filter((field) => eventData.changedFieldNames.includes(field)) || [];
// Use the scenario's builder to construct the notification data
scenariosToDispatch.push(
scenario.builder({
trigger: finalScenarioData.trigger.name,
bodyShopId: finalScenarioData.bodyShopId,
bodyShopName: finalScenarioData.bodyShopName,
scenarioKey: scenario.key,
scenarioTable: scenario.table,
scenarioFields: filteredScenarioFields,
scenarioBuilder: scenario.builder,
scenarioWatchers: eligibleWatchers,
jobId: finalScenarioData.jobId,
jobRoNumber: jobRoNumber,
jobClaimNumber: jobClaimNumber,
isNew: finalScenarioData.isNew,
changedFieldNames: finalScenarioData.changedFieldNames,
changedFields: finalScenarioData.changedFields,
data: finalScenarioData.data
})
);
}
if (isEmpty(scenariosToDispatch)) {
logger.log(`No scenarios to dispatch for jobId "${jobId}" after building`, "info", "notifications");
return;
}
// Step 12: Dispatch email notifications to the email queue
// console.log("Step 12");
const emailsToDispatch = scenariosToDispatch.map((scenario) => scenario?.email);
if (!isEmpty(emailsToDispatch)) {
dispatchEmailsToQueue({ emailsToDispatch, logger }).catch((e) =>
logger.log("Something went wrong dispatching emails to the Email Notification Queue", "error", "queue", null, {
message: e?.message,
stack: e?.stack
})
);
}
// Step 13: Dispatch app notifications to the app queue
// console.log("Step 13");
const appsToDispatch = scenariosToDispatch.map((scenario) => scenario?.app);
if (!isEmpty(appsToDispatch)) {
dispatchAppsToQueue({ appsToDispatch, logger }).catch((e) =>
logger.log("Something went wrong dispatching apps to the App Notification Queue", "error", "queue", null, {
message: e?.message,
stack: e?.stack
})
);
}
};
module.exports = scenarioParser;
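A hedged sketch of how one of the Hasura event handlers might delegate to this parser; the handler name, the jobIdField path, and the failure log key are assumptions for illustration, not the actual handler code:
// Illustrative only: an event-trigger handler calling scenarioParser.
const exampleHandleTasksChange = async (req, res) => {
  try {
    // "req.body.event.new.jobid" is a hypothetical jobIdField; real handlers
    // may point at a different column on their table.
    await scenarioParser(req, "req.body.event.new.jobid");
    res.sendStatus(200);
  } catch (error) {
    req.logger.log("notification-parse-failure", "error", null, null, {
      message: error?.message,
      stack: error?.stack
    });
    res.sendStatus(500);
  }
};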

View File

@@ -0,0 +1,31 @@
/**
* @module jobAssignmentHelper
* @description
* This module provides utility functions for handling job assignment types.
* Currently, it includes a function to map lowercase job assignment codes to their corresponding human-readable job types.
*/
/**
* Maps a lowercase job assignment code to its corresponding human-readable job type.
*
* @param {string} data - The lowercase job assignment code (e.g., "employee_pre").
* @returns {string} The human-readable job type (e.g., "Prep"). Returns an empty string if the code is unknown or if the input is null/undefined.
*/
const getJobAssignmentType = (data) => {
switch (data) {
case "employee_pre":
return "Prep";
case "employee_body":
return "Body";
case "employee_csr":
return "CSR";
case "employee_refinish":
return "Refinish";
default:
return "";
}
};
module.exports = {
getJobAssignmentType
};
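For example:
// Known codes map to their display labels; anything else yields "".
getJobAssignmentType("employee_body"); // "Body"
getJobAssignmentType("employee_csr"); // "CSR"
getJobAssignmentType("not_a_real_code"); // ""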

View File

@@ -2,13 +2,18 @@ const express = require("express");
const validateFirebaseIdTokenMiddleware = require("../middleware/validateFirebaseIdTokenMiddleware");
const { subscribe, unsubscribe, sendNotification } = require("../firebase/firebase-handler");
const eventAuthorizationMiddleware = require("../middleware/eventAuthorizationMIddleware");
const handlePartsOrderChange = require("../notifications/eventHandlers/handlePartsOrderChange");
const handlePartsDispatchChange = require("../notifications/eventHandlers/handlePartsDispatchChange");
const handleTasksChange = require("../notifications/eventHandlers/handleTasksChange");
const handleTimeTicketsChange = require("../notifications/eventHandlers/handleTimeTicketsChange");
const handleJobsChange = require("../notifications/eventHandlers/handeJobsChange");
const handleBillsChange = require("../notifications/eventHandlers/handleBillsChange");
const {
handleJobsChange,
handleBillsChange,
handlePartsOrderChange,
handlePartsDispatchChange,
handleTasksChange,
handleTimeTicketsChange,
handleNotesChange,
handlePaymentsChange,
handleDocumentsChange,
handleJobLinesChange
} = require("../notifications/eventHandlers");
const router = express.Router();
@@ -24,5 +29,9 @@ router.post("/events/handlePartsOrderChange", eventAuthorizationMiddleware, hand
router.post("/events/handlePartsDispatchChange", eventAuthorizationMiddleware, handlePartsDispatchChange);
router.post("/events/handleTasksChange", eventAuthorizationMiddleware, handleTasksChange);
router.post("/events/handleTimeTicketsChange", eventAuthorizationMiddleware, handleTimeTicketsChange);
router.post("/events/handleNotesChange", eventAuthorizationMiddleware, handleNotesChange);
router.post("/events/handlePaymentsChange", eventAuthorizationMiddleware, handlePaymentsChange);
router.post("/events/handleDocumentsChange", eventAuthorizationMiddleware, handleDocumentsChange);
router.post("/events/handleJobLinesChange", eventAuthorizationMiddleware, handleJobLinesChange);
module.exports = router;

View File

@@ -0,0 +1,52 @@
// server/utils/cleanupManager.js
const logger = require("./logger");
let cleanupTasks = [];
let isShuttingDown = false;
/**
* Register a cleanup task to be executed during shutdown
* @param {Function} task - The cleanup task to register
*/
function registerCleanupTask(task) {
cleanupTasks.push(task);
}
/**
* Handle SIGTERM signal for graceful shutdown
*/
async function handleSigterm() {
if (isShuttingDown) {
logger.log("sigterm-api", "WARN", null, null, { message: "Shutdown already in progress, ignoring signal." });
return;
}
isShuttingDown = true;
logger.log("sigterm-api", "WARN", null, null, { message: "SIGTERM Received. Starting graceful shutdown." });
try {
for (const task of cleanupTasks) {
logger.log("sigterm-api", "WARN", null, null, { message: `Running cleanup task: ${task.name}` });
await task();
}
logger.log("sigterm-api", "WARN", null, null, { message: `All cleanup tasks completed.` });
} catch (error) {
logger.log("sigterm-api-error", "ERROR", null, null, { message: error.message, stack: error.stack });
}
process.exit(0);
}
/**
* Initialize cleanup manager with process event listeners
*/
function initializeCleanupManager() {
process.on("SIGTERM", handleSigterm);
process.on("SIGINT", handleSigterm); // Handle Ctrl+C
}
module.exports = {
registerCleanupTask,
initializeCleanupManager
};
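A minimal wiring sketch for a server entry point; the require path and the connection object are placeholders, not references to real resources in this repo:
// Illustrative startup wiring (resource and path names are hypothetical).
const { registerCleanupTask, initializeCleanupManager } = require("./cleanupManager");
// Stand-in for a real Redis/database connection handle.
const exampleConnection = { quit: async () => {} };
initializeCleanupManager(); // attach the SIGTERM/SIGINT listeners once at boot
registerCleanupTask(async function closeExampleConnection() {
  // handleSigterm logs task.name ("closeExampleConnection") before awaiting it
  await exampleConnection.quit();
});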

View File

@@ -0,0 +1,7 @@
const { inspect } = require("node:util");
const consoleDir = (data) => {
console.log(inspect(data, { showHidden: false, depth: null, colors: true }));
};
module.exports = consoleDir;
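For example (the data is a toy literal):
// Prints the full structure with colors, instead of util.inspect's default depth of 2.
consoleDir({ job: { watchers: [{ email: "tech@example.com", settings: { app: true } }] } });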

View File

@@ -58,4 +58,20 @@ exports.InstanceRegion = () =>
rome: "us-east-2"
});
exports.InstanceEndpoints = () =>
InstanceManager({
imex:
process.env?.NODE_ENV === "development"
? "https://localhost:3000"
: process.env?.NODE_ENV === "test"
? "https://test.imex.online"
: "https://imex.online",
rome:
process.env?.NODE_ENV === "development"
? "https://localhost:3000"
: process.env?.NODE_ENV === "test"
? "https://test.romeonline.io"
: "https://romeonline.io"
});
exports.default = InstanceManager;
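Assuming InstanceManager resolves to the entry that matches the running instance, callers can stay environment-agnostic; a hedged sketch (the appended path is illustrative only):
// On the imex instance with NODE_ENV=test this would presumably resolve to
// "https://test.imex.online"; on rome in production, "https://romeonline.io".
const callbackBase = InstanceEndpoints();
const exampleCallbackUrl = `${callbackBase}/callback`; // path is hypothetical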

View File

@@ -73,7 +73,20 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
try {
await pubClient.rpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
} catch (error) {
logger.log(`Error adding item to the end of the list for socket ${socketId}: ${error}`, "ERROR", "redis");
let userEmail = "unknown";
let socketMappings = {};
try {
const userData = await getSessionData(socketId, "user");
if (userData && userData.email) {
userEmail = userData.email;
socketMappings = await getUserSocketMapping(userEmail);
}
} catch (sessionError) {
logger.log(`Failed to fetch session data for socket ${socketId}: ${sessionError}`, "ERROR", "redis");
}
const mappingString = JSON.stringify(socketMappings, null, 2);
const errorMessage = `Error adding item to the end of the list for socket ${socketId}: ${error}. User: ${userEmail}, Socket Mappings: ${mappingString}`;
logger.log(errorMessage, "ERROR", "redis");
}
};
@@ -121,6 +134,106 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
const addUserSocketMapping = async (email, socketId, bodyshopId) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
try {
logger.log(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`, "debug", "redis");
// Save the mapping: socketId -> bodyshopId
await pubClient.hset(socketMappingKey, socketId, bodyshopId);
// Set TTL (24 hours) for the mapping hash
await pubClient.expire(socketMappingKey, 86400);
} catch (error) {
logger.log(`Error adding socket mapping for ${email} (bodyshop ${bodyshopId}): ${error}`, "ERROR", "redis");
}
};
const refreshUserSocketTTL = async (email) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
try {
const exists = await pubClient.exists(socketMappingKey);
if (exists) {
await pubClient.expire(socketMappingKey, 86400);
logger.log(`Refreshed TTL for ${email} socket mapping`, "debug", "redis");
}
} catch (error) {
logger.log(`Error refreshing TTL for ${email}: ${error}`, "ERROR", "redis");
}
};
const removeUserSocketMapping = async (email, socketId) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
try {
logger.log(`Removing socket ${socketId} mapping for user ${email}`, "DEBUG", "redis");
// Look up the bodyshopId associated with this socket
const bodyshopId = await pubClient.hget(socketMappingKey, socketId);
if (!bodyshopId) {
logger.log(`Socket ${socketId} not found for user ${email}`, "DEBUG", "redis");
return;
}
// Remove the socket mapping
await pubClient.hdel(socketMappingKey, socketId);
logger.log(
`Removed socket ${socketId} (associated with bodyshop ${bodyshopId}) for user ${email}`,
"DEBUG",
"redis"
);
// Refresh TTL if any socket mappings remain
const remainingSockets = await pubClient.hlen(socketMappingKey);
if (remainingSockets > 0) {
await pubClient.expire(socketMappingKey, 86400);
}
} catch (error) {
logger.log(`Error removing socket mapping for ${email}: ${error}`, "ERROR", "redis");
}
};
const getUserSocketMapping = async (email) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
try {
// Retrieve all socket mappings for the user
const mapping = await pubClient.hgetall(socketMappingKey);
const ttl = await pubClient.ttl(socketMappingKey);
// Group socket IDs by bodyshopId
const result = {};
for (const [socketId, bodyshopId] of Object.entries(mapping)) {
if (!result[bodyshopId]) {
result[bodyshopId] = { socketIds: [], ttl };
}
result[bodyshopId].socketIds.push(socketId);
}
return result;
} catch (error) {
console.error(`Error retrieving socket mappings for ${email}:`, error);
throw error;
}
};
const getUserSocketMappingByBodyshop = async (email, bodyshopId) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
try {
// Retrieve all socket mappings for the user
const mapping = await pubClient.hgetall(socketMappingKey);
const ttl = await pubClient.ttl(socketMappingKey);
// Filter socket IDs for the provided bodyshopId
const socketIds = Object.entries(mapping).reduce((acc, [socketId, bId]) => {
if (bId === bodyshopId) {
acc.push(socketId);
}
return acc;
}, []);
return { socketIds, ttl };
} catch (error) {
logger.log(`Error retrieving socket mappings for ${email} by bodyshop ${bodyshopId}: ${error}`, "ERROR", "redis");
throw error;
}
};
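// Illustrative data shapes only (emails and IDs below are made up):
// after addUserSocketMapping("tech@example.com", "sock-1", "shop-42") and
// addUserSocketMapping("tech@example.com", "sock-2", "shop-42"), Redis holds
//   user:tech@example.com:socketMapping -> { "sock-1": "shop-42", "sock-2": "shop-42" } (TTL 86400s)
// getUserSocketMapping("tech@example.com") then resolves to roughly
//   { "shop-42": { socketIds: ["sock-1", "sock-2"], ttl: 86400 } }
// and getUserSocketMappingByBodyshop("tech@example.com", "shop-42") to
//   { socketIds: ["sock-1", "sock-2"], ttl: 86400 }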
const api = {
setSessionData,
getSessionData,
@@ -133,7 +246,12 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
clearList,
addUserToRoom,
removeUserFromRoom,
getUsersInRoom
getUsersInRoom,
addUserSocketMapping,
removeUserSocketMapping,
getUserSocketMappingByBodyshop,
getUserSocketMapping,
refreshUserSocketTTL
};
Object.assign(module.exports, api);
@@ -143,86 +261,6 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
next();
});
// Demo to show how all the helper functions work
// const demoSessionData = async () => {
// const socketId = "testSocketId";
//
// // 1. Test setSessionData and getSessionData
// await setSessionData(socketId, "field1", "Hello, Redis!");
// const field1Value = await getSessionData(socketId, "field1");
// console.log("Retrieved single field value:", field1Value);
//
// // 2. Test setMultipleSessionData and getMultipleSessionData
// await setMultipleSessionData(socketId, { field2: "Second Value", field3: "Third Value" });
// const multipleFields = await getMultipleSessionData(socketId, ["field2", "field3"]);
// console.log("Retrieved multiple field values:", multipleFields);
//
// // 3. Test setMultipleFromArraySessionData
// await setMultipleFromArraySessionData(socketId, [
// ["field4", "Fourth Value"],
// ["field5", "Fifth Value"]
// ]);
//
// // Retrieve and log all fields
// const allFields = await getMultipleSessionData(socketId, ["field1", "field2", "field3", "field4", "field5"]);
// console.log("Retrieved all field values:", allFields);
//
// // 4. Test list functions
// // Add item to the end of a Redis list
// await addItemToEndOfList(socketId, "logEvents", { event: "Log Event 1", timestamp: new Date() });
// await addItemToEndOfList(socketId, "logEvents", { event: "Log Event 2", timestamp: new Date() });
//
// // Add item to the beginning of a Redis list
// await addItemToBeginningOfList(socketId, "logEvents", { event: "First Log Event", timestamp: new Date() });
//
// // Retrieve the entire list
// const logEventsData = await pubClient.lrange(`socket:${socketId}:logEvents`, 0, -1);
// const logEvents = logEventsData.map((item) => JSON.parse(item));
// console.log("Log Events List:", logEvents);
//
// // 5. Test clearList
// await clearList(socketId, "logEvents");
// console.log("Log Events List cleared.");
//
// // Retrieve the list after clearing to confirm it's empty
// const logEventsAfterClear = await pubClient.lrange(`socket:${socketId}:logEvents`, 0, -1);
// console.log("Log Events List after clearing:", logEventsAfterClear); // Should be an empty array
//
// // 6. Test clearSessionData
// await clearSessionData(socketId);
// console.log("Session data cleared.");
//
// // 7. Test room functions
// const roomName = "testRoom";
// const user1 = { id: 1, name: "Alice" };
// const user2 = { id: 2, name: "Bob" };
//
// // Add users to room
// await addUserToRoom(roomName, user1);
// await addUserToRoom(roomName, user2);
//
// // Get users in room
// const usersInRoom = await getUsersInRoom(roomName);
// console.log(`Users in room ${roomName}:`, usersInRoom);
//
// // Remove a user from room
// await removeUserFromRoom(roomName, user1);
//
// // Get users in room after removal
// const usersInRoomAfterRemoval = await getUsersInRoom(roomName);
// console.log(`Users in room ${roomName} after removal:`, usersInRoomAfterRemoval);
//
// // Clean up: remove remaining users from room
// await removeUserFromRoom(roomName, user2);
//
// // Verify room is empty
// const usersInRoomAfterCleanup = await getUsersInRoom(roomName);
// console.log(`Users in room ${roomName} after cleanup:`, usersInRoomAfterCleanup); // Should be empty
// };
// if (process.env.NODE_ENV === "development") {
// demoSessionData();
// }
return api;
};

View File

@@ -2,7 +2,7 @@ const { admin } = require("../firebase/firebase-handler");
const redisSocketEvents = ({
io,
redisHelpers: { setSessionData, clearSessionData }, // Note: Used if we persist user to Redis
redisHelpers: { addUserSocketMapping, removeUserSocketMapping, refreshUserSocketTTL, getUserSocketMappingByBodyshop },
ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom },
logger
}) => {
@@ -12,78 +12,64 @@ const redisSocketEvents = ({
};
// Socket Auth Middleware
const authMiddleware = (socket, next) => {
const authMiddleware = async (socket, next) => {
const { token, bodyshopId } = socket.handshake.auth;
if (!token) {
return next(new Error("Authentication error - no authorization token."));
}
if (!bodyshopId) {
return next(new Error("Authentication error - no bodyshopId provided."));
}
try {
if (socket.handshake.auth.token) {
admin
.auth()
.verifyIdToken(socket.handshake.auth.token)
.then((user) => {
socket.user = user;
// Note: if we ever want to capture user data across sockets
// Uncomment the following line and then remove the next() to a second then()
// return setSessionData(socket.id, "user", user);
next();
})
.catch((error) => {
next(new Error(`Authentication error: ${error.message}`));
});
} else {
next(new Error("Authentication error - no authorization token."));
}
const user = await admin.auth().verifyIdToken(token);
socket.user = user;
await addUserSocketMapping(user.email, socket.id, bodyshopId);
next();
} catch (error) {
logger.log("websocket-connection-error", "error", null, null, {
message: error?.message,
stack: error?.stack
});
next(new Error(`Authentication error ${error}`));
next(new Error(`Authentication error: ${error.message}`));
}
};
// Register Socket Events
const registerSocketEvents = (socket) => {
// Uncomment for further testing
// createLogEvent(socket, "debug", `Registering RedisIO Socket Events.`);
// Token Update Events
const registerUpdateEvents = (socket) => {
let latestTokenTimestamp = 0;
const updateToken = async (newToken) => {
const updateToken = async ({ token, bodyshopId }) => {
const currentTimestamp = Date.now();
latestTokenTimestamp = currentTimestamp;
try {
// Verify token with Firebase Admin SDK
const user = await admin.auth().verifyIdToken(newToken, true);
if (!token || !bodyshopId) {
socket.emit("token-updated", { success: false, error: "Token or bodyshopId missing" });
return;
}
// Skip outdated token validations
try {
const user = await admin.auth().verifyIdToken(token, true);
if (currentTimestamp < latestTokenTimestamp) {
createLogEvent(socket, "warn", "Outdated token validation skipped.");
return;
}
socket.user = user;
createLogEvent(socket, "debug", `Token updated successfully for socket ID: ${socket.id}`);
await refreshUserSocketTTL(user.email, bodyshopId);
createLogEvent(
socket,
"debug",
`Token updated successfully for socket ID: ${socket.id} (bodyshop: ${bodyshopId})`
);
socket.emit("token-updated", { success: true });
} catch (error) {
if (error.code === "auth/id-token-expired") {
createLogEvent(socket, "warn", "Stale token received, waiting for new token");
socket.emit("token-updated", {
success: false,
error: "Stale token."
});
return; // Avoid disconnecting for expired tokens
socket.emit("token-updated", { success: false, error: "Stale token." });
return;
}
createLogEvent(socket, "error", `Token update failed for socket ID: ${socket.id}, Error: ${error.message}`);
socket.emit("token-updated", { success: false, error: error.message });
// Optionally disconnect for invalid tokens or other errors
socket.disconnect();
}
};
socket.on("update-token", updateToken);
};
@@ -127,16 +113,15 @@ const redisSocketEvents = ({
// Disconnect Events
const registerDisconnectEvents = (socket) => {
const disconnect = () => {
// Uncomment for further testing
// createLogEvent(socket, "debug", `User disconnected.`);
const disconnect = async () => {
if (socket.user?.email) {
await removeUserSocketMapping(socket.user.email, socket.id);
}
// Leave all rooms except the default room (socket.id)
const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id);
for (const room of rooms) {
socket.leave(room);
}
// If we ever want to persist the user across workers
// clearSessionData(socket.id);
};
socket.on("disconnect", disconnect);
@@ -157,6 +142,7 @@ const redisSocketEvents = ({
});
}
};
const leaveConversationRoom = ({ bodyshopId, conversationId }) => {
try {
const room = getBodyshopConversationRoom({ bodyshopId, conversationId });
@@ -196,11 +182,58 @@ const redisSocketEvents = ({
socket.on("leave-bodyshop-conversation", leaveConversationRoom);
};
// Sync Notification Read Events
const registerSyncEvents = (socket) => {
socket.on("sync-notification-read", async ({ email, bodyshopId, notificationId }) => {
try {
const userEmail = socket.user.email;
const socketMapping = await getUserSocketMappingByBodyshop(email, bodyshopId);
const timestamp = new Date().toISOString();
if (socketMapping?.socketIds) {
socketMapping?.socketIds.forEach((socketId) => {
if (socketId !== socket.id) {
// Avoid sending back to the originating socket
io.to(socketId).emit("sync-notification-read", { notificationId, timestamp });
}
});
createLogEvent(
socket,
"debug",
`Synced notification ${notificationId} read for ${userEmail} in bodyshop ${bodyshopId}`
);
}
} catch (error) {
createLogEvent(socket, "error", `Error syncing notification read: ${error.message}`);
}
});
socket.on("sync-all-notifications-read", async ({ email, bodyshopId }) => {
try {
const socketMapping = await getUserSocketMappingByBodyshop(email, bodyshopId);
const timestamp = new Date().toISOString();
if (socketMapping?.socketIds) {
socketMapping?.socketIds.forEach((socketId) => {
if (socketId !== socket.id) {
// Avoid sending back to the originating socket
io.to(socketId).emit("sync-all-notifications-read", { timestamp });
}
});
createLogEvent(socket, "debug", `Synced all notifications read for ${email} in bodyshop ${bodyshopId}`);
}
} catch (error) {
createLogEvent(socket, "error", `Error syncing all notifications read: ${error.message}`);
}
});
};
// Call Handlers
registerRoomAndBroadcastEvents(socket);
registerUpdateEvents(socket);
registerMessagingEvents(socket);
registerDisconnectEvents(socket);
registerSyncEvents(socket);
};
// Associate Middleware and Handlers