feature/IO-3092-imgproxy - Merge release to take care of PR conflicts.

This commit is contained in:
Dave Richer
2025-03-25 14:57:17 -04:00
184 changed files with 19851 additions and 19376 deletions

View File

@@ -12,7 +12,7 @@ const AxiosLib = require("axios").default;
const axios = AxiosLib.create();
const { PBS_ENDPOINTS, PBS_CREDENTIALS } = require("./pbs-constants");
const { CheckForErrors } = require("./pbs-job-export");
const uuid = require("uuid").v4;
axios.interceptors.request.use((x) => {
const socket = x.socket;
@@ -21,6 +21,7 @@ axios.interceptors.request.use((x) => {
...x.headers[x.method],
...x.headers
};
const printable = `${new Date()} | Request: ${x.method.toUpperCase()} | ${
x.url
} | ${JSON.stringify(x.data)} | ${JSON.stringify(headers)}`;

View File

@@ -7,7 +7,7 @@ const OAuthClient = require("intuit-oauth");
const client = require("../../graphql-client/graphql-client").client;
const queries = require("../../graphql-client/queries");
const { parse, stringify } = require("querystring");
const InstanceManager = require("../../utils/instanceMgr").default;
const { InstanceEndpoints } = require("../../utils/instanceMgr");
const oauthClient = new OAuthClient({
clientId: process.env.QBO_CLIENT_ID,
@@ -17,16 +17,8 @@ const oauthClient = new OAuthClient({
logging: true
});
let url;
if (process.env.NODE_ENV === "production") {
//TODO:AIO Add in QBO callbacks.
url = InstanceManager({ imex: `https://imex.online`, rome: `https://romeonline.io` });
} else if (process.env.NODE_ENV === "test") {
url = InstanceManager({ imex: `https://test.imex.online`, rome: `https://test.romeonline.io` });
} else {
url = `http://localhost:3000`;
}
//TODO:AIO Add in QBO callbacks.
const url = InstanceEndpoints();
exports.default = async (req, res) => {
const queryString = req.url.split("?").reverse()[0];

View File

@@ -18,7 +18,7 @@ const entegralEndpoint =
: "https://uat-ws.armsbusinesssolutions.net/RepairOrderFolderService/RepairOrderFolderService.asmx?WSDL";
const client = require("../graphql-client/graphql-client").client;
const uuid = require("uuid").v4;
const { v4 } = require("uuid");
const momentFormat = "yyyy-MM-DDTHH:mm:ss.SSS";
@@ -79,7 +79,7 @@ exports.default = async (req, res) => {
}
try {
const transId = uuid(); // Can this actually be the job id?
const transId = v4(); // Can this actually be the job id?
let obj = {
RqUID: transId,
DocumentInfo: {

View File

@@ -20,6 +20,11 @@ const defaultFooter = () => {
const now = () => moment().format("MM/DD/YYYY @ hh:mm a");
/**
* Generate the email template
* @param strings
* @returns {string}
*/
const generateEmailTemplate = (strings) => {
return (
`

View File

@@ -69,11 +69,14 @@ const sendServerEmail = async ({ subject, text }) => {
}
},
(err, info) => {
logger.log("server-email-failure", err ? "error" : "debug", null, null, { message: err || info });
logger.log("server-email-failure", err ? "error" : "debug", null, null, {
message: err?.message,
stack: err?.stack
});
}
);
} catch (error) {
logger.log("server-email-failure", "error", null, null, { error });
logger.log("server-email-failure", "error", null, null, { message: error?.message, stack: error?.stack });
}
};
@@ -92,11 +95,11 @@ const sendTaskEmail = async ({ to, subject, type = "text", html, text, attachmen
},
(err, info) => {
// (message, type, user, record, meta
logger.log("server-email", err ? "error" : "debug", null, null, { message: err ? err?.message : info });
logger.log("server-email", err ? "error" : "debug", null, null, { message: err?.message, stack: err?.stack });
}
);
} catch (error) {
logger.log("server-email-failure", "error", null, null, { error });
logger.log("server-email-failure", "error", null, null, { message: error?.message, stack: error?.stack });
}
};
@@ -125,7 +128,8 @@ const sendEmail = async (req, res) => {
cc: req.body.cc,
subject: req.body.subject,
templateStrings: req.body.templateStrings,
error
errorMessage: error?.message,
errorStack: error?.stack
});
}
})
@@ -194,7 +198,8 @@ const sendEmail = async (req, res) => {
cc: req.body.cc,
subject: req.body.subject,
templateStrings: req.body.templateStrings,
error: err
errorMessage: err?.message,
errorStack: err?.stack
});
logEmail(req, {
to: req.body.to,
@@ -202,7 +207,7 @@ const sendEmail = async (req, res) => {
subject: req.body.subject,
bodyshopid: req.body.bodyshopid
});
res.status(500).json({ success: false, error: err });
res.status(500).json({ success: false, errorMessage: err?.message, stack: err?.stack });
}
}
);
@@ -270,14 +275,16 @@ ${body.bounce?.bouncedRecipients.map(
},
(err, info) => {
logger.log("sns-error", err ? "error" : "debug", "api", null, {
message: err ? err?.message : info
errorMessage: err?.message,
errorStack: err?.stack
});
}
);
}
} catch (error) {
logger.log("sns-error", "ERROR", "api", null, {
error: JSON.stringify(error)
errorMessage: error?.message,
errorStack: error?.stack
});
}
res.sendStatus(200);

View File

@@ -10,6 +10,8 @@ const generateEmailTemplate = require("./generateTemplate");
const moment = require("moment-timezone");
const { taskEmailQueue } = require("./tasksEmailsQueue");
const mailer = require("./mailer");
const { InstanceEndpoints } = require("../utils/instanceMgr");
const { formatTaskPriority } = require("../notifications/stringHelpers");
// Initialize the Tasks Email Queue
const tasksEmailQueue = taskEmailQueue();
@@ -61,16 +63,6 @@ const formatDate = (date) => {
return date ? `| Due on: ${moment(date).format("MM/DD/YYYY")}` : "";
};
const formatPriority = (priority) => {
if (priority === 1) {
return "High";
} else if (priority === 3) {
return "Low";
} else {
return "Medium";
}
};
/**
* Generate the email template arguments.
* @param title
@@ -83,18 +75,11 @@ const formatPriority = (priority) => {
* @param taskId
* @returns {{header, body: string, subHeader: string}}
*/
const getEndpoints = (bodyshop) =>
InstanceManager({
imex: process.env?.NODE_ENV === "test" ? "https://test.imex.online" : "https://imex.online",
rome: process.env?.NODE_ENV === "test" ? "https//test.romeonline.io" : "https://romeonline.io"
});
const generateTemplateArgs = (title, priority, description, dueDate, bodyshop, job, taskId, dateLine, createdBy) => {
const endPoints = getEndpoints(bodyshop);
const endPoints = InstanceEndpoints();
return {
header: title,
subHeader: `Body Shop: ${bodyshop.shopname} | Priority: ${formatPriority(priority)} ${formatDate(dueDate)} | Created By: ${createdBy || "N/A"}`,
subHeader: `Body Shop: ${bodyshop.shopname} | Priority: ${formatTaskPriority(priority)} ${formatDate(dueDate)} | Created By: ${createdBy || "N/A"}`,
body: `Reference: ${job.ro_number || "N/A"} | ${job.ownr_co_nm ? job.ownr_co_nm : `${job.ownr_fn || ""} ${job.ownr_ln || ""}`.trim()} | ${`${job.v_model_yr || ""} ${job.v_make_desc || ""} ${job.v_model_desc || ""}`.trim()}<br>${description ? description.concat("<br>") : ""}<a href="${endPoints}/manage/tasks/alltasks?taskid=${taskId}">View this task.</a>`,
dateLine
};
@@ -108,9 +93,8 @@ const generateTemplateArgs = (title, priority, description, dueDate, bodyshop, j
* @param html
* @param taskIds
* @param successCallback
* @param requestInstance
*/
const sendMail = (type, to, subject, html, taskIds, successCallback, requestInstance) => {
const sendMail = (type, to, subject, html, taskIds, successCallback) => {
const fromEmails = InstanceManager({
imex: "ImEX Online <noreply@imex.online>",
rome: "Rome Online <noreply@romeonline.io>"
@@ -136,7 +120,7 @@ const sendMail = (type, to, subject, html, taskIds, successCallback, requestInst
};
/**
* Send an email to the assigned user.
* Email the assigned user.
* @param req
* @param res
* @returns {Promise<*>}
@@ -162,7 +146,7 @@ const taskAssignedEmail = async (req, res) => {
sendMail(
"assigned",
tasks_by_pk.assigned_to_employee.user_email,
`A ${formatPriority(newTask.priority)} priority task has been ${dirty ? "reassigned to" : "created for"} you - ${newTask.title}`,
`A ${formatTaskPriority(newTask.priority)} priority task has been ${dirty ? "reassigned to" : "created for"} you - ${newTask.title}`,
generateEmailTemplate(
generateTemplateArgs(
newTask.title,
@@ -186,7 +170,7 @@ const taskAssignedEmail = async (req, res) => {
};
/**
* Send an email to remind the user of their tasks.
* Email the user a reminder of their tasks.
* @param req
* @param res
* @returns {Promise<*>}
@@ -246,7 +230,7 @@ const tasksRemindEmail = async (req, res) => {
const onlyTask = groupedTasks[recipient.email][0];
emailData.subject =
`New ${formatPriority(onlyTask.priority)} Priority Task Reminder - ${onlyTask.title} ${onlyTask.due_date ? `- ${formatDate(onlyTask.due_date)}` : ""}`.trim();
`New ${formatTaskPriority(onlyTask.priority)} Priority Task Reminder - ${onlyTask.title} ${onlyTask.due_date ? `- ${formatDate(onlyTask.due_date)}` : ""}`.trim();
emailData.html = generateEmailTemplate(
generateTemplateArgs(
@@ -264,11 +248,6 @@ const tasksRemindEmail = async (req, res) => {
}
// There are multiple emails to send to this author.
else {
const endPoints = InstanceManager({
imex: process.env?.NODE_ENV === "test" ? "https://test.imex.online" : "https://imex.online",
rome: process.env?.NODE_ENV === "test" ? "https//test.romeonline.io" : "https://romeonline.io"
});
const allTasks = groupedTasks[recipient.email];
emailData.subject = `New Tasks Reminder - ${allTasks.length} Tasks require your attention`;
emailData.html = generateEmailTemplate({
@@ -278,7 +257,7 @@ const tasksRemindEmail = async (req, res) => {
body: `<ul>
${allTasks
.map((task) =>
`<li><a href="${endPoints}/manage/tasks/alltasks?taskid=${task.id}">${task.title} - Priority: ${formatPriority(task.priority)} ${task.due_date ? `${formatDate(task.due_date)}` : ""} | Bodyshop: ${task.bodyshop.shopname}</a></li>`.trim()
`<li><a href="${InstanceEndpoints()}/manage/tasks/alltasks?taskid=${task.id}">${task.title} - Priority: ${formatTaskPriority(task.priority)} ${task.due_date ? `${formatDate(task.due_date)}` : ""} | Bodyshop: ${task.bodyshop.shopname}</a></li>`.trim()
)
.join("")}
</ul>`
@@ -338,6 +317,5 @@ const tasksRemindEmail = async (req, res) => {
module.exports = {
taskAssignedEmail,
tasksRemindEmail,
getEndpoints
tasksRemindEmail
};

View File

@@ -1485,6 +1485,8 @@ exports.GET_JOB_BY_PK = `query GET_JOB_BY_PK($id: uuid!) {
materials
auto_add_ats
rate_ats
flat_rate_ats
rate_ats_flat
joblines(where: { removed: { _eq: false } }){
id
line_no
@@ -2706,6 +2708,70 @@ exports.INSERT_AUDIT_TRAIL = `
}
`;
exports.GET_JOB_WATCHERS = `
query GET_JOB_WATCHERS($jobid: uuid!) {
job_watchers(where: { jobid: { _eq: $jobid } }) {
user_email
user {
authid
employee {
id
first_name
last_name
}
}
}
job: jobs_by_pk(id: $jobid) {
id
ro_number
clm_no
bodyshop {
id
shopname
timezone
}
}
}
`;
exports.GET_NOTIFICATION_ASSOCIATIONS = `
query GET_NOTIFICATION_ASSOCIATIONS($emails: [String!]!, $shopid: uuid!) {
associations(where: {
useremail: { _in: $emails },
shopid: { _eq: $shopid }
}) {
id
useremail
notification_settings
}
}
`;
exports.INSERT_NOTIFICATIONS_MUTATION = ` mutation INSERT_NOTIFICATIONS($objects: [notifications_insert_input!]!) {
insert_notifications(objects: $objects) {
affected_rows
returning {
id
jobid
associationid
scenario_text
fcm_text
scenario_meta
}
}
}`;
// REMEMBER: Update the cache_bodyshop event in hasura to include any added fields
exports.GET_BODYSHOP_BY_ID = `
query GET_BODYSHOP_BY_ID($id: uuid!) {
bodyshops_by_pk(id: $id) {
id
md_order_statuses
shopname
}
}
`;
exports.GET_DOCUMENTS_BY_JOB = `
query GET_DOCUMENTS_BY_JOB($jobId: uuid!) {
jobs_by_pk(id: $jobId) {
@@ -2762,6 +2828,4 @@ exports.GET_DOCUMENTS_BY_IDS = `
size
takenat
}
}
`;
}`;

View File

@@ -10,12 +10,11 @@ const moment = require("moment");
const logger = require("../utils/logger");
const { sendTaskEmail } = require("../email/sendemail");
const generateEmailTemplate = require("../email/generateTemplate");
const { getEndpoints } = require("../email/tasksEmails");
const domain = process.env.NODE_ENV ? "secure" : "test";
const { SecretsManagerClient, GetSecretValueCommand } = require("@aws-sdk/client-secrets-manager");
const { InstanceRegion } = require("../utils/instanceMgr");
const { InstanceRegion, InstanceEndpoints } = require("../utils/instanceMgr");
const client = new SecretsManagerClient({
region: InstanceRegion()
@@ -372,6 +371,7 @@ exports.postback = async (req, res) => {
iprequest: values,
decodedComment
};
const ipMapping = req.body?.bodyshop?.intellipay_config?.payment_map;
logger.log("intellipay-postback-received", "DEBUG", req.user?.email, null, logResponseMeta);
@@ -418,7 +418,7 @@ exports.postback = async (req, res) => {
amount: p.amount,
transactionid: values.authcode,
payer: "Customer",
type: values.cardtype,
type: ipMapping ? ipMapping[(values.cardtype || "").toLowerCase()] || values.cardtype : values.cardtype,
jobid: p.jobid,
date: moment(Date.now()),
payment_responses: {
@@ -443,31 +443,28 @@ exports.postback = async (req, res) => {
});
if (values.origin === "OneLink" && parsedComment.userEmail) {
try {
const endPoints = getEndpoints();
sendTaskEmail({
to: parsedComment.userEmail,
subject: `New Payment(s) Received - RO ${jobs.jobs.map((j) => j.ro_number).join(", ")}`,
type: "html",
html: generateEmailTemplate({
header: "New Payment(s) Received",
subHeader: "",
body: jobs.jobs
.map(
(job) =>
`Reference: <a href="${endPoints}/manage/jobs/${job.id}">${job.ro_number || "N/A"}</a> | ${job.ownr_co_nm ? job.ownr_co_nm : `${job.ownr_fn || ""} ${job.ownr_ln || ""}`.trim()} | ${`${job.v_model_yr || ""} ${job.v_make_desc || ""} ${job.v_model_desc || ""}`.trim()} | $${partialPayments.find((p) => p.jobid === job.id).amount}`
)
.join("<br/>")
})
});
} catch (error) {
sendTaskEmail({
to: parsedComment.userEmail,
subject: `New Payment(s) Received - RO ${jobs.jobs.map((j) => j.ro_number).join(", ")}`,
type: "html",
html: generateEmailTemplate({
header: "New Payment(s) Received",
subHeader: "",
body: jobs.jobs
.map(
(job) =>
`Reference: <a href="${InstanceEndpoints()}/manage/jobs/${job.id}">${job.ro_number || "N/A"}</a> | ${job.ownr_co_nm ? job.ownr_co_nm : `${job.ownr_fn || ""} ${job.ownr_ln || ""}`.trim()} | ${`${job.v_model_yr || ""} ${job.v_make_desc || ""} ${job.v_model_desc || ""}`.trim()} | $${partialPayments.find((p) => p.jobid === job.id).amount}`
)
.join("<br/>")
})
}).catch((error) => {
logger.log("intellipay-postback-email-error", "ERROR", req.user?.email, null, {
message: error.message,
jobs,
paymentResult,
...logResponseMeta
});
}
});
}
res.sendStatus(200);
} else if (values.invoice) {
@@ -485,7 +482,7 @@ exports.postback = async (req, res) => {
amount: values.total,
transactionid: values.authcode,
payer: "Customer",
type: values.cardtype,
type: ipMapping ? ipMapping[(values.cardtype || "").toLowerCase()] || values.cardtype : values.cardtype,
jobid: values.invoice,
date: moment(Date.now())
}

View File

@@ -1,7 +1,7 @@
const Dinero = require("dinero.js");
const queries = require("../graphql-client/queries");
const adminClient = require("../graphql-client/graphql-client").client;
const _ = require("lodash");
// const adminClient = require("../graphql-client/graphql-client").client;
// const _ = require("lodash");
const logger = require("../utils/logger");
const InstanceMgr = require("../utils/instanceMgr").default;
@@ -45,7 +45,9 @@ exports.totalsSsu = async function (req, res) {
}
});
res.status(200).send();
if (result) {
res.status(200).send();
}
} catch (error) {
logger.log("job-totals-ssu-USA-error", "ERROR", req?.user?.email, id, {
jobid: id,
@@ -59,7 +61,7 @@ exports.totalsSsu = async function (req, res) {
//IMPORTANT*** These two functions MUST be mirrored.
async function TotalsServerSide(req, res) {
const { job, client } = req.body;
await AutoAddAtsIfRequired({ job: job, client: client });
await AtsAdjustmentsIfRequired({ job: job, client: client, user: req?.user });
try {
let ret = {
@@ -138,10 +140,11 @@ async function Totals(req, res) {
const client = req.userGraphQLClient;
logger.log("job-totals-ssu-USA", "DEBUG", req.user.email, job.id, {
jobid: job.id
jobid: job.id,
id: id
});
await AutoAddAtsIfRequired({ job, client });
await AtsAdjustmentsIfRequired({ job, client, user: req.user });
try {
let ret = {
@@ -153,7 +156,7 @@ async function Totals(req, res) {
res.status(200).json(ret);
} catch (error) {
logger.log("job-totals-USA-error", "ERROR", req.user.email, job.id, {
logger.log("job-totals-ssu-USA-error", "ERROR", req.user.email, job.id, {
jobid: job.id,
error: error.message,
stack: error.stack
@@ -162,40 +165,45 @@ async function Totals(req, res) {
}
}
async function AutoAddAtsIfRequired({ job, client }) {
//Check if ATS should be automatically added.
if (job.auto_add_ats) {
//Get the total sum of hours that should be the ATS amount.
//Check to see if an ATS line exists.
async function AtsAdjustmentsIfRequired({ job, client, user }) {
if (job.auto_add_ats || job.flat_rate_ats) {
let atsAmount = 0;
let atsLineIndex = null;
const atsHours = job.joblines.reduce((acc, val, index) => {
if (val.line_desc && val.line_desc.toLowerCase() === "ats amount") {
atsLineIndex = index;
}
if (
val.mod_lbr_ty !== "LA1" &&
val.mod_lbr_ty !== "LA2" &&
val.mod_lbr_ty !== "LA3" &&
val.mod_lbr_ty !== "LA4" &&
val.mod_lbr_ty !== "LAU" &&
val.mod_lbr_ty !== "LAG" &&
val.mod_lbr_ty !== "LAS" &&
val.mod_lbr_ty !== "LAA"
) {
acc = acc + val.mod_lb_hrs;
}
//Check if ATS should be automatically added.
if (job.auto_add_ats) {
const excludedLaborTypes = new Set(["LAA", "LAG", "LAS", "LAU", "LA1", "LA2", "LA3", "LA4"]);
return acc;
}, 0);
//Get the total sum of hours that should be the ATS amount.
//Check to see if an ATS line exists.
const atsHours = job.joblines.reduce((acc, val, index) => {
if (val.line_desc?.toLowerCase() === "ats amount") {
atsLineIndex = index;
}
const atsAmount = atsHours * (job.rate_ats || 0);
//If it does, update it in place, and make sure it is updated for local calculations.
if (!excludedLaborTypes.has(val.mod_lbr_ty)) {
acc = acc + val.mod_lb_hrs;
}
return acc;
}, 0);
atsAmount = atsHours * (job.rate_ats || 0);
}
//Check if a Flat Rate ATS should be added.
if (job.flat_rate_ats) {
atsLineIndex = ((i) => (i === -1 ? null : i))(
job.joblines.findIndex((line) => line.line_desc?.toLowerCase() === "ats amount")
);
atsAmount = job.rate_ats_flat || 0;
}
//If it does not, create one for local calculations and insert it.
if (atsLineIndex === null) {
const newAtsLine = {
jobid: job.id,
alt_partm: null,
line_no: 35,
unq_seq: 0,
line_ind: "E",
line_desc: "ATS Amount",
@@ -220,19 +228,42 @@ async function AutoAddAtsIfRequired({ job, client }) {
prt_dsmk_m: 0.0
};
const result = await client.request(queries.INSERT_NEW_JOB_LINE, {
lineInput: [newAtsLine]
});
try {
const result = await client.request(queries.INSERT_NEW_JOB_LINE, {
lineInput: [newAtsLine]
});
job.joblines.push(newAtsLine);
if (result) {
job.joblines.push(newAtsLine);
}
} catch (error) {
logger.log("job-totals-ssu-ats-error", "ERROR", user?.email, job.id, {
jobid: job.id,
error: error.message,
stack: error.stack
});
}
}
//If it does not, create one for local calculations and insert it.
//If it does, update it in place, and make sure it is updated for local calculations.
else {
const result = await client.request(queries.UPDATE_JOB_LINE, {
line: { act_price: atsAmount },
lineId: job.joblines[atsLineIndex].id
});
job.joblines[atsLineIndex].act_price = atsAmount;
try {
const result = await client.request(queries.UPDATE_JOB_LINE, {
line: { act_price: atsAmount },
lineId: job.joblines[atsLineIndex].id
});
if (result) {
job.joblines[atsLineIndex].act_price = atsAmount;
}
} catch (error) {
logger.log("job-totals-ssu-ats-error", "ERROR", user?.email, job.id, {
jobid: job.id,
atsLineIndex: atsLineIndex,
atsAmount: atsAmount,
jobline: job.joblines[atsLineIndex],
error: error.message,
stack: error.stack
});
}
}
}
}
@@ -314,7 +345,7 @@ async function CalculateRatesTotals({ job, client }) {
let hasMashLine = false;
let hasMahwLine = false;
let hasCustomMahwLine;
let mapaOpCodes = ParseCalopCode(job.materials["MAPA"]?.cal_opcode);
// let mapaOpCodes = ParseCalopCode(job.materials["MAPA"]?.cal_opcode);
let mashOpCodes = ParseCalopCode(job.materials["MASH"]?.cal_opcode);
jobLines.forEach((item) => {
@@ -564,7 +595,7 @@ function CalculatePartsTotals(jobLines, parts_tax_rates, job) {
}
};
default:
default: {
if (!value.part_type && value.db_ref !== "900510" && value.db_ref !== "900511") return acc;
const discountAmount =
@@ -631,6 +662,7 @@ function CalculatePartsTotals(jobLines, parts_tax_rates, job) {
)
}
};
}
}
},
{
@@ -652,7 +684,7 @@ function CalculatePartsTotals(jobLines, parts_tax_rates, job) {
let adjustments = {};
//Track all adjustments that need to be made.
const linesToAdjustForDiscount = [];
//const linesToAdjustForDiscount = [];
Object.keys(parts_tax_rates).forEach((key) => {
//Check if there's a discount or a mark up.
let disc = Dinero(),
@@ -1019,7 +1051,9 @@ function CalculateTaxesTotals(job, otherTotals) {
}
} catch (error) {
logger.log("job-totals-USA Key with issue", "error", null, job.id, {
key
key: key,
error: error.message,
stack: error.stack
});
}
});
@@ -1157,6 +1191,7 @@ function CalculateTaxesTotals(job, otherTotals) {
exports.default = Totals;
//eslint-disable-next-line no-unused-vars
function DiscountNotAlreadyCounted(jobline, joblines) {
return false;
}
@@ -1172,27 +1207,35 @@ function IsTrueOrYes(value) {
return value === true || value === "Y" || value === "y";
}
async function UpdateJobLines(joblinesToUpdate) {
if (joblinesToUpdate.length === 0) return;
const updateQueries = joblinesToUpdate.map((line, index) =>
generateUpdateQuery(_.pick(line, ["id", "prt_dsmk_m", "prt_dsmk_p"]), index)
);
const query = `
mutation UPDATE_EST_LINES{
${updateQueries}
}
`;
// Function not in use from RO to IO Merger 02/05/2024
// async function UpdateJobLines(joblinesToUpdate) {
// if (joblinesToUpdate.length === 0) return;
// const updateQueries = joblinesToUpdate.map((line, index) =>
// generateUpdateQuery(_.pick(line, ["id", "prt_dsmk_m", "prt_dsmk_p"]), index)
// );
// const query = `
// mutation UPDATE_EST_LINES{
// ${updateQueries}
// }
// `;
// try {
// const result = await adminClient.request(query);
// void result;
// } catch (error) {
// logger.log("update-job-lines", "error", null, null, {
// error: error.message,
// stack: error.stack
// });
// }
// }
const result = await adminClient.request(query);
}
const generateUpdateQuery = (lineToUpdate, index) => {
return `
update_joblines${index}: update_joblines(where: { id: { _eq: "${
lineToUpdate.id
}" } }, _set: ${JSON.stringify(lineToUpdate).replace(/"(\w+)"\s*:/g, "$1:")}) {
returning {
id
}
}`;
};
// const generateUpdateQuery = (lineToUpdate, index) => {
// return `
// update_joblines${index}: update_joblines(where: { id: { _eq: "${
// lineToUpdate.id
// }" } }, _set: ${JSON.stringify(lineToUpdate).replace(/"(\w+)"\s*:/g, "$1:")}) {
// returning {
// id
// }
// }`;
// };

View File

@@ -1,7 +1,5 @@
const Dinero = require("dinero.js");
const queries = require("../graphql-client/queries");
const adminClient = require("../graphql-client/graphql-client").client;
const _ = require("lodash");
const logger = require("../utils/logger");
//****************************************************** */
@@ -44,11 +42,16 @@ exports.totalsSsu = async function (req, res) {
}
});
if (!result) {
throw new Error("Failed to update job totals");
}
res.status(200).send();
} catch (error) {
logger.log("job-totals-ssu-error", "ERROR", req.user.email, id, {
jobid: id,
error
error: error.message,
stack: error.stack
});
res.status(503).send();
}
@@ -57,7 +60,7 @@ exports.totalsSsu = async function (req, res) {
//IMPORTANT*** These two functions MUST be mirrored.
async function TotalsServerSide(req, res) {
const { job, client } = req.body;
await AutoAddAtsIfRequired({ job: job, client: client });
await AtsAdjustmentsIfRequired({ job: job, client: client, user: req?.user });
try {
let ret = {
@@ -71,7 +74,8 @@ async function TotalsServerSide(req, res) {
} catch (error) {
logger.log("job-totals-ssu-error", "ERROR", req?.user?.email, job.id, {
jobid: job.id,
error
error: error.message,
stack: error.stack
});
res.status(400).send(JSON.stringify(error));
}
@@ -83,13 +87,12 @@ async function Totals(req, res) {
const logger = req.logger;
const client = req.userGraphQLClient;
logger.log("job-totals", "DEBUG", req.user.email, job.id, {
jobid: job.id
logger.log("job-totals-ssu", "DEBUG", req.user.email, job.id, {
jobid: job.id,
id: id
});
logger.log("job-totals-ssu", "DEBUG", req.user.email, id, null);
await AutoAddAtsIfRequired({ job, client });
await AtsAdjustmentsIfRequired({ job, client, user: req.user });
try {
let ret = {
@@ -101,48 +104,54 @@ async function Totals(req, res) {
res.status(200).json(ret);
} catch (error) {
logger.log("job-totals-error", "ERROR", req.user.email, job.id, {
logger.log("job-totals-ssu-error", "ERROR", req.user.email, job.id, {
jobid: job.id,
error
error: error.message,
stack: error.stack
});
res.status(400).send(JSON.stringify(error));
}
}
async function AutoAddAtsIfRequired({ job, client }) {
//Check if ATS should be automatically added.
if (job.auto_add_ats) {
//Get the total sum of hours that should be the ATS amount.
//Check to see if an ATS line exists.
async function AtsAdjustmentsIfRequired({ job, client, user }) {
if (job.auto_add_ats || job.flat_rate_ats) {
let atsAmount = 0;
let atsLineIndex = null;
const atsHours = job.joblines.reduce((acc, val, index) => {
if (val.line_desc && val.line_desc.toLowerCase() === "ats amount") {
atsLineIndex = index;
}
if (
val.mod_lbr_ty !== "LA1" &&
val.mod_lbr_ty !== "LA2" &&
val.mod_lbr_ty !== "LA3" &&
val.mod_lbr_ty !== "LA4" &&
val.mod_lbr_ty !== "LAU" &&
val.mod_lbr_ty !== "LAG" &&
val.mod_lbr_ty !== "LAS" &&
val.mod_lbr_ty !== "LAA"
) {
acc = acc + val.mod_lb_hrs;
}
//Check if ATS should be automatically added.
if (job.auto_add_ats) {
const excludedLaborTypes = new Set(["LAA", "LAG", "LAS", "LAU", "LA1", "LA2", "LA3", "LA4"]);
return acc;
}, 0);
//Get the total sum of hours that should be the ATS amount.
//Check to see if an ATS line exists.
const atsHours = job.joblines.reduce((acc, val, index) => {
if (val.line_desc?.toLowerCase() === "ats amount") {
atsLineIndex = index;
}
const atsAmount = atsHours * (job.rate_ats || 0);
//If it does, update it in place, and make sure it is updated for local calculations.
if (!excludedLaborTypes.has(val.mod_lbr_ty)) {
acc = acc + val.mod_lb_hrs;
}
return acc;
}, 0);
atsAmount = atsHours * (job.rate_ats || 0);
}
//Check if a Flat Rate ATS should be added.
if (job.flat_rate_ats) {
atsLineIndex = ((i) => (i === -1 ? null : i))(
job.joblines.findIndex((line) => line.line_desc?.toLowerCase() === "ats amount")
);
atsAmount = job.rate_ats_flat || 0;
}
//If it does not, create one for local calculations and insert it.
if (atsLineIndex === null) {
const newAtsLine = {
jobid: job.id,
alt_partm: null,
line_no: 35,
unq_seq: 0,
line_ind: "E",
line_desc: "ATS Amount",
@@ -167,22 +176,43 @@ async function AutoAddAtsIfRequired({ job, client }) {
prt_dsmk_m: 0.0
};
const result = await client.request(queries.INSERT_NEW_JOB_LINE, {
lineInput: [newAtsLine]
});
try {
const result = await client.request(queries.INSERT_NEW_JOB_LINE, {
lineInput: [newAtsLine]
});
job.joblines.push(newAtsLine);
if (result) {
job.joblines.push(newAtsLine);
}
} catch (error) {
logger.log("job-totals-ssu-ats-error", "ERROR", user?.email, job.id, {
jobid: job.id,
error: error.message,
stack: error.stack
});
}
}
//If it does not, create one for local calculations and insert it.
//If it does, update it in place, and make sure it is updated for local calculations.
else {
const result = await client.request(queries.UPDATE_JOB_LINE, {
line: { act_price: atsAmount },
lineId: job.joblines[atsLineIndex].id
});
job.joblines[atsLineIndex].act_price = atsAmount;
try {
const result = await client.request(queries.UPDATE_JOB_LINE, {
line: { act_price: atsAmount },
lineId: job.joblines[atsLineIndex].id
});
if (result) {
job.joblines[atsLineIndex].act_price = atsAmount;
}
} catch (error) {
logger.log("job-totals-ssu-ats-error", "ERROR", user?.email, job.id, {
jobid: job.id,
atsLineIndex: atsLineIndex,
atsAmount: atsAmount,
jobline: job.joblines[atsLineIndex],
error: error.message,
stack: error.stack
});
}
}
//console.log(job.jobLines);
}
}

View File

@@ -0,0 +1,199 @@
/**
* @fileoverview Notification event handlers.
* This module exports functions to handle various notification events.
* Each handler optionally calls the scenarioParser and logs errors if they occur,
* then returns a JSON response with a success message.
*/
const scenarioParser = require("./scenarioParser");
/**
* Processes a notification event by invoking the scenario parser.
* The scenarioParser is intentionally not awaited so that the response is sent immediately.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @param {string} parserPath - The key path to be passed to scenarioParser.
* @param {string} successMessage - The message to return on success.
* @returns {Promise<Object>} A promise that resolves to an Express JSON response.
*/
async function processNotificationEvent(req, res, parserPath, successMessage) {
const { logger } = req;
// Call scenarioParser but don't await it; log any error that occurs.
scenarioParser(req, parserPath).catch((error) => {
logger.log("notifications-error", "error", "notifications", null, { message: error?.message, stack: error?.stack });
});
return res.status(200).json({ message: successMessage });
}
/**
* Handle job change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Promise<Object>} JSON response with a success message.
*/
const handleJobsChange = async (req, res) =>
processNotificationEvent(req, res, "req.body.event.new.id", "Job Notifications Event Handled.");
/**
* Handle bills change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Promise<Object>} JSON response with a success message.
*/
const handleBillsChange = async (req, res) =>
processNotificationEvent(req, res, "req.body.event.new.jobid", "Bills Changed Notification Event Handled.");
/**
 * Handle documents change notifications.
 * When a document is moved between jobs (jobid changed), watchers of both the
 * old and the new job are notified; otherwise only the new job is processed.
 *
 * @param {Object} req - Express request object carrying the event payload.
 * @param {Object} res - Express response object.
 * @returns {Promise<Object>} JSON response with a success message.
 */
const handleDocumentsChange = async (req, res) => {
  const { logger } = req;

  // Fire-and-forget: scenario parsing runs in the background; failures are
  // logged but must never block the 200 acknowledgement to the event source.
  // (Previously this catch/log handler was duplicated three times inline.)
  const runScenarioInBackground = (request) =>
    scenarioParser(request, "req.body.event.new.jobid").catch((error) => {
      logger.log("notifications-error", "error", "notifications", null, {
        message: error?.message,
        stack: error?.stack
      });
    });

  const newJobId = req.body?.event?.data?.new?.jobid;
  const oldJobId = req.body?.event?.data?.old?.jobid;

  // If jobid changed (document moved between jobs), we need to notify both jobs
  if (oldJobId && newJobId && oldJobId !== newJobId) {
    // Process notification for the job the document moved to.
    runScenarioInBackground(req);

    // Build a synthetic request targeting the job the document moved away from.
    const oldJobReq = {
      body: {
        ...req.body,
        event: {
          ...req.body.event,
          data: {
            new: {
              ...req.body.event.data.old,
              // Add a flag to indicate this document was moved away
              _documentMoved: true,
              _movedToJob: newJobId
            },
            old: null
          }
        }
      },
      logger,
      sessionUtils: req.sessionUtils
    };
    // Process notification for the old job ID using the modified request.
    runScenarioInBackground(oldJobReq);

    return res.status(200).json({ message: "Documents Change Notifications Event Handled for both jobs." });
  }

  // Otherwise just process the new job ID
  runScenarioInBackground(req);
  return res.status(200).json({ message: "Documents Change Notifications Event Handled." });
};
/**
 * Handle job lines change notifications by delegating to the shared pipeline.
 *
 * @param {Object} req - Express request object carrying the event payload.
 * @param {Object} res - Express response object.
 * @returns {Promise<Object>} JSON response with a success message.
 */
const handleJobLinesChange = async (req, res) => {
  return processNotificationEvent(req, res, "req.body.event.new.jobid", "JobLines Change Notifications Event Handled.");
};
/**
 * Handle notes change notifications by delegating to the shared pipeline.
 *
 * @param {Object} req - Express request object carrying the event payload.
 * @param {Object} res - Express response object.
 * @returns {Promise<Object>} JSON response with a success message.
 */
const handleNotesChange = async (req, res) => {
  return processNotificationEvent(req, res, "req.body.event.new.jobid", "Notes Changed Notification Event Handled.");
};
/**
 * Handle payments change notifications by delegating to the shared pipeline.
 *
 * @param {Object} req - Express request object carrying the event payload.
 * @param {Object} res - Express response object.
 * @returns {Promise<Object>} JSON response with a success message.
 */
const handlePaymentsChange = async (req, res) => {
  return processNotificationEvent(req, res, "req.body.event.new.jobid", "Payments Changed Notification Event Handled.");
};
/**
 * Handle tasks change notifications by delegating to the shared pipeline.
 *
 * @param {Object} req - Express request object carrying the event payload.
 * @param {Object} res - Express response object.
 * @returns {Promise<Object>} JSON response with a success message.
 */
const handleTasksChange = async (req, res) => {
  return processNotificationEvent(req, res, "req.body.event.new.jobid", "Tasks Notifications Event Handled.");
};
/**
 * Handle time tickets change notifications by delegating to the shared pipeline.
 *
 * @param {Object} req - Express request object carrying the event payload.
 * @param {Object} res - Express response object.
 * @returns {Promise<Object>} JSON response with a success message.
 */
const handleTimeTicketsChange = async (req, res) => {
  return processNotificationEvent(req, res, "req.body.event.new.jobid", "Time Tickets Changed Notification Event Handled.");
};
/**
 * Handle parts dispatch change notifications.
 * Note: Placeholder — acknowledges the event without further processing.
 *
 * @param {Object} req - Express request object (unused).
 * @param {Object} res - Express response object.
 * @returns {Object} JSON response with a success message.
 */
const handlePartsDispatchChange = (req, res) => {
  return res.status(200).json({ message: "Parts Dispatch change handled." });
};
/**
 * Handle parts order change notifications.
 * Note: Placeholder — acknowledges the event without further processing.
 *
 * @param {Object} req - Express request object (unused).
 * @param {Object} res - Express response object.
 * @returns {Object} JSON response with a success message.
 */
const handlePartsOrderChange = (req, res) => {
  return res.status(200).json({ message: "Parts Order change handled." });
};
// Public handler map consumed by the notifications router.
module.exports = {
  handleJobsChange,
  handleBillsChange,
  handleDocumentsChange,
  handleJobLinesChange,
  handleNotesChange,
  handlePartsDispatchChange,
  handlePartsOrderChange,
  handlePaymentsChange,
  handleTasksChange,
  handleTimeTicketsChange
};

View File

@@ -1,5 +0,0 @@
const handleJobsChange = (req, res) => {
return res.status(200).json({ message: "Jobs change handled." });
};
module.exports = handleJobsChange;

View File

@@ -1,5 +0,0 @@
const handleBillsChange = (req, res) => {
return res.status(200).json({ message: "Bills change handled." });
};
module.exports = handleBillsChange;

View File

@@ -1,5 +0,0 @@
const handlePartsDispatchChange = (req, res) => {
return res.status(200).json({ message: "Parts Dispatch change handled." });
};
module.exports = handlePartsDispatchChange;

View File

@@ -1,5 +0,0 @@
const handlePartsOrderChange = (req, res) => {
return res.status(200).json({ message: "Parts Order change handled." });
};
module.exports = handlePartsOrderChange;

View File

@@ -1,5 +0,0 @@
const handleTasksChange = (req, res) => {
return res.status(200).json({ message: "Tasks change handled." });
};
module.exports = handleTasksChange;

View File

@@ -1,5 +0,0 @@
const handleTimeTicketsChange = (req, res) => {
return res.status(200).json({ message: "Time Tickets change handled." });
};
module.exports = handleTimeTicketsChange;

View File

@@ -0,0 +1,74 @@
/**
 * Parses an event by diffing old and new data states.
 *
 * Determines whether the event is a brand-new entry (no `oldData`) or an
 * update, and reports every field that changed, was added, or was removed.
 * Field values are compared via JSON serialization (deep comparison).
 *
 * @param {Object} options - Configuration options for parsing the event.
 * @param {Object} [options.oldData] - Previous data state (undefined for new entries).
 * @param {Object} options.newData - Current data state.
 * @param {string} options.trigger - Event trigger type (e.g., 'INSERT', 'UPDATE').
 * @param {string} options.table - Table name associated with the event.
 * @param {string} [options.jobId] - Job ID, if already extracted by the caller.
 * @returns {Object} Parsed event details:
 *   - {Array<string>} changedFieldNames - Names of fields that changed.
 *   - {Object} changedFields - Map of changed fields to { old, new } values.
 *   - {boolean} isNew - True when no oldData was provided.
 *   - {Object} data - The current data state (`newData`).
 *   - {string} trigger - The event trigger type.
 *   - {string} table - The table name.
 *   - {string|null} jobId - The provided jobId or null.
 */
const eventParser = async ({ oldData, newData, trigger, table, jobId = null }) => {
  const isNew = !oldData; // No previous state means this is a brand-new entry
  const changedFields = {};
  const changedFieldNames = [];

  // Record a single field change in both output structures.
  const recordChange = (field, previous, current) => {
    changedFields[field] = { old: previous, new: current };
    changedFieldNames.push(field);
  };

  if (isNew) {
    // Every field of a new entry counts as changed (undefined -> value).
    for (const [field, value] of Object.entries(newData)) {
      recordChange(field, undefined, value);
    }
  } else {
    // Fields added or modified relative to the previous state.
    for (const field of Object.keys(newData)) {
      const existedBefore = Object.prototype.hasOwnProperty.call(oldData, field);
      if (!existedBefore || JSON.stringify(oldData[field]) !== JSON.stringify(newData[field])) {
        recordChange(field, oldData[field], newData[field]);
      }
    }
    // Fields present before but absent now are reported as removed (new: null).
    for (const field of Object.keys(oldData)) {
      if (!Object.prototype.hasOwnProperty.call(newData, field)) {
        recordChange(field, oldData[field], null);
      }
    }
  }

  return {
    changedFieldNames,
    changedFields,
    isNew,
    data: newData,
    trigger,
    table,
    jobId
  };
};
module.exports = eventParser;

View File

@@ -0,0 +1,297 @@
const { Queue, Worker } = require("bullmq");
const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries");
const { registerCleanupTask } = require("../../utils/cleanupManager");
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
const devDebugLogger = require("../../utils/devDebugLogger");
const graphQLClient = require("../../graphql-client/graphql-client").client;
// Base consolidation window in minutes, sourced from APP_CONSOLIDATION_DELAY_IN_MINS
// (defaults to 3, clamped to a minimum of 1).
const APP_CONSOLIDATION_DELAY_IN_MINS = (() => {
  const envValue = process.env?.APP_CONSOLIDATION_DELAY_IN_MINS;
  const parsedValue = envValue ? parseInt(envValue, 10) : NaN;
  return isNaN(parsedValue) ? 3 : Math.max(1, parsedValue); // Default to 3, ensure at least 1
})();
// Base time-related constant (in milliseconds) / DO NOT TOUCH
const APP_CONSOLIDATION_DELAY = APP_CONSOLIDATION_DELAY_IN_MINS * 60000; // base consolidation window in ms
// Derived time-related constants based on APP_CONSOLIDATION_DELAY / DO NOT TOUCH
const NOTIFICATION_STORAGE_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5x base: TTL for pending per-user notifications
const CONSOLIDATION_FLAG_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5x base: TTL for the per-job dedupe flag
const LOCK_EXPIRATION = APP_CONSOLIDATION_DELAY * 0.25; // 0.25x base: consolidation lock TTL / retry backoff
const RATE_LIMITER_DURATION = APP_CONSOLIDATION_DELAY * 0.1; // 0.1x base: consolidate-worker rate-limit window
// Lazily initialized singletons, created once by loadAppQueue.
let addQueue;
let consolidateQueue;
/**
 * Builds the scenario_text, fcm_text, and scenario_meta for a batch of notifications.
 *
 * @param {Array<Object>} notifications - Notification objects with 'body' and optional 'variables'.
 * @returns {Object} Object with 'scenario_text' (array of bodies), 'fcm_text'
 *   (bodies joined with ". " plus a trailing period, or null when empty), and
 *   'scenario_meta' (array of variables objects, defaulting to {}).
 */
const buildNotificationContent = (notifications) => {
  // Collect text entries and their metadata in a single pass.
  const scenario_text = [];
  const scenario_meta = [];
  for (const notification of notifications) {
    scenario_text.push(notification.body);
    scenario_meta.push(notification.variables || {});
  }
  const joined = scenario_text.join(". ");
  return {
    scenario_text,
    fcm_text: joined ? `${joined}.` : null, // Trailing period when non-empty, otherwise null
    scenario_meta
  };
};
/**
 * Initializes the notification queues and workers for adding and consolidating notifications.
 *
 * Creates (once per process) two BullMQ queues backed by Redis:
 *  - "notificationsAdd": stores incoming notifications per user in Redis and schedules
 *    a delayed consolidation job (one per jobId per window, guarded by a SETNX flag).
 *  - "notificationsConsolidate": batches the stored notifications per user/body shop,
 *    persists them via GraphQL, and emits them to connected sockets.
 *
 * @param {Object} options
 * @param {Object} options.pubClient - Redis client used as the BullMQ connection and for ad-hoc keys.
 * @param {Object} options.logger - Logger used for worker failure reporting.
 * @param {Object} options.redisHelpers - Helper exposing getUserSocketMapping(user).
 * @param {Object} options.ioRedis - Socket.io server used to emit "notification" events.
 * @returns {Promise<Queue>} The "notificationsAdd" queue instance.
 */
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
  // Idempotent: queues and workers are created only on the first call.
  if (!addQueue || !consolidateQueue) {
    const prefix = getBullMQPrefix();
    // Namespaces ad-hoc Redis keys so prod and dev data never collide.
    const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";
    devDebugLogger(`Initializing Notifications Queues with prefix: ${prefix}`);
    addQueue = new Queue("notificationsAdd", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });
    consolidateQueue = new Queue("notificationsConsolidate", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });
    // Worker that appends each incoming notification to per-user Redis state and
    // schedules (at most once per window) the delayed consolidation for the job.
    const addWorker = new Worker(
      "notificationsAdd",
      async (job) => {
        const { jobId, key, variables, recipients, body, jobRoNumber } = job.data;
        devDebugLogger(`Adding notifications for jobId ${jobId}`);
        const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`;
        const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() };
        for (const recipient of recipients) {
          const { user } = recipient;
          const userKey = `${redisKeyPrefix}:${user}`;
          // Read-modify-write of the user's pending notification array (JSON blob).
          const existingNotifications = await pubClient.get(userKey);
          const notifications = existingNotifications ? JSON.parse(existingNotifications) : [];
          notifications.push(notification);
          await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000);
          devDebugLogger(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
        }
        const consolidateKey = `app:${devKey}:consolidate:${jobId}`;
        // SETNX acts as a once-per-window guard: only the first add in a window
        // schedules the consolidation job for this jobId.
        const flagSet = await pubClient.setnx(consolidateKey, "pending");
        devDebugLogger(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);
        if (flagSet) {
          await consolidateQueue.add(
            "consolidate-notifications",
            { jobId, recipients },
            {
              jobId: `consolidate:${jobId}`,
              delay: APP_CONSOLIDATION_DELAY,
              attempts: 3,
              // NOTE(review): numeric backoff is treated by BullMQ as a fixed delay in ms — confirm intent.
              backoff: LOCK_EXPIRATION
            }
          );
          devDebugLogger(`Scheduled consolidation for jobId ${jobId}`);
          await pubClient.expire(consolidateKey, CONSOLIDATION_FLAG_EXPIRATION / 1000);
        } else {
          devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`);
        }
      },
      {
        prefix,
        connection: pubClient,
        concurrency: 5
      }
    );
    // Worker that drains the per-user Redis state for a job, persists the batch
    // via GraphQL, and pushes the consolidated notifications over Socket.io.
    const consolidateWorker = new Worker(
      "notificationsConsolidate",
      async (job) => {
        const { jobId, recipients } = job.data;
        devDebugLogger(`Consolidating notifications for jobId ${jobId}`);
        const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`;
        const lockKey = `lock:${devKey}:consolidate:${jobId}`;
        // NX+EX lock so only one worker consolidates a given jobId at a time.
        const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
        devDebugLogger(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);
        if (lockAcquired) {
          try {
            // allNotifications: user -> bodyShopId -> array of notifications.
            const allNotifications = {};
            const uniqueUsers = [...new Set(recipients.map((r) => r.user))];
            devDebugLogger(`Unique users for jobId ${jobId}: ${uniqueUsers}`);
            for (const user of uniqueUsers) {
              const userKey = `${redisKeyPrefix}:${user}`;
              const notifications = await pubClient.get(userKey);
              devDebugLogger(`Retrieved notifications for ${user}: ${notifications}`);
              if (notifications) {
                const parsedNotifications = JSON.parse(notifications);
                const userRecipients = recipients.filter((r) => r.user === user);
                // The same parsed batch is attached to every bodyShop this user watches.
                for (const { bodyShopId } of userRecipients) {
                  allNotifications[user] = allNotifications[user] || {};
                  allNotifications[user][bodyShopId] = parsedNotifications;
                }
                await pubClient.del(userKey);
                devDebugLogger(`Deleted Redis key ${userKey}`);
              } else {
                devDebugLogger(`No notifications found for ${user} under ${userKey}`);
              }
            }
            devDebugLogger(`Consolidated notifications: ${JSON.stringify(allNotifications)}`);
            // Insert notifications into the database and collect IDs
            const notificationInserts = [];
            const notificationIdMap = new Map();
            for (const [user, bodyShopData] of Object.entries(allNotifications)) {
              const userRecipients = recipients.filter((r) => r.user === user);
              const associationId = userRecipients[0]?.associationId;
              for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
                const { scenario_text, fcm_text, scenario_meta } = buildNotificationContent(notifications);
                notificationInserts.push({
                  jobid: jobId,
                  associationid: associationId,
                  scenario_text: JSON.stringify(scenario_text),
                  fcm_text: fcm_text,
                  scenario_meta: JSON.stringify(scenario_meta)
                });
                notificationIdMap.set(`${user}:${bodyShopId}`, null);
              }
            }
            if (notificationInserts.length > 0) {
              const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, {
                objects: notificationInserts
              });
              devDebugLogger(
                `Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}`
              );
              // NOTE(review): this index -> (user, bodyShop) mapping assumes the DB returns
              // rows in insert order AND that every user has the same number of bodyShop
              // entries as the first user — verify; it looks fragile if either assumption breaks.
              insertResponse.insert_notifications.returning.forEach((row, index) => {
                const user = uniqueUsers[Math.floor(index / Object.keys(allNotifications[uniqueUsers[0]]).length)];
                const bodyShopId = Object.keys(allNotifications[user])[
                  index % Object.keys(allNotifications[user]).length
                ];
                notificationIdMap.set(`${user}:${bodyShopId}`, row.id);
              });
            }
            // Emit notifications to users via Socket.io with notification ID
            for (const [user, bodyShopData] of Object.entries(allNotifications)) {
              const userMapping = await redisHelpers.getUserSocketMapping(user);
              const userRecipients = recipients.filter((r) => r.user === user);
              const associationId = userRecipients[0]?.associationId;
              for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
                const notificationId = notificationIdMap.get(`${user}:${bodyShopId}`);
                const jobRoNumber = notifications[0]?.jobRoNumber;
                if (userMapping && userMapping[bodyShopId]?.socketIds) {
                  userMapping[bodyShopId].socketIds.forEach((socketId) => {
                    ioRedis.to(socketId).emit("notification", {
                      jobId,
                      jobRoNumber,
                      bodyShopId,
                      notifications,
                      notificationId,
                      associationId
                    });
                  });
                  devDebugLogger(
                    `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}`
                  );
                } else {
                  devDebugLogger(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
                }
              }
            }
            // Clear the dedupe flag so a new window can be scheduled for this job.
            await pubClient.del(`app:${devKey}:consolidate:${jobId}`);
          } catch (err) {
            logger.log(`app-queue-consolidation-error`, "ERROR", "notifications", "api", {
              message: err?.message,
              stack: err?.stack
            });
            // Rethrow so BullMQ retries per the job's attempts/backoff settings.
            throw err;
          } finally {
            // Always release the lock, even on failure.
            await pubClient.del(lockKey);
          }
        } else {
          devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
        }
      },
      {
        prefix,
        connection: pubClient,
        concurrency: 1,
        limiter: { max: 1, duration: RATE_LIMITER_DURATION }
      }
    );
    addWorker.on("completed", (job) => devDebugLogger(`Add job ${job.id} completed`));
    consolidateWorker.on("completed", (job) => devDebugLogger(`Consolidate job ${job.id} completed`));
    addWorker.on("failed", (job, err) =>
      logger.log(`app-queue-notification-error`, "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );
    consolidateWorker.on("failed", (job, err) =>
      logger.log(`app-queue-consolidation-failed:`, "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );
    // Register cleanup task instead of direct process listeners
    const shutdown = async () => {
      devDebugLogger("Closing app queue workers...");
      await Promise.all([addWorker.close(), consolidateWorker.close()]);
      devDebugLogger("App queue workers closed");
    };
    registerCleanupTask(shutdown);
  }
  return addQueue;
};
/**
 * Retrieves the initialized `addQueue` instance.
 *
 * @returns {Queue} The "notificationsAdd" queue.
 * @throws {Error} When loadAppQueue has not been called yet.
 */
const getQueue = () => {
  if (addQueue) return addQueue;
  throw new Error("Add queue not initialized. Ensure loadAppQueue is called during bootstrap.");
};
/**
 * Dispatches notifications to the `addQueue` for processing.
 *
 * @param {Object} options
 * @param {Array<Object>} options.appsToDispatch - Notification payloads to enqueue.
 * @param {Object} options.logger - Logger instance (reserved for future use).
 * @returns {Promise<void>} Resolves once every payload has been enqueued.
 */
const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => {
  const queue = getQueue();
  for (const appNotification of appsToDispatch) {
    const { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber } = appNotification;
    const payload = { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber };
    // Unique per-dispatch job id prevents BullMQ from deduplicating distinct events.
    await queue.add("add-notification", payload, { jobId: `${jobId}:${Date.now()}` });
    devDebugLogger(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
  }
};
module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue };

View File

@@ -0,0 +1,252 @@
const { Queue, Worker } = require("bullmq");
const { sendTaskEmail } = require("../../email/sendemail");
const generateEmailTemplate = require("../../email/generateTemplate");
const { InstanceEndpoints } = require("../../utils/instanceMgr");
const { registerCleanupTask } = require("../../utils/cleanupManager");
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
const devDebugLogger = require("../../utils/devDebugLogger");
const moment = require("moment-timezone");
// Base consolidation window in minutes, sourced from EMAIL_CONSOLIDATION_DELAY_IN_MINS
// (defaults to 3, clamped to a minimum of 1).
const EMAIL_CONSOLIDATION_DELAY_IN_MINS = (() => {
  const envValue = process.env?.EMAIL_CONSOLIDATION_DELAY_IN_MINS;
  const parsedValue = envValue ? parseInt(envValue, 10) : NaN;
  return isNaN(parsedValue) ? 3 : Math.max(1, parsedValue); // Default to 3, ensure at least 1
})();
// Base time-related constant (in milliseconds) / DO NOT TOUCH
const EMAIL_CONSOLIDATION_DELAY = EMAIL_CONSOLIDATION_DELAY_IN_MINS * 60000; // base consolidation window in ms
// Derived time-related constants based on EMAIL_CONSOLIDATION_DELAY / DO NOT TOUCH, these are pegged to EMAIL_CONSOLIDATION_DELAY
const CONSOLIDATION_KEY_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 1.5; // 1.5x base: buffer for consolidation flag
const LOCK_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 0.25; // 0.25x base: lock duration / retry backoff
const RATE_LIMITER_DURATION = EMAIL_CONSOLIDATION_DELAY * 0.1; // 0.1x base: consolidate-worker rate-limit window
const NOTIFICATION_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 1.5; // 1.5x base: matches consolidation key expiration
// Lazily initialized singletons, created once by loadEmailQueue.
let emailAddQueue;
let emailConsolidateQueue;
let emailAddWorker;
let emailConsolidateWorker;
/**
 * Initializes the email notification queues and workers.
 *
 * Creates (once per process) two BullMQ queues backed by Redis:
 *  - "emailAdd": stores each notification message per recipient in Redis lists and
 *    schedules a delayed consolidation job (one per jobId per window, via SETNX).
 *  - "emailConsolidate": gathers every pending message per recipient, renders a
 *    single HTML digest email, and sends it.
 *
 * @param {Object} options - Configuration options for queue initialization.
 * @param {Object} options.pubClient - Redis client instance for queue communication.
 * @param {Object} options.logger - Logger instance for logging events and debugging.
 * @returns {Queue} The initialized `emailAddQueue` instance for dispatching notifications.
 */
const loadEmailQueue = async ({ pubClient, logger }) => {
  // Idempotent: queues and workers are created only on the first call.
  if (!emailAddQueue || !emailConsolidateQueue) {
    const prefix = getBullMQPrefix();
    // Namespaces ad-hoc Redis keys so prod and dev data never collide.
    const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";
    devDebugLogger(`Initializing Email Notification Queues with prefix: ${prefix}`);
    // Queue for adding email notifications
    emailAddQueue = new Queue("emailAdd", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });
    // Queue for consolidating and sending emails
    emailConsolidateQueue = new Queue("emailConsolidate", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });
    // Worker to process adding notifications
    emailAddWorker = new Worker(
      "emailAdd",
      async (job) => {
        const { jobId, jobRoNumber, bodyShopName, bodyShopTimezone, body, recipients } = job.data;
        devDebugLogger(`Adding email notifications for jobId ${jobId}`);
        const redisKeyPrefix = `email:${devKey}:notifications:${jobId}`;
        for (const recipient of recipients) {
          const { user, firstName, lastName } = recipient;
          const userKey = `${redisKeyPrefix}:${user}`;
          // Append the message to the recipient's pending list and refresh its TTL.
          await pubClient.rpush(userKey, body);
          await pubClient.expire(userKey, NOTIFICATION_EXPIRATION / 1000);
          // HSETNX keeps the first-seen recipient details for the digest rendering.
          const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${user}`;
          await pubClient.hsetnx(detailsKey, "firstName", firstName || "");
          await pubClient.hsetnx(detailsKey, "lastName", lastName || "");
          await pubClient.hsetnx(detailsKey, "bodyShopTimezone", bodyShopTimezone);
          await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000);
          // Track the set of recipients with pending messages for this job.
          await pubClient.sadd(`email:${devKey}:recipients:${jobId}`, user);
          devDebugLogger(`Stored message for ${user} under ${userKey}: ${body}`);
        }
        const consolidateKey = `email:${devKey}:consolidate:${jobId}`;
        // SETNX acts as a once-per-window guard: only the first add schedules consolidation.
        const flagSet = await pubClient.setnx(consolidateKey, "pending");
        if (flagSet) {
          await emailConsolidateQueue.add(
            "consolidate-emails",
            { jobId, jobRoNumber, bodyShopName, bodyShopTimezone },
            {
              jobId: `consolidate:${jobId}`,
              delay: EMAIL_CONSOLIDATION_DELAY,
              attempts: 3,
              // NOTE(review): numeric backoff is treated by BullMQ as a fixed delay in ms — confirm intent.
              backoff: LOCK_EXPIRATION
            }
          );
          devDebugLogger(`Scheduled email consolidation for jobId ${jobId}`);
          await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000);
        } else {
          devDebugLogger(`Email consolidation already scheduled for jobId ${jobId}`);
        }
      },
      {
        prefix,
        connection: pubClient,
        concurrency: 5
      }
    );
    // Worker to consolidate and send emails
    emailConsolidateWorker = new Worker(
      "emailConsolidate",
      async (job) => {
        const { jobId, jobRoNumber, bodyShopName } = job.data;
        devDebugLogger(`Consolidating emails for jobId ${jobId}`);
        const lockKey = `lock:${devKey}:emailConsolidate:${jobId}`;
        // NX+EX lock so only one worker consolidates a given jobId at a time.
        const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
        if (lockAcquired) {
          try {
            const recipientsSet = `email:${devKey}:recipients:${jobId}`;
            const recipients = await pubClient.smembers(recipientsSet);
            for (const recipient of recipients) {
              const userKey = `email:${devKey}:notifications:${jobId}:${recipient}`;
              const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${recipient}`;
              const messages = await pubClient.lrange(userKey, 0, -1);
              if (messages.length > 0) {
                const details = await pubClient.hgetall(detailsKey);
                const firstName = details.firstName || "User";
                const multipleUpdateString = messages.length > 1 ? "Updates" : "Update";
                const subject = `${multipleUpdateString} for job ${jobRoNumber || "N/A"} at ${bodyShopName}`;
                // Fall back to UTC when the stored timezone is not a valid tz name.
                const timezone = moment.tz.zone(details?.bodyShopTimezone) ? details.bodyShopTimezone : "UTC";
                const emailBody = generateEmailTemplate({
                  header: `${multipleUpdateString} for Job ${jobRoNumber || "N/A"}`,
                  subHeader: `Dear ${firstName},`,
                  dateLine: moment().tz(timezone).format("MM/DD/YYYY hh:mm a"),
                  body: `
                    <p>There have been updates to job ${jobRoNumber || "N/A"} at ${bodyShopName}:</p><br/>
                    <ul>
                      ${messages.map((msg) => `<li>${msg}</li>`).join("")}
                    </ul><br/><br/>
                    <p><a href="${InstanceEndpoints()}/manage/jobs/${jobId}">Please check the job for more details.</a></p>
                  `
                });
                // NOTE(review): `recipient` (the stored user identifier) is used directly
                // as the email address — confirm users are keyed by email.
                await sendTaskEmail({
                  to: recipient,
                  subject,
                  type: "html",
                  html: emailBody
                });
                devDebugLogger(
                  `Sent consolidated email to ${recipient} for jobId ${jobId} with ${messages.length} updates`
                );
                await pubClient.del(userKey);
                await pubClient.del(detailsKey);
              }
            }
            // Clear the recipient set and dedupe flag so a new window can start.
            await pubClient.del(recipientsSet);
            await pubClient.del(`email:${devKey}:consolidate:${jobId}`);
          } catch (err) {
            logger.log(`email-queue-consolidation-error`, "ERROR", "notifications", "api", {
              message: err?.message,
              stack: err?.stack
            });
            // Rethrow so BullMQ retries per the job's attempts/backoff settings.
            throw err;
          } finally {
            // Always release the lock, even on failure.
            await pubClient.del(lockKey);
          }
        } else {
          devDebugLogger(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`);
        }
      },
      {
        prefix,
        connection: pubClient,
        concurrency: 1,
        limiter: { max: 1, duration: RATE_LIMITER_DURATION }
      }
    );
    // Event handlers for workers
    emailAddWorker.on("completed", (job) => devDebugLogger(`Email add job ${job.id} completed`));
    emailConsolidateWorker.on("completed", (job) => devDebugLogger(`Email consolidate job ${job.id} completed`));
    emailAddWorker.on("failed", (job, err) =>
      logger.log(`add-email-queue-failed`, "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );
    emailConsolidateWorker.on("failed", (job, err) =>
      logger.log(`email-consolidation-job-failed`, "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );
    // Register cleanup task instead of direct process listeners
    const shutdown = async () => {
      devDebugLogger("Closing email queue workers...");
      await Promise.all([emailAddWorker.close(), emailConsolidateWorker.close()]);
      devDebugLogger("Email queue workers closed");
    };
    registerCleanupTask(shutdown);
  }
  return emailAddQueue;
};
/**
 * Retrieves the initialized `emailAddQueue` instance.
 *
 * @returns {Queue} The `emailAddQueue` instance for adding notifications.
 * @throws {Error} If `emailAddQueue` is not initialized.
 */
const getQueue = () => {
  if (emailAddQueue) return emailAddQueue;
  throw new Error("Email add queue not initialized. Ensure loadEmailQueue is called during bootstrap.");
};
/**
 * Dispatches email notifications to the `emailAddQueue` for processing.
 * Entries missing required data (jobId, jobRoNumber, bodyShopName, body, or
 * recipients) are skipped with a debug log rather than enqueued.
 *
 * @param {Object} options - Options for dispatching notifications.
 * @param {Array} options.emailsToDispatch - Array of email notification objects.
 * @param {Object} options.logger - Logger instance (reserved for future use).
 * @returns {Promise<void>} Resolves when all notifications are added to the queue.
 */
const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => {
  // Renamed local from `emailAddQueue` to avoid shadowing the module-level queue variable.
  const queue = getQueue();
  for (const email of emailsToDispatch) {
    const { jobId, jobRoNumber, bodyShopName, bodyShopTimezone, body, recipients } = email;
    if (!jobId || !jobRoNumber || !bodyShopName || !body || !recipients.length) {
      devDebugLogger(
        `Skipping email dispatch for jobId ${jobId} due to missing data: ` +
          `jobRoNumber=${jobRoNumber || "N/A"}, bodyShopName=${bodyShopName}, body=${body}, recipients=${recipients.length}`
      );
      continue;
    }
    // Unique per-dispatch job id prevents BullMQ from deduplicating distinct events.
    await queue.add(
      "add-email-notification",
      { jobId, jobRoNumber, bodyShopName, bodyShopTimezone, body, recipients },
      { jobId: `${jobId}:${Date.now()}` }
    );
    devDebugLogger(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
  }
};
module.exports = { loadEmailQueue, getQueue, dispatchEmailsToQueue };

View File

@@ -0,0 +1,414 @@
const { getJobAssignmentType, formatTaskPriority } = require("./stringHelpers");
const moment = require("moment-timezone");
const { startCase } = require("lodash");
const Dinero = require("dinero.js");
Dinero.globalRoundingMode = "HALF_EVEN";
/**
 * Creates a standard notification object with app, email, and FCM sections and
 * routes each scenario watcher into the channels they opted into.
 *
 * @param {Object} data - Input data containing jobId, jobRoNumber, bodyShopId,
 *   bodyShopName, bodyShopTimezone, and scenarioWatchers.
 * @param {string} key - Notification key for the app channel.
 * @param {string} body - Notification body text (shared by app and email).
 * @param {Object} [variables={}] - Variables for the app notification.
 * @returns {Object} Notification object with populated recipients per channel.
 */
const buildNotification = (data, key, body, variables = {}) => {
  const { jobId, jobRoNumber, bodyShopId, bodyShopName, bodyShopTimezone, scenarioWatchers } = data;
  const appRecipients = [];
  const emailRecipients = [];
  const fcmRecipients = [];

  // Route each watcher into the channels they explicitly opted into (strict true check).
  for (const watcher of scenarioWatchers) {
    const { user, app, fcm, email, firstName, lastName, employeeId, associationId } = watcher;
    if (app === true) {
      appRecipients.push({ user, bodyShopId, employeeId, associationId });
    }
    if (fcm === true) {
      fcmRecipients.push(user);
    }
    if (email === true) {
      emailRecipients.push({ user, firstName, lastName });
    }
  }

  return {
    app: { jobId, jobRoNumber, bodyShopId, key, body, variables, recipients: appRecipients },
    email: { jobId, jobRoNumber, bodyShopName, bodyShopTimezone, body, recipients: emailRecipients },
    fcm: { recipients: fcmRecipients }
  };
};
/**
 * Creates a notification for when the alternate transport is changed.
 * The message varies by whether a previous and/or new value exists.
 *
 * @param {Object} data - Parsed event data with changedFields.alt_transport.
 * @returns {Object} Notification object produced by buildNotification.
 */
const alternateTransportChangedBuilder = (data) => {
  const { old: previousTransport, new: currentTransport } = data?.changedFields?.alt_transport ?? {};
  let body;
  if (previousTransport) {
    body = currentTransport
      ? `The alternate transportation has been changed from ${previousTransport} to ${currentTransport}.`
      : `The alternate transportation has been canceled (previously ${previousTransport}).`;
  } else {
    body = currentTransport
      ? `The alternate transportation has been set to ${currentTransport}.`
      : `The alternate transportation has been updated.`;
  }
  return buildNotification(data, "notifications.job.alternateTransportChanged", body, {
    alternateTransport: currentTransport,
    oldAlternateTransport: previousTransport
  });
};
/**
 * Creates a notification for when a bill is posted.
 * Distinguishes in-house vs vendor bills and credit memos vs bills.
 *
 * @param {Object} data - Parsed event data with data.isinhouse and data.is_credit_memo.
 * @returns {Object} Notification object produced by buildNotification.
 */
const billPostedBuilder = (data) => {
  const facing = data?.data?.isinhouse ? "in-house" : "vendor";
  // Fix: the article must agree with the noun — "An in-house bill" but "A vendor bill"
  // (previously hard-coded "An", producing "An vendor bill has been posted.").
  const article = data?.data?.isinhouse ? "An" : "A";
  const body = `${article} ${facing} ${data?.data?.is_credit_memo ? "credit memo" : "bill"} has been posted.`.trim();
  return buildNotification(data, "notifications.job.billPosted", body, {
    isInHouse: data?.data?.isinhouse,
    isCreditMemo: data?.data?.is_credit_memo
  });
};
/**
 * Creates a notification for when the status of a critical part line changes.
 * A falsy status is reported as "cleared".
 *
 * @param {Object} data - Parsed event data with data.line_desc, data.status, data.id.
 * @returns {Object} Notification object produced by buildNotification.
 */
const criticalPartsStatusChangedBuilder = (data) => {
  const lineDesc = data?.data?.line_desc;
  const status = data?.data?.status;
  let body;
  if (status) {
    body = `The status on a critical part line (${lineDesc}) has been set to ${status}.`;
  } else {
    body = `The status on a critical part line (${lineDesc}) has been cleared.`;
  }
  return buildNotification(data, "notifications.job.criticalPartsStatusChanged", body, {
    joblineId: data?.data?.id,
    status: data?.data?.status,
    line_desc: lineDesc
  });
};
/**
 * Creates a notification for when the intake or delivery checklist is completed.
 * The checklist type is inferred from which changed field is present.
 *
 * @param {Object} data - Parsed event data with changedFields.intakechecklist (optional).
 * @returns {Object} Notification object produced by buildNotification.
 */
const intakeDeliveryChecklistCompletedBuilder = (data) => {
  const checklistType = data?.changedFields?.intakechecklist ? "intake" : "delivery";
  const capitalizedType = `${checklistType[0].toUpperCase()}${checklistType.slice(1)}`;
  const body = `The ${capitalizedType} checklist has been completed.`;
  return buildNotification(data, "notifications.job.checklistCompleted", body, {
    checklistType,
    completed: true
  });
};
/**
 * Creates a notification for when a job is assigned to the user.
 * The assignment type comes from the first scenario field.
 *
 * @param {Object} data - Parsed event data with scenarioFields.
 * @returns {Object} Notification object produced by buildNotification.
 */
const jobAssignedToMeBuilder = (data) => {
  const assignmentType = data.scenarioFields?.[0];
  const body = `You have been assigned to ${getJobAssignmentType(assignmentType)}.`;
  return buildNotification(data, "notifications.job.assigned", body, {
    type: assignmentType
  });
};
/**
 * Creates a notification for when jobs are added to production.
 * @param data - Parsed event data.
 * @returns {Object} Notification payloads for the app, email, and fcm channels.
 */
const jobsAddedToProductionBuilder = (data) =>
  buildNotification(data, "notifications.job.addedToProduction", `Job is now in production.`);
/**
 * Creates a notification for when the job status changes.
 * Message wording varies on whether the status was set, changed, or cleared.
 * @param data - Parsed event data; reads changedFields.status.old/new.
 * @returns {Object} Notification payloads for the app, email, and fcm channels.
 */
const jobStatusChangeBuilder = (data) => {
  const { old: previousStatus, new: currentStatus } = data?.changedFields?.status ?? {};
  let body;
  if (previousStatus) {
    body = currentStatus
      ? `The status has been changed from ${previousStatus} to ${currentStatus}.`
      : `The status has been cleared (previously ${previousStatus}).`;
  } else {
    body = currentStatus ? `The status has been set to ${currentStatus}.` : `The status has been updated.`;
  }
  return buildNotification(data, "notifications.job.statusChanged", body, {
    status: currentStatus,
    oldStatus: previousStatus
  });
};
/**
 * Creates a notification for when new media is added or reassigned.
 * @param data - Parsed event data; reads data.type, data._documentMoved, isNew, changedFields.jobid.
 * @returns {Object} Notification payloads for the app, email, and fcm channels.
 */
const newMediaAddedReassignedBuilder = (data) => {
  const mediaType = data?.data?.type?.startsWith("image") ? "Image" : "Document";
  // Fix: the article must agree with the noun ("An Image" but "A Document");
  // the previous body hard-coded "An" for both media types.
  const article = mediaType === "Image" ? "An" : "A";
  // Determine what happened to the media, in priority order.
  let action;
  if (data?.data?._documentMoved) {
    action = "moved to another job";
  } else if (data.isNew) {
    action = "added";
  } else if (data.changedFields?.jobid && data.changedFields.jobid.old !== data.changedFields.jobid.new) {
    action = "moved to this job";
  } else {
    action = "updated";
  }
  const body = `${article} ${mediaType} has been ${action}.`;
  return buildNotification(data, "notifications.job.newMediaAdded", body, {
    mediaType,
    action,
    movedToJob: data?.data?._movedToJob
  });
};
/**
 * Creates a notification for when a new note is added.
 * Assembles "A [critical] [private] [type] note has been added by <author>".
 * @param data - Parsed event data; reads critical, private, type, created_by from data.data.
 * @returns {Object} Notification payloads for the app, email, and fcm channels.
 */
const newNoteAddedBuilder = (data) => {
  const note = data?.data;
  const bodyParts = ["A"];
  if (note?.critical) bodyParts.push("critical");
  if (note?.private) bodyParts.push("private");
  if (note?.type) bodyParts.push(note.type);
  bodyParts.push("note has been added by", `${data.data.created_by}`);
  const body = bodyParts.join(" ");
  return buildNotification(data, "notifications.job.newNoteAdded", body, {
    createdBy: note?.created_by,
    critical: note?.critical,
    type: note?.type,
    private: note?.private
  });
};
/**
 * Creates a notification for when a new time ticket is posted.
 * @param data - Parsed event data; reads cost_center and date from data.data.
 * @returns {Object} Notification payloads for the app, email, and fcm channels.
 */
const newTimeTicketPostedBuilder = (data) => {
  const type = data?.data?.cost_center;
  // Fix: `type` is read with optional chaining, yet `.toLowerCase()` was called
  // unguarded — a missing cost_center crashed the builder. Default to "".
  const typeLabel = startCase((type ?? "").toLowerCase());
  const body = `A ${typeLabel} time ticket for ${data?.data?.date} has been posted.`.trim();
  return buildNotification(data, "notifications.job.newTimeTicketPosted", body, {
    type,
    date: data?.data?.date
  });
};
/**
 * Creates a notification for when a part is marked as back-ordered.
 * @param data - Parsed event data; reads line_desc from data.data.
 * @returns {Object} Notification payloads for the app, email, and fcm channels.
 */
const partMarkedBackOrderedBuilder = (data) => {
  const lineDesc = data?.data?.line_desc;
  const body = `A part ${lineDesc} has been marked as back-ordered.`;
  return buildNotification(data, "notifications.job.partBackOrdered", body, {
    line_desc: lineDesc
  });
};
/**
 * Creates a notification for when payment is collected or completed.
 * Amount is formatted as currency (cents via Dinero) and the date as MM/DD/YYYY.
 * @param data - Parsed event data; reads amount, payer, type, date from data.data.
 * @returns {Object} Notification payloads for the app, email, and fcm channels.
 */
const paymentCollectedCompletedBuilder = (data) => {
  const { amount, payer, type, date } = data.data;
  // Dinero works in integer cents; round to avoid floating-point artifacts.
  const amountFormatted = Dinero({ amount: Math.round((amount || 0) * 100) }).toFormat();
  const paymentDate = moment(date).format("MM/DD/YYYY");
  const body = `Payment of ${amountFormatted} has been collected from ${payer} via ${type} on ${paymentDate}`;
  return buildNotification(data, "notifications.job.paymentCollected", body, {
    amount,
    payer,
    type,
    date
  });
};
/**
 * Creates a notification for when scheduled dates are changed.
 * Produces one sentence per changed scheduling field (set / changed / cancelled),
 * rendered in the body shop's timezone.
 * @param data - Parsed event data; reads changedFields.scheduled_* and bodyShopTimezone.
 * @returns {Object} Notification payloads for the app, email, and fcm channels.
 */
const scheduledDatesChangedBuilder = (data) => {
  const changedFields = data.changedFields;
  const labels = [
    ["scheduled_in", "Scheduled In"],
    ["scheduled_completion", "Scheduled Completion"],
    ["scheduled_delivery", "Scheduled Delivery"]
  ];
  const describe = (date) => {
    if (!date) return "(no date set)";
    const local = moment(date).tz(data.bodyShopTimezone);
    return `${local.format("MM/DD/YYYY")} at ${local.format("hh:mm a")}`;
  };
  const messages = [];
  for (const [field, label] of labels) {
    const change = changedFields[field];
    if (!change) continue;
    const { old: before, new: after } = change;
    if (before && !after) messages.push(`${label} was cancelled (previously ${describe(before)}).`);
    else if (!before && after) messages.push(`${label} was set to ${describe(after)}.`);
    else if (before && after) messages.push(`${label} changed from ${describe(before)} to ${describe(after)}.`);
  }
  const body = messages.length > 0 ? messages.join(" ") : "Scheduled dates have been updated.";
  return buildNotification(data, "notifications.job.scheduledDatesChanged", body, {
    scheduledIn: changedFields.scheduled_in?.new,
    oldScheduledIn: changedFields.scheduled_in?.old,
    scheduledCompletion: changedFields.scheduled_completion?.new,
    oldScheduledCompletion: changedFields.scheduled_completion?.old,
    scheduledDelivery: changedFields.scheduled_delivery?.new,
    oldScheduledDelivery: changedFields.scheduled_delivery?.old
  });
};
/**
 * Creates a notification for when tasks are updated or created.
 * New tasks get a "created" message; updates describe exactly which fields changed,
 * with a short-form message when only the completed flag toggled.
 * @param data - Parsed event data (isNew, data, changedFields, jobRoNumber, bodyShopTimezone).
 * @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
 */
const tasksUpdatedCreatedBuilder = (data) => {
  const momentFormat = "MM/DD/YYYY hh:mm a";
  const timezone = data.bodyShopTimezone;
  // Quote the title when present; otherwise fall back to a generic label.
  const taskTitle = data?.data?.title ? `"${data.data.title}"` : "Unnamed Task";
  let body, variables;
  if (data.isNew) {
    const priority = formatTaskPriority(data?.data?.priority);
    const createdBy = data?.data?.created_by || "Unknown";
    const dueDate = data.data.due_date ? ` due on ${moment(data.data.due_date).tz(timezone).format(momentFormat)}` : "";
    // A task can arrive already marked complete at creation time.
    const completedOnCreation = data.data.completed === true;
    body = `A ${priority} task ${taskTitle} has been created${completedOnCreation ? " and marked completed" : ""} by ${createdBy}${dueDate}.`;
    variables = {
      isNew: data.isNew,
      roNumber: data.jobRoNumber,
      title: data?.data?.title,
      priority: data?.data?.priority,
      createdBy: data?.data?.created_by,
      dueDate: data?.data?.due_date,
      // Only surface `completed` when it was set at creation.
      completed: completedOnCreation ? data?.data?.completed : undefined
    };
  } else {
    const changedFields = data.changedFields;
    const fieldNames = Object.keys(changedFields);
    // Refer to the task by its pre-change title when the title itself changed.
    const oldTitle = changedFields.title ? `"${changedFields.title.old || "Unnamed Task"}"` : taskTitle;
    if (fieldNames.length === 1 && changedFields.completed) {
      // Special case: only the completed flag toggled — use the short message.
      body = `Task ${oldTitle} was marked ${changedFields.completed.new ? "complete" : "incomplete"}`;
      variables = {
        isNew: data.isNew,
        roNumber: data.jobRoNumber,
        title: data?.data?.title,
        changedCompleted: changedFields.completed.new
      };
    } else {
      // General case: build one message fragment per changed field.
      const fieldMessages = [];
      if (changedFields.title)
        fieldMessages.push(`Task ${oldTitle} changed title to "${changedFields.title.new || "unnamed task"}".`);
      if (changedFields.description) fieldMessages.push("Description updated.");
      if (changedFields.priority)
        fieldMessages.push(`Priority changed to ${formatTaskPriority(changedFields.priority.new)}.`);
      if (changedFields.due_date)
        fieldMessages.push(`Due date set to ${moment(changedFields.due_date.new).tz(timezone).format(momentFormat)}.`);
      if (changedFields.completed)
        fieldMessages.push(`Status changed to ${changedFields.completed.new ? "complete" : "incomplete"}.`);
      // A lone title change already reads as a full sentence; otherwise prefix with the task name.
      body =
        fieldMessages.length > 0
          ? fieldMessages.length === 1 && changedFields.title
            ? fieldMessages[0]
            : `Task ${oldTitle} updated: ${fieldMessages.join(", ")}`
          : `Task ${oldTitle} has been updated.`;
      variables = {
        isNew: data.isNew,
        roNumber: data.jobRoNumber,
        title: data?.data?.title,
        changedTitleOld: changedFields.title?.old,
        changedTitleNew: changedFields.title?.new,
        changedPriority: changedFields.priority?.new,
        changedDueDate: changedFields.due_date?.new,
        changedCompleted: changedFields.completed?.new
      };
    }
  }
  return buildNotification(
    data,
    data.isNew ? "notifications.job.taskCreated" : "notifications.job.taskUpdated",
    body,
    variables
  );
};
/**
 * Creates a notification for when a supplement is imported.
 * @param data - Parsed event data.
 * @returns {Object} Notification payloads for the app, email, and fcm channels.
 */
const supplementImportedBuilder = (data) =>
  buildNotification(data, "notifications.job.supplementImported", `A supplement has been imported.`);
// Scenario builder functions, exported alphabetically; consumed by the scenario mapper.
module.exports = {
  alternateTransportChangedBuilder,
  billPostedBuilder,
  criticalPartsStatusChangedBuilder,
  intakeDeliveryChecklistCompletedBuilder,
  jobAssignedToMeBuilder,
  jobsAddedToProductionBuilder,
  jobStatusChangeBuilder,
  newMediaAddedReassignedBuilder,
  newNoteAddedBuilder,
  newTimeTicketPostedBuilder,
  partMarkedBackOrderedBuilder,
  paymentCollectedCompletedBuilder,
  scheduledDatesChangedBuilder,
  supplementImportedBuilder,
  tasksUpdatedCreatedBuilder
};

View File

@@ -0,0 +1,257 @@
const {
jobAssignedToMeBuilder,
billPostedBuilder,
newNoteAddedBuilder,
scheduledDatesChangedBuilder,
tasksUpdatedCreatedBuilder,
jobStatusChangeBuilder,
jobsAddedToProductionBuilder,
alternateTransportChangedBuilder,
newTimeTicketPostedBuilder,
intakeDeliveryChecklistCompletedBuilder,
paymentCollectedCompletedBuilder,
newMediaAddedReassignedBuilder,
criticalPartsStatusChangedBuilder,
supplementImportedBuilder,
partMarkedBackOrderedBuilder
} = require("./scenarioBuilders");
const logger = require("../utils/logger");
const { isFunction } = require("lodash");
/**
 * An array of notification scenario definitions.
 *
 * Each scenario object can include the following properties:
 * - key {string}: The unique scenario name.
 * - table {string}: The table name to check for changes.
 * - fields {Array<string>}: Fields to check for changes.
 * - matchToUserFields {Array<string>}: Fields used to match scenarios to user data.
 * - onNew {boolean|Array<boolean>}: Indicates whether the scenario should be triggered on new data.
 * - builder {Function}: A function to handle the scenario.
 * - onlyTruthyValues {boolean|Array<string>}: Specifies fields that must have truthy values for the scenario to match.
 * - filterCallback {Function}: Optional callback (sync or async) to further filter the scenario based on event data (returns boolean).
 * - enabled {boolean}: If true, the scenario is active; if false or omitted, the scenario is skipped.
 */
const notificationScenarios = [
  {
    // Matches when any employee-assignment field changes to include the watcher.
    key: "job-assigned-to-me",
    enabled: true,
    table: "jobs",
    fields: ["employee_prep", "employee_body", "employee_csr", "employee_refinish"],
    matchToUserFields: ["employee_prep", "employee_body", "employee_csr", "employee_refinish"],
    builder: jobAssignedToMeBuilder
  },
  {
    // Inserts only (onNew: true) — fires when a bill row is created.
    key: "bill-posted",
    enabled: true,
    table: "bills",
    builder: billPostedBuilder,
    onNew: true
  },
  {
    key: "new-note-added",
    enabled: true,
    table: "notes",
    builder: newNoteAddedBuilder,
    onNew: true
  },
  {
    key: "schedule-dates-changed",
    enabled: true,
    table: "jobs",
    fields: ["scheduled_in", "scheduled_completion", "scheduled_delivery"],
    builder: scheduledDatesChangedBuilder
  },
  {
    key: "tasks-updated-created",
    enabled: true,
    table: "tasks",
    fields: ["updated_at"],
    // onNew: true,
    builder: tasksUpdatedCreatedBuilder
  },
  {
    key: "job-status-change",
    enabled: true,
    table: "jobs",
    fields: ["status"],
    builder: jobStatusChangeBuilder
  },
  {
    // onlyTruthyValues: fires only when inproduction becomes truthy, not when cleared.
    key: "job-added-to-production",
    enabled: true,
    table: "jobs",
    fields: ["inproduction"],
    onlyTruthyValues: ["inproduction"],
    builder: jobsAddedToProductionBuilder
  },
  {
    key: "alternate-transport-changed",
    enabled: true,
    table: "jobs",
    fields: ["alt_transport"],
    builder: alternateTransportChangedBuilder
  },
  {
    key: "new-time-ticket-posted",
    enabled: true,
    table: "timetickets",
    builder: newTimeTicketPostedBuilder
  },
  {
    key: "intake-delivery-checklist-completed",
    enabled: true,
    table: "jobs",
    fields: ["intakechecklist", "deliverchecklist"],
    builder: intakeDeliveryChecklistCompletedBuilder
  },
  {
    key: "payment-collected-completed",
    enabled: true,
    table: "payments",
    onNew: true,
    builder: paymentCollectedCompletedBuilder
  },
  {
    // Only works on a non-LMS environment.
    key: "new-media-added-reassigned",
    enabled: true,
    table: "documents",
    fields: ["jobid"],
    builder: newMediaAddedReassignedBuilder
  },
  {
    // filterCallback skips (returns true) lines that are NOT marked critical.
    key: "critical-parts-status-changed",
    enabled: true,
    table: "joblines",
    fields: ["status"],
    onlyTruthyValues: ["status"],
    builder: criticalPartsStatusChangedBuilder,
    filterCallback: ({ eventData }) => !eventData?.data?.critical
  },
  {
    // filterCallback skips statuses other than the shop's configured back-order status.
    key: "part-marked-back-ordered",
    enabled: true,
    table: "joblines",
    fields: ["status"],
    builder: partMarkedBackOrderedBuilder,
    filterCallback: async ({ eventData, getBodyshopFromRedis }) => {
      try {
        const bodyshop = await getBodyshopFromRedis(eventData.bodyShopId);
        return eventData?.data?.status !== bodyshop?.md_order_statuses?.default_bo;
      } catch (err) {
        // On Redis lookup failure, log and let the scenario through (return false = keep).
        logger.log("notifications-error-parts-marked-back-ordered", "error", "notifications", "mapper", {
          message: err?.message,
          stack: err?.stack
        });
        return false;
      }
    }
  },
  // Holding off on this one for now, spans multiple tables
  {
    key: "supplement-imported",
    enabled: false,
    builder: supplementImportedBuilder
  }
];
/**
 * Returns an array of scenarios that match the given event data.
 * Supports asynchronous callbacks for additional filtering.
 *
 * @param {Object} eventData - The parsed event data.
 * Expected properties:
 *   - table: an object with a `name` property (e.g. { name: "tasks", schema: "public" })
 *   - changedFieldNames: an array of changed field names (e.g. [ "description", "updated_at" ])
 *   - isNew: boolean indicating whether the record is new or updated
 *   - data: the new data object (used to check field values)
 *   - (other properties may be added such as jobWatchers, bodyShopId, etc.)
 * @param {Function} getBodyshopFromRedis - Function to retrieve bodyshop data from Redis.
 * @returns {Promise<Array<Object>>} A promise resolving to an array of matching scenario objects.
 */
const getMatchingScenarios = async (eventData, getBodyshopFromRedis) => {
  // Synchronous structural checks; the (possibly async) filterCallback is evaluated separately.
  const passesStructuralChecks = (scenario) => {
    // Only explicitly enabled scenarios are considered.
    if (scenario.enabled !== true) return false;
    // When the event carries a table, the scenario must target that same table.
    if (eventData.table && (!scenario.table || eventData.table.name !== scenario.table)) return false;
    // onNew may be a boolean or an array of booleans — the event's isNew must match.
    if (Object.prototype.hasOwnProperty.call(scenario, "onNew")) {
      const allowedValues = Array.isArray(scenario.onNew) ? scenario.onNew : [scenario.onNew];
      if (!allowedValues.includes(eventData.isNew)) return false;
    }
    // When the scenario lists fields, at least one must appear in the changed fields.
    if (scenario.fields && scenario.fields.length > 0) {
      if (!scenario.fields.some((field) => eventData.changedFieldNames.includes(field))) return false;
    }
    // onlyTruthyValues: the new values of the designated fields must all be truthy.
    if (Object.prototype.hasOwnProperty.call(scenario, "onlyTruthyValues")) {
      let fieldsToCheck;
      if (scenario.onlyTruthyValues === true) {
        // true means: check every scenario field that actually changed.
        fieldsToCheck = scenario.fields.filter((field) => eventData.changedFieldNames.includes(field));
      } else if (Array.isArray(scenario.onlyTruthyValues) && scenario.onlyTruthyValues.length > 0) {
        // An array means: check only the listed fields, which must be in the scenario's
        // fields AND among the changed fields.
        fieldsToCheck = scenario.onlyTruthyValues.filter(
          (field) => scenario.fields.includes(field) && eventData.changedFieldNames.includes(field)
        );
        // Nothing left to check means the scenario does not apply.
        if (fieldsToCheck.length === 0) return false;
      } else {
        // Invalid onlyTruthyValues (neither true nor a non-empty array) — reject.
        return false;
      }
      if (!fieldsToCheck.every((field) => Boolean(eventData.data[field]))) return false;
    }
    return true;
  };
  const matches = [];
  for (const scenario of notificationScenarios) {
    if (!passesStructuralChecks(scenario)) continue;
    // filterCallback may be sync or async; a truthy result means "filter this scenario out".
    if (isFunction(scenario?.filterCallback)) {
      const shouldFilter = await Promise.resolve(
        scenario.filterCallback({
          eventData,
          getBodyshopFromRedis
        })
      );
      if (shouldFilter) continue;
    }
    matches.push(scenario);
  }
  return matches;
};
// Public API of the scenario mapper module.
module.exports = {
  notificationScenarios,
  getMatchingScenarios
};

View File

@@ -0,0 +1,291 @@
/**
* @module scenarioParser
* @description
* This module exports a function that parses an event and triggers notification scenarios based on the event data.
* It integrates with event parsing utilities, GraphQL queries, and notification queues to manage the dispatching
* of notifications via email and app channels. The function processes event data, identifies relevant scenarios,
* queries user notification preferences, and dispatches notifications accordingly.
*/
const eventParser = require("./eventParser");
const { client: gqlClient } = require("../graphql-client/graphql-client");
const queries = require("../graphql-client/queries");
const { isEmpty, isFunction } = require("lodash");
const { getMatchingScenarios } = require("./scenarioMapper");
const { dispatchEmailsToQueue } = require("./queues/emailQueue");
const { dispatchAppsToQueue } = require("./queues/appQueue");
// If true, the user who commits the action will NOT receive notifications; if false, they will.
const FILTER_SELF_FROM_WATCHERS = process.env?.FILTER_SELF_FROM_WATCHERS !== "false";
/**
 * Parses an event and determines matching scenarios for notifications.
 * Queries job watchers and notification settings before triggering scenario builders.
 *
 * @param {Object} req - The request object containing event data, trigger, table, and logger.
 * @param {string} jobIdField - The field path (e.g., "req.body.event.new.id") to extract the job ID.
 * @returns {Promise<void>} Resolves when the parsing and notification dispatching process is complete.
 * @throws {Error} If required request fields (event data, trigger, or table) or body shop data are missing.
 */
const scenarioParser = async (req, jobIdField) => {
  const { event, trigger, table } = req.body;
  const {
    logger,
    sessionUtils: { getBodyshopFromRedis }
  } = req;
  // Step 1: Validate we know what user committed the action that fired the parser
  const hasuraUserRole = event?.session_variables?.["x-hasura-role"];
  const hasuraUserId = event?.session_variables?.["x-hasura-user-id"];
  // Bail if we don't know who started the scenario (only enforced for the "user" role;
  // admin/service roles pass through without a user id).
  if (hasuraUserRole === "user" && !hasuraUserId) {
    logger.log("No Hasura user ID found, skipping notification parsing", "info", "notifications");
    return;
  }
  // Validate that required fields are present in the request body
  if (!event?.data || !trigger || !table) {
    throw new Error("Missing required request fields: event data, trigger, or table.");
  }
  // Step 2: Extract just the jobId using the provided jobIdField.
  // The path prefix "req.body.event.new." is stripped so only the column name remains.
  let jobId = null;
  if (jobIdField) {
    let keyName = jobIdField;
    const prefix = "req.body.event.new.";
    if (keyName.startsWith(prefix)) {
      keyName = keyName.slice(prefix.length);
    }
    // Prefer the new row's value; fall back to the old row (e.g. on deletes/moves).
    jobId = event.data.new[keyName] || (event.data.old && event.data.old[keyName]) || null;
  }
  if (!jobId) {
    logger.log(`No jobId found using path "${jobIdField}", skipping notification parsing`, "info", "notifications");
    return;
  }
  // Step 3: Query job watchers associated with the job ID using GraphQL
  const watcherData = await gqlClient.request(queries.GET_JOB_WATCHERS, {
    jobid: jobId
  });
  // Transform watcher data into a simplified format with email and employee details.
  // NOTE(review): jobWatchers is undefined when the response has no job_watchers array,
  // and the .filter below would then throw — confirm GET_JOB_WATCHERS always returns [].
  let jobWatchers = watcherData?.job_watchers?.map((watcher) => ({
    email: watcher.user_email,
    firstName: watcher?.user?.employee?.first_name,
    lastName: watcher?.user?.employee?.last_name,
    employeeId: watcher?.user?.employee?.id,
    authId: watcher?.user?.authid
  }));
  // Optionally exclude the acting user so they don't get notified about their own change.
  if (FILTER_SELF_FROM_WATCHERS && hasuraUserRole === "user") {
    jobWatchers = jobWatchers.filter((watcher) => watcher.authId !== hasuraUserId);
  }
  // Exit early if no job watchers are found for this job
  if (isEmpty(jobWatchers)) {
    logger.log(`No watchers found for jobId "${jobId}", skipping notification parsing`, "info", "notifications");
    return;
  }
  // Step 5: Perform the full event diff now that we know there are watchers
  const eventData = await eventParser({
    newData: event.data.new,
    oldData: event.data.old,
    trigger,
    table,
    jobId
  });
  // Step 6: Extract body shop information from the job data
  const bodyShopId = watcherData?.job?.bodyshop?.id;
  const bodyShopName = watcherData?.job?.bodyshop?.shopname;
  const bodyShopTimezone = watcherData?.job?.bodyshop?.timezone;
  const jobRoNumber = watcherData?.job?.ro_number;
  const jobClaimNumber = watcherData?.job?.clm_no;
  // Validate that body shop data exists, as it's required for notifications
  if (!bodyShopId || !bodyShopName) {
    throw new Error("No bodyshop data found for this job.");
  }
  // Step 7: Identify scenarios that match the event data and job context
  const matchingScenarios = await getMatchingScenarios(
    {
      ...eventData,
      jobWatchers,
      bodyShopId,
      bodyShopName
    },
    getBodyshopFromRedis
  );
  // Exit early if no matching scenarios are identified
  if (isEmpty(matchingScenarios)) {
    logger.log(
      `No matching scenarios found for jobId "${jobId}", skipping notification dispatch`,
      "info",
      "notifications"
    );
    return;
  }
  // Combine event data with additional context for scenario processing
  const finalScenarioData = {
    ...eventData,
    jobWatchers,
    bodyShopId,
    bodyShopName,
    bodyShopTimezone,
    matchingScenarios
  };
  // Step 8: Query notification settings for the job watchers
  const associationsData = await gqlClient.request(queries.GET_NOTIFICATION_ASSOCIATIONS, {
    emails: jobWatchers.map((x) => x.email),
    shopid: bodyShopId
  });
  // Exit early if no notification associations are found
  if (isEmpty(associationsData?.associations)) {
    logger.log(
      `No notification associations found for jobId "${jobId}", skipping notification dispatch`,
      "info",
      "notifications"
    );
    return;
  }
  // Step 9: Filter scenario watchers based on their enabled notification methods
  finalScenarioData.matchingScenarios = finalScenarioData.matchingScenarios.map((scenario) => ({
    ...scenario,
    scenarioWatchers: associationsData.associations
      .filter((assoc) => {
        const settings = assoc.notification_settings && assoc.notification_settings[scenario.key];
        // Include only watchers with at least one enabled notification method (app, email, or FCM)
        return settings && (settings.app || settings.email || settings.fcm);
      })
      .map((assoc) => {
        const settings = assoc.notification_settings[scenario.key];
        const watcherEmail = assoc.useremail;
        const matchingWatcher = jobWatchers.find((watcher) => watcher.email === watcherEmail);
        // Build watcher object with notification preferences and personal details
        return {
          user: watcherEmail,
          email: settings.email,
          app: settings.app,
          fcm: settings.fcm,
          firstName: matchingWatcher?.firstName,
          lastName: matchingWatcher?.lastName,
          employeeId: matchingWatcher?.employeeId,
          associationId: assoc.id
        };
      })
  }));
  // Exit early if no scenarios have eligible watchers after filtering
  if (isEmpty(finalScenarioData?.matchingScenarios)) {
    logger.log(
      `No eligible watchers after filtering for jobId "${jobId}", skipping notification dispatch`,
      "info",
      "notifications"
    );
    return;
  }
  // Step 10: Build and collect scenarios to dispatch notifications for
  const scenariosToDispatch = [];
  for (const scenario of finalScenarioData.matchingScenarios) {
    // Skip if no watchers or no builder function is defined for the scenario
    if (isEmpty(scenario.scenarioWatchers) || !isFunction(scenario.builder)) {
      continue;
    }
    let eligibleWatchers = scenario.scenarioWatchers;
    // Filter watchers to only those assigned to changed fields, if specified
    if (scenario.matchToUserFields && scenario.matchToUserFields.length > 0) {
      eligibleWatchers = scenario.scenarioWatchers.filter((watcher) =>
        scenario.matchToUserFields.some(
          (field) => eventData.changedFieldNames.includes(field) && eventData.data[field]?.includes(watcher.employeeId)
        )
      );
    }
    // Skip if no watchers remain after filtering
    if (isEmpty(eligibleWatchers)) {
      continue;
    }
    // Step 11: Filter scenario fields to include only those that changed
    const filteredScenarioFields =
      scenario.fields?.filter((field) => eventData.changedFieldNames.includes(field)) || [];
    // Use the scenario's builder to construct the notification data
    scenariosToDispatch.push(
      scenario.builder({
        trigger: finalScenarioData.trigger.name,
        bodyShopId: finalScenarioData.bodyShopId,
        bodyShopName: finalScenarioData.bodyShopName,
        bodyShopTimezone: finalScenarioData.bodyShopTimezone,
        scenarioKey: scenario.key,
        scenarioTable: scenario.table,
        scenarioFields: filteredScenarioFields,
        scenarioBuilder: scenario.builder,
        scenarioWatchers: eligibleWatchers,
        jobId: finalScenarioData.jobId,
        jobRoNumber: jobRoNumber,
        jobClaimNumber: jobClaimNumber,
        isNew: finalScenarioData.isNew,
        changedFieldNames: finalScenarioData.changedFieldNames,
        changedFields: finalScenarioData.changedFields,
        data: finalScenarioData.data
      })
    );
  }
  if (isEmpty(scenariosToDispatch)) {
    logger.log(`No scenarios to dispatch for jobId "${jobId}" after building`, "info", "notifications");
    return;
  }
  // Step 12: Dispatch email notifications to the email queue.
  // Deliberately not awaited (fire-and-forget); failures are logged via .catch.
  const emailsToDispatch = scenariosToDispatch.map((scenario) => scenario?.email);
  if (!isEmpty(emailsToDispatch)) {
    dispatchEmailsToQueue({ emailsToDispatch, logger }).catch((e) =>
      logger.log("Something went wrong dispatching emails to the Email Notification Queue", "error", "queue", null, {
        message: e?.message,
        stack: e?.stack
      })
    );
  }
  // Step 13: Dispatch app notifications to the app queue.
  // Deliberately not awaited (fire-and-forget); failures are logged via .catch.
  const appsToDispatch = scenariosToDispatch.map((scenario) => scenario?.app);
  if (!isEmpty(appsToDispatch)) {
    dispatchAppsToQueue({ appsToDispatch, logger }).catch((e) =>
      logger.log("Something went wrong dispatching apps to the App Notification Queue", "error", "queue", null, {
        message: e?.message,
        stack: e?.stack
      })
    );
  }
};
module.exports = scenarioParser;

View File

@@ -0,0 +1,42 @@
/**
* @module jobAssignmentHelper
* @description
* This module provides utility functions for handling job assignment types.
* Currently, it includes a function to map lowercase job assignment codes to their corresponding human-readable job types.
*/
/**
 * Maps a lowercase job assignment code to its corresponding human-readable job type.
 *
 * @param {string} data - The lowercase job assignment code (e.g., "employee_prep").
 * @returns {string} The human-readable job type (e.g., "Prep"). Returns an empty string if the code is unknown or if the input is null/undefined.
 */
const getJobAssignmentType = (data) => {
  const labels = {
    employee_prep: "Prep",
    employee_body: "Body",
    employee_csr: "CSR",
    employee_refinish: "Refinish"
  };
  // Object.hasOwn guards against inherited keys (e.g. "toString") matching.
  return Object.hasOwn(labels, data) ? labels[data] : "";
};
/**
 * Maps a numeric task priority to its display label.
 *
 * @param {number} priority - Task priority (1 = High, 3 = Low; anything else is Medium).
 * @returns {string} "High", "Low", or "Medium".
 */
const formatTaskPriority = (priority) => {
  switch (priority) {
    case 1:
      return "High";
    case 3:
      return "Low";
    default:
      return "Medium";
  }
};
// Public helpers shared by the notification scenario builders.
module.exports = {
  getJobAssignmentType,
  formatTaskPriority
};

View File

@@ -160,6 +160,11 @@ async function OpenSearchUpdateHandler(req, res) {
res.status(200).json(response.body);
}
} catch (error) {
// We don't want this spam message existing in development/test,
if (process.env?.NODE_ENV !== "production" && error?.message === "Invalid URL") {
return res.status(400).json(JSON.stringify(error));
}
logger.log("os-handler-error", "ERROR", null, null, {
id: req.body.event.data.new.id,
index: req.body.table.name,
@@ -167,6 +172,7 @@ async function OpenSearchUpdateHandler(req, res) {
stack: error.stack
// body: document
});
res.status(400).json(JSON.stringify(error));
}
}
@@ -248,7 +254,8 @@ async function OpenSearchSearchHandler(req, res) {
"*ownr_fn^8",
"*ownr_co_nm^8",
"*ownr_ph1^8",
"*ownr_ph2^8"
"*ownr_ph2^8",
"*comment^6"
// "*"
]
}

View File

@@ -13,7 +13,8 @@ const withUserGraphQLClientMiddleware = require("../middleware/withUserGraphQLCl
const { taskAssignedEmail, tasksRemindEmail } = require("../email/tasksEmails");
const { canvastest } = require("../render/canvas-handler");
const { alertCheck } = require("../alerts/alertcheck");
const uuid = require("uuid").v4;
const updateBodyshopCache = require("../web-sockets/updateBodyshopCache");
const { v4 } = require("uuid");
//Test route to ensure Express is responding.
router.get("/test", eventAuthorizationMiddleware, async function (req, res) {
@@ -58,6 +59,7 @@ router.get("/test-logs", eventAuthorizationMiddleware, (req, res) => {
return res.status(500).send("Logs tested.");
});
router.get("/wstest", eventAuthorizationMiddleware, (req, res) => {
const { ioRedis } = req;
ioRedis.to(`bodyshop-broadcast-room:bfec8c8c-b7f1-49e0-be4c-524455f4e582`).emit("new-message-summary", {
@@ -81,7 +83,7 @@ router.get("/wstest", eventAuthorizationMiddleware, (req, res) => {
// image_path: [],
newMessage: {
conversation: {
id: uuid(),
id: v4(),
archived: false,
bodyshop: {
id: "bfec8c8c-b7f1-49e0-be4c-524455f4e582",
@@ -137,4 +139,20 @@ router.post("/canvastest", validateFirebaseIdTokenMiddleware, canvastest);
// Alert Check
router.post("/alertcheck", eventAuthorizationMiddleware, alertCheck);
// Redis Cache Routes
router.post("/bodyshop-cache", eventAuthorizationMiddleware, updateBodyshopCache);
// Health Check for docker-compose-cluster load balancer, only available in development
if (process.env.NODE_ENV === "development") {
router.get("/health", (req, res) => {
const healthStatus = {
status: "healthy",
timestamp: new Date().toISOString(),
environment: process.env.NODE_ENV || "unknown",
uptime: process.uptime()
};
res.status(200).json(healthStatus);
});
}
module.exports = router;

View File

@@ -2,13 +2,18 @@ const express = require("express");
const validateFirebaseIdTokenMiddleware = require("../middleware/validateFirebaseIdTokenMiddleware");
const { subscribe, unsubscribe, sendNotification } = require("../firebase/firebase-handler");
const eventAuthorizationMiddleware = require("../middleware/eventAuthorizationMIddleware");
const handlePartsOrderChange = require("../notifications/eventHandlers/handlePartsOrderChange");
const handlePartsDispatchChange = require("../notifications/eventHandlers/handlePartsDispatchChange");
const handleTasksChange = require("../notifications/eventHandlers/handleTasksChange");
const handleTimeTicketsChange = require("../notifications/eventHandlers/handleTimeTicketsChange");
const handleJobsChange = require("../notifications/eventHandlers/handeJobsChange");
const handleBillsChange = require("../notifications/eventHandlers/handleBillsChange");
const {
handleJobsChange,
handleBillsChange,
handlePartsOrderChange,
handlePartsDispatchChange,
handleTasksChange,
handleTimeTicketsChange,
handleNotesChange,
handlePaymentsChange,
handleDocumentsChange,
handleJobLinesChange
} = require("../notifications/eventHandlers");
const router = express.Router();
@@ -24,5 +29,9 @@ router.post("/events/handlePartsOrderChange", eventAuthorizationMiddleware, hand
router.post("/events/handlePartsDispatchChange", eventAuthorizationMiddleware, handlePartsDispatchChange);
router.post("/events/handleTasksChange", eventAuthorizationMiddleware, handleTasksChange);
router.post("/events/handleTimeTicketsChange", eventAuthorizationMiddleware, handleTimeTicketsChange);
router.post("/events/handleNotesChange", eventAuthorizationMiddleware, handleNotesChange);
router.post("/events/handlePaymentsChange", eventAuthorizationMiddleware, handlePaymentsChange);
router.post("/events/handleDocumentsChange", eventAuthorizationMiddleware, handleDocumentsChange);
router.post("/events/handleJobLinesChange", eventAuthorizationMiddleware, handleJobLinesChange);
module.exports = router;

View File

@@ -7,6 +7,7 @@ const { status, markConversationRead } = require("../sms/status");
const validateFirebaseIdTokenMiddleware = require("../middleware/validateFirebaseIdTokenMiddleware");
// Twilio Webhook Middleware for production
// TODO: Look into this because it technically is never validating anything
const twilioWebhookMiddleware = twilio.webhook({ validate: process.env.NODE_ENV === "PRODUCTION" });
router.post("/receive", twilioWebhookMiddleware, receive);

View File

@@ -0,0 +1,52 @@
// server/utils/cleanupManager.js
const logger = require("./logger");
// Tasks registered for graceful shutdown; executed sequentially in registration order.
let cleanupTasks = [];
// Guards against running the shutdown sequence twice (e.g. SIGTERM followed by SIGINT).
let isShuttingDown = false;
/**
 * Register a cleanup task to be executed during shutdown.
 * Tasks run sequentially, in the order they were registered.
 * @param {Function} task - sync or async callback; its `name` is used in shutdown logs
 */
function registerCleanupTask(task) {
  cleanupTasks = [...cleanupTasks, task];
}
/**
 * SIGTERM/SIGINT handler: run every registered cleanup task once, then exit.
 * A second signal arriving while a shutdown is already in flight is ignored.
 */
async function handleSigterm() {
  const warn = (message) => logger.log("sigterm-api", "WARN", null, null, { message });
  if (isShuttingDown) {
    warn("Shutdown already in progress, ignoring signal.");
    return;
  }
  isShuttingDown = true;
  warn("SIGTERM Received. Starting graceful shutdown.");
  try {
    for (const cleanup of cleanupTasks) {
      warn(`Running cleanup task: ${cleanup.name}`);
      await cleanup();
    }
    warn(`All cleanup tasks completed.`);
  } catch (error) {
    logger.log("sigterm-api-error", "ERROR", null, null, { message: error.message, stack: error.stack });
  }
  // Exit 0 even after a failed task — the process must still terminate on SIGTERM.
  process.exit(0);
}
/**
* Initialize cleanup manager with process event listeners
*/
function initializeCleanupManager() {
process.on("SIGTERM", handleSigterm);
process.on("SIGINT", handleSigterm); // Handle Ctrl+C
}
module.exports = {
registerCleanupTask,
initializeCleanupManager
};

View File

@@ -0,0 +1,7 @@
const { inspect } = require("node:util");
/**
 * Dump a value to stdout with unlimited depth and ANSI colors.
 * A deeper-printing alternative to console.dir / console.log for nested objects.
 * @param {*} data - any value to inspect
 */
const consoleDir = (data) => {
  const rendered = inspect(data, { showHidden: false, depth: null, colors: true });
  console.log(rendered);
};
module.exports = consoleDir;

View File

@@ -0,0 +1,10 @@
const logger = require("./logger");
/**
 * Debug-level log entry that is silenced in production.
 * @param {string} message - log message
 * @param {*} [meta] - structured metadata passed straight through to the logger
 */
const devDebugLogger = (message, meta) => {
  if (process.env?.NODE_ENV !== "production") {
    logger.logger.debug(message, meta);
  }
};
module.exports = devDebugLogger;

View File

@@ -0,0 +1,3 @@
/**
 * Environment-specific BullMQ key prefix.
 * NOTE(review): the braces look like a Redis Cluster hash tag (co-locating all
 * queue keys in one slot) — confirm that is the intent before changing.
 * @returns {"{PROD-BULLMQ}"|"{DEV-BULLMQ}"}
 */
const getBullMQPrefix = () => {
  const isProduction = process.env?.NODE_ENV === "production";
  return isProduction ? "{PROD-BULLMQ}" : "{DEV-BULLMQ}";
};
// Expose the prefix helper for queue/worker construction.
module.exports = getBullMQPrefix;

View File

@@ -58,4 +58,20 @@ exports.InstanceRegion = () =>
rome: "us-east-2"
});
exports.InstanceEndpoints = () =>
InstanceManager({
imex:
process.env?.NODE_ENV === "development"
? "https://localhost:3000"
: process.env?.NODE_ENV === "test"
? "https://test.imex.online"
: "https://imex.online",
rome:
process.env?.NODE_ENV === "development"
? "https://localhost:3000"
: process.env?.NODE_ENV === "test"
? "https://test.romeonline.io"
: "https://romeonline.io"
});
exports.default = InstanceManager;

View File

@@ -1,7 +1,9 @@
const applyIOHelpers = ({ app, api, io, logger }) => {
const getBodyshopRoom = (bodyshopID) => `bodyshop-broadcast-room:${bodyshopID}`;
// Global Bodyshop Room
const getBodyshopRoom = (bodyshopId) => `bodyshop-broadcast-room:${bodyshopId}`;
// Messaging - conversation specific room to handle detailed messages when the user has a conversation open.
const getBodyshopConversationRoom = ({bodyshopId, conversationId}) =>
const getBodyshopConversationRoom = ({ bodyshopId, conversationId }) =>
`bodyshop-conversation-room:${bodyshopId}:${conversationId}`;
const ioHelpersAPI = {

View File

@@ -1,3 +1,48 @@
const { GET_BODYSHOP_BY_ID } = require("../graphql-client/queries");
const devDebugLogger = require("./devDebugLogger");
const client = require("../graphql-client/graphql-client").client;
// Cached bodyshop records expire after one hour, forcing a DB refresh.
const BODYSHOP_CACHE_TTL = 3600; // 1 hour
/**
 * Build the Redis key under which a bodyshop's cached record is stored.
 * @param {string} bodyshopId
 * @returns {`bodyshop-cache:${string}`}
 */
const getBodyshopCacheKey = (bodyshopId) => ["bodyshop-cache", bodyshopId].join(":");
/**
 * Build the Redis key for a user's socketId -> bodyshopId mapping hash.
 * Namespaced by environment so prod and dev sockets never collide in shared Redis.
 * @param {string} email
 * @returns {`user:${string}:${string}:socketMapping`}
 */
const getUserSocketMappingKey = (email) => {
  const envSegment = process.env?.NODE_ENV === "production" ? "prod" : "dev";
  return `user:${envSegment}:${email}:socketMapping`;
};
/**
 * Load a bodyshop record from the database (Hasura) by primary key.
 * @param {string} bodyshopId
 * @param logger - structured logger used to record failures
 * @returns {Promise<*>} the full bodyshop record, unmodified
 * @throws when the query fails or no bodyshop exists with that id (after logging)
 */
const fetchBodyshopFromDB = async (bodyshopId, logger) => {
  try {
    const { bodyshops_by_pk: bodyshop } = await client.request(GET_BODYSHOP_BY_ID, { id: bodyshopId });
    if (!bodyshop) {
      throw new Error(`Bodyshop with ID ${bodyshopId} not found`);
    }
    return bodyshop;
  } catch (error) {
    logger.log("fetch-bodyshop-from-db", "ERROR", "redis", null, {
      bodyshopId,
      error: error?.message,
      stack: error?.stack
    });
    throw error;
  }
};
/**
* Apply Redis helper functions
* @param pubClient
@@ -33,107 +78,332 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
// Store multiple session data in Redis
const setMultipleSessionData = async (socketId, keyValues) => {
/**
* Add a socket mapping for a user
* @param email
* @param socketId
* @param bodyshopId
* @returns {Promise<void>}
*/
const addUserSocketMapping = async (email, socketId, bodyshopId) => {
const socketMappingKey = getUserSocketMappingKey(email);
try {
// keyValues is expected to be an object { key1: value1, key2: value2, ... }
const entries = Object.entries(keyValues).map(([key, value]) => [key, JSON.stringify(value)]);
await pubClient.hset(`socket:${socketId}`, ...entries.flat());
devDebugLogger(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`);
// Save the mapping: socketId -> bodyshopId
await pubClient.hset(socketMappingKey, socketId, bodyshopId);
// Set TTL (24 hours) for the mapping hash
await pubClient.expire(socketMappingKey, 86400);
} catch (error) {
logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
logger.log(`Error adding socket mapping for ${email} (bodyshop ${bodyshopId}): ${error}`, "ERROR", "redis");
}
};
/**
 * Extend the 24h TTL on a user's socket-mapping hash, if the hash exists.
 * Called on token refresh so long-lived sessions keep their socket mappings.
 * @param {string} email
 * @returns {Promise<void>} - failures are logged, not thrown
 */
const refreshUserSocketTTL = async (email) => {
  const mappingKey = getUserSocketMappingKey(email);
  try {
    if (await pubClient.exists(mappingKey)) {
      await pubClient.expire(mappingKey, 86400);
      devDebugLogger(`Refreshed TTL for ${email} socket mapping`);
    }
  } catch (error) {
    logger.log(`Error refreshing TTL for ${email}: ${error}`, "ERROR", "redis");
  }
};
/**
 * Drop one socket from a user's socket-mapping hash (e.g. on disconnect).
 * Refreshes the 24h TTL when other sockets remain in the hash.
 * @param {string} email
 * @param {string} socketId
 * @returns {Promise<void>} - failures are logged, not thrown
 */
const removeUserSocketMapping = async (email, socketId) => {
  const mappingKey = getUserSocketMappingKey(email);
  try {
    devDebugLogger(`Removing socket ${socketId} mapping for user ${email}`);
    // The hash value tells us which bodyshop this socket was registered against.
    const bodyshopId = await pubClient.hget(mappingKey, socketId);
    if (!bodyshopId) {
      devDebugLogger(`Socket ${socketId} not found for user ${email}`);
      return;
    }
    await pubClient.hdel(mappingKey, socketId);
    devDebugLogger(`Removed socket ${socketId} (associated with bodyshop ${bodyshopId}) for user ${email}`);
    // Keep the hash alive while any socket mappings remain.
    const remaining = await pubClient.hlen(mappingKey);
    if (remaining > 0) {
      await pubClient.expire(mappingKey, 86400);
    }
  } catch (error) {
    logger.log(`Error removing socket mapping for ${email}: ${error}`, "ERROR", "redis");
  }
};
/**
 * Get all socket mappings for a user, grouped by bodyshop.
 * @param {string} email
 * @returns {Promise<Object<string, {socketIds: string[], ttl: number}>>}
 *   map of bodyshopId -> { socketIds, ttl }, where ttl is the remaining
 *   TTL (seconds) of the whole mapping hash
 * @throws when Redis access fails (after logging)
 */
const getUserSocketMapping = async (email) => {
  const socketMappingKey = getUserSocketMappingKey(email);
  try {
    // Retrieve all socket mappings for the user
    const mapping = await pubClient.hgetall(socketMappingKey);
    const ttl = await pubClient.ttl(socketMappingKey);
    // Group socket IDs by bodyshopId
    const result = {};
    for (const [socketId, bodyshopId] of Object.entries(mapping)) {
      if (!result[bodyshopId]) {
        result[bodyshopId] = { socketIds: [], ttl };
      }
      result[bodyshopId].socketIds.push(socketId);
    }
    return result;
  } catch (error) {
    // Route through the shared structured logger (was console.error) so failures
    // reach the same sink as every other redis helper in this module.
    logger.log(`Error retrieving socket mappings for ${email}: ${error}`, "ERROR", "redis");
    throw error;
  }
};
/**
 * Get the socket IDs a user currently has registered for one specific bodyshop.
 * @param {string} email
 * @param {string} bodyshopId
 * @returns {Promise<{socketIds: string[], ttl: number}>} ttl is the remaining
 *   TTL (seconds) of the whole mapping hash
 * @throws when Redis access fails (after logging)
 */
const getUserSocketMappingByBodyshop = async (email, bodyshopId) => {
  const socketMappingKey = getUserSocketMappingKey(email);
  try {
    const mapping = await pubClient.hgetall(socketMappingKey);
    const ttl = await pubClient.ttl(socketMappingKey);
    // Keep only the sockets registered against the requested bodyshop.
    const socketIds = Object.entries(mapping)
      .filter(([, mappedBodyshopId]) => mappedBodyshopId === bodyshopId)
      .map(([socketId]) => socketId);
    return { socketIds, ttl };
  } catch (error) {
    logger.log(`Error retrieving socket mappings for ${email} by bodyshop ${bodyshopId}: ${error}`, "ERROR", "redis");
    throw error;
  }
};
/**
 * Read a bodyshop record through the Redis cache (read-through pattern).
 * On a miss the record is loaded from the DB, cached as a JSON string for
 * BODYSHOP_CACHE_TTL seconds, and returned.
 * @param {string} bodyshopId
 * @returns {Promise<*>} the full bodyshop record
 * @throws when both the cache read and the DB lookup fail (after logging)
 */
const getBodyshopFromRedis = async (bodyshopId) => {
  const cacheKey = getBodyshopCacheKey(bodyshopId);
  try {
    const cached = await pubClient.get(cacheKey);
    if (cached) {
      return JSON.parse(cached);
    }
    // Cache miss: load from the DB and repopulate the cache.
    const bodyshop = await fetchBodyshopFromDB(bodyshopId, logger);
    await pubClient.set(cacheKey, JSON.stringify(bodyshop));
    await pubClient.expire(cacheKey, BODYSHOP_CACHE_TTL);
    devDebugLogger("bodyshop-cache-miss", {
      bodyshopId,
      action: "Fetched from DB and cached"
    });
    return bodyshop;
  } catch (error) {
    logger.log("get-bodyshop-from-redis", "ERROR", "redis", null, {
      bodyshopId,
      error: error.message
    });
    throw error;
  }
};
/**
 * Update the cached bodyshop record, or drop it when no values are given.
 * @param {string} bodyshopId
 * @param {*} [values=null] - full record to cache; a falsy value invalidates instead
 * @returns {Promise<void>}
 * @throws when the Redis operation fails (after logging)
 */
const updateOrInvalidateBodyshopFromRedis = async (bodyshopId, values = null) => {
  const cacheKey = getBodyshopCacheKey(bodyshopId);
  try {
    if (values) {
      // Overwrite the cache with the full provided record and reset the TTL.
      await pubClient.set(cacheKey, JSON.stringify(values));
      await pubClient.expire(cacheKey, BODYSHOP_CACHE_TTL);
      devDebugLogger("bodyshop-cache-update", {
        bodyshopId,
        action: "Cache updated",
        values
      });
    } else {
      // No values provided: invalidate by deleting the key.
      await pubClient.del(cacheKey);
      devDebugLogger("bodyshop-cache-invalidate", {
        bodyshopId,
        action: "Cache invalidated"
      });
    }
  } catch (error) {
    logger.log("update-or-invalidate-bodyshop-from-redis", "ERROR", "api", "redis", {
      bodyshopId,
      values,
      error: error.message
    });
    throw error;
  }
};
// NOTE: The following code was written for an abandoned branch and things have changes since the,
// Leaving it here for demonstration purposes, commenting it out so it does not get used
// Store multiple session data in Redis
// const setMultipleSessionData = async (socketId, keyValues) => {
// try {
// // keyValues is expected to be an object { key1: value1, key2: value2, ... }
// const entries = Object.entries(keyValues).map(([key, value]) => [key, JSON.stringify(value)]);
// await pubClient.hset(`socket:${socketId}`, ...entries.flat());
// } catch (error) {
// logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Retrieve multiple session data from Redis
const getMultipleSessionData = async (socketId, keys) => {
try {
const data = await pubClient.hmget(`socket:${socketId}`, keys);
// Redis returns an object with null values for missing keys, so we parse the non-null ones
return Object.fromEntries(keys.map((key, index) => [key, data[index] ? JSON.parse(data[index]) : null]));
} catch (error) {
logger.log(`Error Getting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
// const getMultipleSessionData = async (socketId, keys) => {
// try {
// const data = await pubClient.hmget(`socket:${socketId}`, keys);
// // Redis returns an object with null values for missing keys, so we parse the non-null ones
// return Object.fromEntries(keys.map((key, index) => [key, data[index] ? JSON.parse(data[index]) : null]));
// } catch (error) {
// logger.log(`Error Getting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
const setMultipleFromArraySessionData = async (socketId, keyValueArray) => {
try {
// Use Redis multi/pipeline to batch the commands
const multi = pubClient.multi();
keyValueArray.forEach(([key, value]) => {
multi.hset(`socket:${socketId}`, key, JSON.stringify(value));
});
await multi.exec(); // Execute all queued commands
} catch (error) {
logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
// const setMultipleFromArraySessionData = async (socketId, keyValueArray) => {
// try {
// // Use Redis multi/pipeline to batch the commands
// const multi = pubClient.multi();
// keyValueArray.forEach(([key, value]) => {
// multi.hset(`socket:${socketId}`, key, JSON.stringify(value));
// });
// await multi.exec(); // Execute all queued commands
// } catch (error) {
// logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Helper function to add an item to the end of the Redis list
const addItemToEndOfList = async (socketId, key, newItem) => {
try {
await pubClient.rpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
} catch (error) {
logger.log(`Error adding item to the end of the list for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
// const addItemToEndOfList = async (socketId, key, newItem) => {
// try {
// await pubClient.rpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
// } catch (error) {
// let userEmail = "unknown";
// let socketMappings = {};
// try {
// const userData = await getSessionData(socketId, "user");
// if (userData && userData.email) {
// userEmail = userData.email;
// socketMappings = await getUserSocketMapping(userEmail);
// }
// } catch (sessionError) {
// logger.log(`Failed to fetch session data for socket ${socketId}: ${sessionError}`, "ERROR", "redis");
// }
// const mappingString = JSON.stringify(socketMappings, null, 2);
// const errorMessage = `Error adding item to the end of the list for socket ${socketId}: ${error}. User: ${userEmail}, Socket Mappings: ${mappingString}`;
// logger.log(errorMessage, "ERROR", "redis");
// }
// };
// Helper function to add an item to the beginning of the Redis list
const addItemToBeginningOfList = async (socketId, key, newItem) => {
try {
await pubClient.lpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
} catch (error) {
logger.log(`Error adding item to the beginning of the list for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
// const addItemToBeginningOfList = async (socketId, key, newItem) => {
// try {
// await pubClient.lpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
// } catch (error) {
// logger.log(`Error adding item to the beginning of the list for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Helper function to clear a list in Redis
const clearList = async (socketId, key) => {
try {
await pubClient.del(`socket:${socketId}:${key}`);
} catch (error) {
logger.log(`Error clearing list for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
// const clearList = async (socketId, key) => {
// try {
// await pubClient.del(`socket:${socketId}:${key}`);
// } catch (error) {
// logger.log(`Error clearing list for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Add methods to manage room users
const addUserToRoom = async (room, user) => {
try {
await pubClient.sadd(room, JSON.stringify(user));
} catch (error) {
logger.log(`Error adding user to room ${room}: ${error}`, "ERROR", "redis");
}
};
// const addUserToRoom = async (room, user) => {
// try {
// await pubClient.sadd(room, JSON.stringify(user));
// } catch (error) {
// logger.log(`Error adding user to room ${room}: ${error}`, "ERROR", "redis");
// }
// };
const removeUserFromRoom = async (room, user) => {
try {
await pubClient.srem(room, JSON.stringify(user));
} catch (error) {
logger.log(`Error removing user to room ${room}: ${error}`, "ERROR", "redis");
}
};
// Remove users from room
// const removeUserFromRoom = async (room, user) => {
// try {
// await pubClient.srem(room, JSON.stringify(user));
// } catch (error) {
// logger.log(`Error removing user to room ${room}: ${error}`, "ERROR", "redis");
// }
// };
const getUsersInRoom = async (room) => {
try {
const users = await pubClient.smembers(room);
return users.map((user) => JSON.parse(user));
} catch (error) {
logger.log(`Error getting users in room ${room}: ${error}`, "ERROR", "redis");
}
};
// Get Users in room
// const getUsersInRoom = async (room) => {
// try {
// const users = await pubClient.smembers(room);
// return users.map((user) => JSON.parse(user));
// } catch (error) {
// logger.log(`Error getting users in room ${room}: ${error}`, "ERROR", "redis");
// }
// };
const api = {
getUserSocketMappingKey,
getBodyshopCacheKey,
setSessionData,
getSessionData,
clearSessionData,
setMultipleSessionData,
getMultipleSessionData,
setMultipleFromArraySessionData,
addItemToEndOfList,
addItemToBeginningOfList,
clearList,
addUserToRoom,
removeUserFromRoom,
getUsersInRoom
addUserSocketMapping,
removeUserSocketMapping,
getUserSocketMappingByBodyshop,
getUserSocketMapping,
refreshUserSocketTTL,
getBodyshopFromRedis,
updateOrInvalidateBodyshopFromRedis
// setMultipleSessionData,
// getMultipleSessionData,
// setMultipleFromArraySessionData,
// addItemToEndOfList,
// addItemToBeginningOfList,
// clearList,
// addUserToRoom,
// removeUserFromRoom,
// getUsersInRoom,
};
Object.assign(module.exports, api);
@@ -143,86 +413,6 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
next();
});
// Demo to show how all the helper functions work
// const demoSessionData = async () => {
// const socketId = "testSocketId";
//
// // 1. Test setSessionData and getSessionData
// await setSessionData(socketId, "field1", "Hello, Redis!");
// const field1Value = await getSessionData(socketId, "field1");
// console.log("Retrieved single field value:", field1Value);
//
// // 2. Test setMultipleSessionData and getMultipleSessionData
// await setMultipleSessionData(socketId, { field2: "Second Value", field3: "Third Value" });
// const multipleFields = await getMultipleSessionData(socketId, ["field2", "field3"]);
// console.log("Retrieved multiple field values:", multipleFields);
//
// // 3. Test setMultipleFromArraySessionData
// await setMultipleFromArraySessionData(socketId, [
// ["field4", "Fourth Value"],
// ["field5", "Fifth Value"]
// ]);
//
// // Retrieve and log all fields
// const allFields = await getMultipleSessionData(socketId, ["field1", "field2", "field3", "field4", "field5"]);
// console.log("Retrieved all field values:", allFields);
//
// // 4. Test list functions
// // Add item to the end of a Redis list
// await addItemToEndOfList(socketId, "logEvents", { event: "Log Event 1", timestamp: new Date() });
// await addItemToEndOfList(socketId, "logEvents", { event: "Log Event 2", timestamp: new Date() });
//
// // Add item to the beginning of a Redis list
// await addItemToBeginningOfList(socketId, "logEvents", { event: "First Log Event", timestamp: new Date() });
//
// // Retrieve the entire list
// const logEventsData = await pubClient.lrange(`socket:${socketId}:logEvents`, 0, -1);
// const logEvents = logEventsData.map((item) => JSON.parse(item));
// console.log("Log Events List:", logEvents);
//
// // 5. Test clearList
// await clearList(socketId, "logEvents");
// console.log("Log Events List cleared.");
//
// // Retrieve the list after clearing to confirm it's empty
// const logEventsAfterClear = await pubClient.lrange(`socket:${socketId}:logEvents`, 0, -1);
// console.log("Log Events List after clearing:", logEventsAfterClear); // Should be an empty array
//
// // 6. Test clearSessionData
// await clearSessionData(socketId);
// console.log("Session data cleared.");
//
// // 7. Test room functions
// const roomName = "testRoom";
// const user1 = { id: 1, name: "Alice" };
// const user2 = { id: 2, name: "Bob" };
//
// // Add users to room
// await addUserToRoom(roomName, user1);
// await addUserToRoom(roomName, user2);
//
// // Get users in room
// const usersInRoom = await getUsersInRoom(roomName);
// console.log(`Users in room ${roomName}:`, usersInRoom);
//
// // Remove a user from room
// await removeUserFromRoom(roomName, user1);
//
// // Get users in room after removal
// const usersInRoomAfterRemoval = await getUsersInRoom(roomName);
// console.log(`Users in room ${roomName} after removal:`, usersInRoomAfterRemoval);
//
// // Clean up: remove remaining users from room
// await removeUserFromRoom(roomName, user2);
//
// // Verify room is empty
// const usersInRoomAfterCleanup = await getUsersInRoom(roomName);
// console.log(`Users in room ${roomName} after cleanup:`, usersInRoomAfterCleanup); // Should be empty
// };
// if (process.env.NODE_ENV === "development") {
// demoSessionData();
// }
return api;
};

View File

@@ -2,7 +2,7 @@ const { admin } = require("../firebase/firebase-handler");
const redisSocketEvents = ({
io,
redisHelpers: { setSessionData, clearSessionData }, // Note: Used if we persist user to Redis
redisHelpers: { addUserSocketMapping, removeUserSocketMapping, refreshUserSocketTTL, getUserSocketMappingByBodyshop },
ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom },
logger
}) => {
@@ -12,78 +12,67 @@ const redisSocketEvents = ({
};
// Socket Auth Middleware
const authMiddleware = (socket, next) => {
const authMiddleware = async (socket, next) => {
const { token, bodyshopId } = socket.handshake.auth;
if (!token) {
return next(new Error("Authentication error - no authorization token."));
}
if (!bodyshopId) {
return next(new Error("Authentication error - no bodyshopId provided."));
}
try {
if (socket.handshake.auth.token) {
admin
.auth()
.verifyIdToken(socket.handshake.auth.token)
.then((user) => {
socket.user = user;
// Note: if we ever want to capture user data across sockets
// Uncomment the following line and then remove the next() to a second then()
// return setSessionData(socket.id, "user", user);
next();
})
.catch((error) => {
next(new Error(`Authentication error: ${error.message}`));
});
} else {
next(new Error("Authentication error - no authorization token."));
}
const user = await admin.auth().verifyIdToken(token);
socket.user = user;
await addUserSocketMapping(user.email, socket.id, bodyshopId);
next();
} catch (error) {
logger.log("websocket-connection-error", "error", null, null, {
...error
});
next(new Error(`Authentication error ${error}`));
next(new Error(`Authentication error: ${error.message}`));
}
};
// Register Socket Events
const registerSocketEvents = (socket) => {
// Uncomment for further testing
// createLogEvent(socket, "debug", `Registering RedisIO Socket Events.`);
// Token Update Events
const registerUpdateEvents = (socket) => {
let latestTokenTimestamp = 0;
const updateToken = async (newToken) => {
const updateToken = async ({ token, bodyshopId }) => {
const currentTimestamp = Date.now();
latestTokenTimestamp = currentTimestamp;
try {
// Verify token with Firebase Admin SDK
const user = await admin.auth().verifyIdToken(newToken, true);
if (!token || !bodyshopId) {
socket.emit("token-updated", { success: false, error: "Token or bodyshopId missing" });
return;
}
// Skip outdated token validations
try {
const user = await admin.auth().verifyIdToken(token, true);
if (currentTimestamp < latestTokenTimestamp) {
createLogEvent(socket, "warn", "Outdated token validation skipped.");
return;
}
socket.user = user;
createLogEvent(socket, "debug", `Token updated successfully for socket ID: ${socket.id}`);
await refreshUserSocketTTL(user.email, bodyshopId);
createLogEvent(
socket,
"debug",
`Token updated successfully for socket ID: ${socket.id} (bodyshop: ${bodyshopId})`
);
socket.emit("token-updated", { success: true });
} catch (error) {
if (error.code === "auth/id-token-expired") {
createLogEvent(socket, "warn", "Stale token received, waiting for new token");
socket.emit("token-updated", {
success: false,
error: "Stale token."
});
return; // Avoid disconnecting for expired tokens
socket.emit("token-updated", { success: false, error: "Stale token." });
return;
}
createLogEvent(socket, "error", `Token update failed for socket ID: ${socket.id}, Error: ${error.message}`);
socket.emit("token-updated", { success: false, error: error.message });
// Optionally disconnect for invalid tokens or other errors
socket.disconnect();
}
};
socket.on("update-token", updateToken);
};
@@ -127,16 +116,15 @@ const redisSocketEvents = ({
// Disconnect Events
const registerDisconnectEvents = (socket) => {
const disconnect = () => {
// Uncomment for further testing
// createLogEvent(socket, "debug", `User disconnected.`);
const disconnect = async () => {
if (socket.user?.email) {
await removeUserSocketMapping(socket.user.email, socket.id);
}
// Leave all rooms except the default room (socket.id)
const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id);
for (const room of rooms) {
socket.leave(room);
}
// If we ever want to persist the user across workers
// clearSessionData(socket.id);
};
socket.on("disconnect", disconnect);
@@ -157,6 +145,7 @@ const redisSocketEvents = ({
});
}
};
const leaveConversationRoom = ({ bodyshopId, conversationId }) => {
try {
const room = getBodyshopConversationRoom({ bodyshopId, conversationId });
@@ -196,11 +185,58 @@ const redisSocketEvents = ({
socket.on("leave-bodyshop-conversation", leaveConversationRoom);
};
// Sync Notification Read Events
const registerSyncEvents = (socket) => {
  // Fan an event out to the user's other sockets for the same bodyshop,
  // skipping the socket that originated it. Returns true when a mapping existed.
  const fanOutToSiblings = async (email, bodyshopId, eventName, payload) => {
    const socketMapping = await getUserSocketMappingByBodyshop(email, bodyshopId);
    if (!socketMapping?.socketIds) {
      return false;
    }
    for (const targetSocketId of socketMapping.socketIds) {
      if (targetSocketId !== socket.id) {
        io.to(targetSocketId).emit(eventName, payload);
      }
    }
    return true;
  };
  // NOTE(review): both handlers look up the mapping with the client-supplied
  // `email` rather than socket.user.email — confirm this is intended.
  socket.on("sync-notification-read", async ({ email, bodyshopId, notificationId }) => {
    try {
      const userEmail = socket.user.email;
      const timestamp = new Date().toISOString();
      const delivered = await fanOutToSiblings(email, bodyshopId, "sync-notification-read", {
        notificationId,
        timestamp
      });
      if (delivered) {
        createLogEvent(
          socket,
          "debug",
          `Synced notification ${notificationId} read for ${userEmail} in bodyshop ${bodyshopId}`
        );
      }
    } catch (error) {
      createLogEvent(socket, "error", `Error syncing notification read: ${error.message}`);
    }
  });
  socket.on("sync-all-notifications-read", async ({ email, bodyshopId }) => {
    try {
      const timestamp = new Date().toISOString();
      const delivered = await fanOutToSiblings(email, bodyshopId, "sync-all-notifications-read", { timestamp });
      if (delivered) {
        createLogEvent(socket, "debug", `Synced all notifications read for ${email} in bodyshop ${bodyshopId}`);
      }
    } catch (error) {
      createLogEvent(socket, "error", `Error syncing all notifications read: ${error.message}`);
    }
  });
};
// Call Handlers
registerRoomAndBroadcastEvents(socket);
registerUpdateEvents(socket);
registerMessagingEvents(socket);
registerDisconnectEvents(socket);
registerSyncEvents(socket);
};
// Associate Middleware and Handlers

View File

@@ -0,0 +1,36 @@
/**
 * Event-trigger handler: update or invalidate the Redis bodyshop cache.
 * Payload is expected to be a Hasura-style event: `event.data.new` holds the
 * row after insert/update, `event.data.old` the row before delete.
 * @param req - must carry `sessionUtils.updateOrInvalidateBodyshopFromRedis` and `logger`
 * @param res
 * @returns {Promise<void>}
 */
const updateBodyshopCache = async (req, res) => {
  const {
    sessionUtils: { updateOrInvalidateBodyshopFromRedis },
    logger
  } = req;
  const { event } = req.body;
  // Safe access: `new` is null on delete events, where only `old` carries the row.
  const { new: newData, old: oldData } = event?.data ?? {};
  try {
    if (newData?.id) {
      // Update cache with the full new data object
      await updateOrInvalidateBodyshopFromRedis(newData.id, newData);
      logger.logger.debug("Bodyshop cache updated successfully.");
    } else if (oldData?.id) {
      // No new row (delete): invalidate the cached entry for the old row.
      // (Previously this path dereferenced newData.id and threw a TypeError.)
      await updateOrInvalidateBodyshopFromRedis(oldData.id);
      logger.logger.debug("Bodyshop cache invalidated successfully.");
    } else {
      // No bodyshop id anywhere in the payload — nothing to update or invalidate.
      logger.logger.debug("Bodyshop cache event carried no bodyshop id; skipping.");
    }
    res.status(200).json({ success: true });
  } catch (error) {
    logger.log("bodyshop-cache-update-error", "ERROR", "api", "redis", {
      message: error?.message,
      stack: error?.stack
    });
    res.status(500).json({ success: false, error: error.message });
  }
};
// Expose the handler for mounting on the events router.
module.exports = updateBodyshopCache;