Compare commits

...

17 Commits

Author SHA1 Message Date
Dave Richer
2c0eab9366 IO-3096-GlobalNotifications - Correct time zone from footer in notification email 2025-03-14 11:27:28 -04:00
Patrick Fic
b831d8ca8a IO-3096 Add indexes for notifications. 2025-03-13 15:27:20 -07:00
Dave Richer
69da6bccf7 IO-3096-GlobalNotifications - Adjust splits 2025-03-13 17:37:36 -04:00
Dave Richer
e3d7ebd7d8 IO-3096-GlobalNotifications - Verify status reporter is a function and exists prior to calling it in cleanup task 2025-03-13 14:59:58 -04:00
Dave Richer
5f0b63a192 IO-3096-GlobalNotifications - Add in a function to exclude extra logging from production 2025-03-13 13:56:30 -04:00
Dave Richer
7a5ac739ab Merge branch 'feature/IO-3170-HotFixForRedis' into feature/IO-3096-GlobalNotifications 2025-03-13 11:49:31 -04:00
Dave Richer
e2297be0af IO-3170-HotfixFoRedis 2025-03-13 11:47:21 -04:00
Dave Richer
73c4983342 Merged in feature/IO-3166-Global-Notifications-Part-2 (pull request #2199)
IO-3166-Global-Notifications-Part-2: Remove unused event handler (hasura),
2025-03-13 15:31:28 +00:00
Dave Richer
166e1e4030 IO-3166-Global-Notifications-Part-2: Remove unused event handler (hasura), 2025-03-13 11:29:41 -04:00
Dave Richer
5fa7377121 Merged in feature/IO-3166-Global-Notifications-Part-2 (pull request #2196)
IO-3166-Global-Notifications-Part-2: add additional key prefixes for dev v prod
2025-03-13 01:12:33 +00:00
Dave Richer
f21ba8e087 IO-3166-Global-Notifications-Part-2: add additional key prefixes for dev v prod 2025-03-12 21:10:42 -04:00
Dave Richer
d56d1f369c Merged in feature/IO-3166-Global-Notifications-Part-2 (pull request #2193)
IO-3166-Global-Notifications-Part-2: Make sure BULLMQ prefixes do not collide
2025-03-13 00:01:56 +00:00
Dave Richer
360a1954f4 IO-3166-Global-Notifications-Part-2: Make sure BULLMQ prefixes do not collide 2025-03-12 20:00:53 -04:00
Dave Richer
6b047418cc Merged in feature/IO-3166-Global-Notifications-Part-2 (pull request #2190)
Feature/IO-3166 Global Notifications Part 2
2025-03-12 16:06:34 +00:00
Dave Richer
87db292e5d IO-3166-Global-Notifications-Part-2: Fix typo in builder function name 2025-03-12 12:05:21 -04:00
Dave Richer
9ef8440e64 IO-3166-Global-Notifications-Part-2: Add Enabled key to scenario map (backend), filter out scenarios not enabled. 2025-03-12 11:46:09 -04:00
Dave Richer
8ae3b28cb6 IO-3166-Global-Notifications-Part-2: checkpoint, Modify additional strings as per Allan, Refactor builder down to prevent duplicate logic, comment out supplement imported. 2025-03-12 11:34:50 -04:00
23 changed files with 567 additions and 759 deletions

View File

@@ -48,8 +48,6 @@ export function App({ bodyshop, checkUserSession, currentUser, online, setOnline
const { t } = useTranslation();
const navigate = useNavigate();
const scenarioNotificationsOn = client?.getTreatment("Realtime_Notifications_UI") === "on";
useEffect(() => {
if (!navigator.onLine) {
setOnline(false);
@@ -203,12 +201,7 @@ export function App({ bodyshop, checkUserSession, currentUser, online, setOnline
path="/manage/*"
element={
<ErrorBoundary>
<SocketProvider
bodyshop={bodyshop}
navigate={navigate}
currentUser={currentUser}
scenarioNotificationsOn={scenarioNotificationsOn}
>
<SocketProvider bodyshop={bodyshop} navigate={navigate} currentUser={currentUser}>
<PrivateRoute isAuthorized={currentUser.authorized} />
</SocketProvider>
</ErrorBoundary>
@@ -220,12 +213,7 @@ export function App({ bodyshop, checkUserSession, currentUser, online, setOnline
path="/tech/*"
element={
<ErrorBoundary>
<SocketProvider
bodyshop={bodyshop}
navigate={navigate}
currentUser={currentUser}
scenarioNotificationsOn={scenarioNotificationsOn}
>
<SocketProvider bodyshop={bodyshop} navigate={navigate} currentUser={currentUser}>
<PrivateRoute isAuthorized={currentUser.authorized} />
</SocketProvider>
</ErrorBoundary>

View File

@@ -14,6 +14,7 @@ import {
} from "../../graphql/notifications.queries.js";
import { useMutation } from "@apollo/client";
import { useTranslation } from "react-i18next";
import { useSplitTreatments } from "@splitsoftware/splitio-react";
const SocketContext = createContext(null);
@@ -25,11 +26,10 @@ const INITIAL_NOTIFICATIONS = 10;
* @param bodyshop
* @param navigate
* @param currentUser
* @param scenarioNotificationsOn
* @returns {JSX.Element}
* @constructor
*/
const SocketProvider = ({ children, bodyshop, navigate, currentUser, scenarioNotificationsOn }) => {
const SocketProvider = ({ children, bodyshop, navigate, currentUser }) => {
const socketRef = useRef(null);
const [clientId, setClientId] = useState(null);
const [isConnected, setIsConnected] = useState(false);
@@ -37,6 +37,14 @@ const SocketProvider = ({ children, bodyshop, navigate, currentUser, scenarioNot
const userAssociationId = bodyshop?.associations?.[0]?.id;
const { t } = useTranslation();
const {
treatments: { Realtime_Notifications_UI }
} = useSplitTreatments({
attributes: {},
names: ["Realtime_Notifications_UI"],
splitKey: bodyshop?.imexshopid
});
const [markNotificationRead] = useMutation(MARK_NOTIFICATION_READ, {
update: (cache, { data: { update_notifications } }) => {
const timestamp = new Date().toISOString();
@@ -209,7 +217,7 @@ const SocketProvider = ({ children, bodyshop, navigate, currentUser, scenarioNot
const handleNotification = (data) => {
// Scenario Notifications have been disabled, bail.
if (!scenarioNotificationsOn) {
if (Realtime_Notifications_UI?.treatment !== "on") {
return;
}
@@ -329,7 +337,7 @@ const SocketProvider = ({ children, bodyshop, navigate, currentUser, scenarioNot
const handleSyncNotificationRead = ({ notificationId, timestamp }) => {
// Scenario Notifications have been disabled, bail.
if (!scenarioNotificationsOn) {
if (Realtime_Notifications_UI?.treatment !== "on") {
return;
}
@@ -371,7 +379,7 @@ const SocketProvider = ({ children, bodyshop, navigate, currentUser, scenarioNot
const handleSyncAllNotificationsRead = ({ timestamp }) => {
// Scenario Notifications have been disabled, bail.
if (!scenarioNotificationsOn) {
if (Realtime_Notifications_UI?.treatment !== "on") {
return;
}
@@ -462,7 +470,7 @@ const SocketProvider = ({ children, bodyshop, navigate, currentUser, scenarioNot
markAllNotificationsRead,
navigate,
currentUser,
scenarioNotificationsOn,
Realtime_Notifications_UI,
t
]);
@@ -474,7 +482,7 @@ const SocketProvider = ({ children, bodyshop, navigate, currentUser, scenarioNot
isConnected,
markNotificationRead,
markAllNotificationsRead,
scenarioNotificationsOn
scenarioNotificationsOn: Realtime_Notifications_UI?.treatment === "on"
}}
>
{children}

View File

@@ -1,10 +1,13 @@
/** Notification Scenarios
* @description This file contains the scenarios for job notifications.
* @type {string[]}
*/
const notificationScenarios = [
"job-assigned-to-me",
"bill-posted",
"critical-parts-status-changed",
"part-marked-back-ordered",
"new-note-added",
"supplement-imported",
"schedule-dates-changed",
"tasks-updated-created",
"new-media-added-reassigned",
@@ -14,6 +17,7 @@ const notificationScenarios = [
"job-status-change",
"payment-collected-completed",
"alternate-transport-changed"
// "supplement-imported", // Disabled for now
];
export { notificationScenarios };

View File

@@ -5266,32 +5266,6 @@
- active:
_eq: true
check: null
event_triggers:
- name: notifications_parts_dispatch
definition:
enable_manual: false
insert:
columns: '*'
retry_conf:
interval_sec: 10
num_retries: 0
timeout_sec: 60
webhook_from_env: HASURA_API_URL
headers:
- name: event-secret
value_from_env: EVENT_SECRET
request_transform:
body:
action: transform
template: |-
{
"success": true
}
method: POST
query_params: {}
template_engine: Kriti
url: '{{$base_url}}/notifications/events/handlePartsDispatchChange'
version: 2
- table:
name: parts_dispatch_lines
schema: public

View File

@@ -0,0 +1 @@
DROP INDEX IF EXISTS "public"."notificiations_idx_jobs";

View File

@@ -0,0 +1,2 @@
CREATE INDEX "notificiations_idx_jobs" on
"public"."notifications" using btree ("jobid");

View File

@@ -0,0 +1 @@
DROP INDEX IF EXISTS "public"."notifications_idx_associations";

View File

@@ -0,0 +1,2 @@
CREATE INDEX "notifications_idx_associations" on
"public"."notifications" using btree ("associationid");

View File

@@ -0,0 +1,3 @@
-- Could not auto-generate a down migration.
-- Please write an appropriate down migration for the SQL below:
-- CREATE INDEX idx_notifications_created_at_not_read ON notifications(created_at desc, read) where read is null;

View File

@@ -0,0 +1 @@
CREATE INDEX idx_notifications_created_at_not_read ON notifications(created_at desc, read) where read is null;

View File

@@ -0,0 +1,3 @@
-- Could not auto-generate a down migration.
-- Please write an appropriate down migration for the SQL below:
-- CREATE INDEX idx_notifications_associations_not_read ON notifications(associationid, read) where read is null;

View File

@@ -0,0 +1 @@
CREATE INDEX idx_notifications_associations_not_read ON notifications(associationid, read) where read is null;

View File

@@ -22,7 +22,7 @@ const cookieParser = require("cookie-parser");
const { Server } = require("socket.io");
const { createAdapter } = require("@socket.io/redis-adapter");
const { instrument } = require("@socket.io/admin-ui");
const { isString, isEmpty } = require("lodash");
const { isString, isEmpty, isFunction } = require("lodash");
const logger = require("./server/utils/logger");
const { applyRedisHelpers } = require("./server/utils/redisHelpers");
@@ -393,7 +393,9 @@ const main = async () => {
const StatusReporter = StartStatusReporter();
registerCleanupTask(async () => {
StatusReporter.end();
if (isFunction(StatusReporter?.end)) {
StatusReporter.end();
}
});
try {

View File

@@ -20,6 +20,11 @@ const defaultFooter = () => {
const now = () => moment().format("MM/DD/YYYY @ hh:mm a");
/**
* Generate the email template
* @param strings
* @returns {string}
*/
const generateEmailTemplate = (strings) => {
return (
`

View File

@@ -134,24 +134,6 @@ const handleJobLinesChange = async (req, res) =>
const handleNotesChange = async (req, res) =>
processNotificationEvent(req, res, "req.body.event.new.jobid", "Notes Changed Notification Event Handled.");
/**
* Handle parts dispatch change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Object} JSON response with a success message.
*/
const handlePartsDispatchChange = (req, res) => res.status(200).json({ message: "Parts Dispatch change handled." });
/**
* Handle parts order change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Object} JSON response with a success message.
*/
const handlePartsOrderChange = (req, res) => res.status(200).json({ message: "Parts Order change handled." });
/**
* Handle payments change notifications.
*
@@ -182,6 +164,27 @@ const handleTasksChange = async (req, res) =>
const handleTimeTicketsChange = async (req, res) =>
processNotificationEvent(req, res, "req.body.event.new.jobid", "Time Tickets Changed Notification Event Handled.");
/**
* Handle parts dispatch change notifications.
* Note: Placeholder
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Object} JSON response with a success message.
*
*/
const handlePartsDispatchChange = (req, res) => res.status(200).json({ message: "Parts Dispatch change handled." });
/**
* Handle parts order change notifications.
* Note: Placeholder
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Object} JSON response with a success message.
*/
const handlePartsOrderChange = (req, res) => res.status(200).json({ message: "Parts Order change handled." });
module.exports = {
handleJobsChange,
handleBillsChange,

View File

@@ -1,6 +1,8 @@
const { Queue, Worker } = require("bullmq");
const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries");
const { registerCleanupTask } = require("../../utils/cleanupManager");
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
const devDebugLogger = require("../../utils/devDebugLogger");
const graphQLClient = require("../../graphql-client/graphql-client").client;
// Base time-related constant in minutes, sourced from environment variable or defaulting to 1
@@ -45,17 +47,20 @@ const buildNotificationContent = (notifications) => {
*/
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
if (!addQueue || !consolidateQueue) {
logger.logger.debug("Initializing Notifications Queues");
const prefix = getBullMQPrefix();
const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";
devDebugLogger(`Initializing Notifications Queues with prefix: ${prefix}`);
addQueue = new Queue("notificationsAdd", {
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
consolidateQueue = new Queue("notificationsConsolidate", {
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
@@ -63,9 +68,9 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
"notificationsAdd",
async (job) => {
const { jobId, key, variables, recipients, body, jobRoNumber } = job.data;
logger.logger.debug(`Adding notifications for jobId ${jobId}`);
devDebugLogger(`Adding notifications for jobId ${jobId}`);
const redisKeyPrefix = `app:notifications:${jobId}`;
const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`;
const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() };
for (const recipient of recipients) {
@@ -75,12 +80,12 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
const notifications = existingNotifications ? JSON.parse(existingNotifications) : [];
notifications.push(notification);
await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000);
logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
devDebugLogger(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
}
const consolidateKey = `app:consolidate:${jobId}`;
const consolidateKey = `app:${devKey}:consolidate:${jobId}`;
const flagSet = await pubClient.setnx(consolidateKey, "pending");
logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);
devDebugLogger(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);
if (flagSet) {
await consolidateQueue.add(
@@ -93,15 +98,15 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
backoff: LOCK_EXPIRATION
}
);
logger.logger.debug(`Scheduled consolidation for jobId ${jobId}`);
devDebugLogger(`Scheduled consolidation for jobId ${jobId}`);
await pubClient.expire(consolidateKey, CONSOLIDATION_FLAG_EXPIRATION / 1000);
} else {
logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`);
devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`);
}
},
{
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 5
}
);
@@ -110,23 +115,24 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
"notificationsConsolidate",
async (job) => {
const { jobId, recipients } = job.data;
logger.logger.debug(`Consolidating notifications for jobId ${jobId}`);
devDebugLogger(`Consolidating notifications for jobId ${jobId}`);
const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`;
const lockKey = `lock:${devKey}:consolidate:${jobId}`;
const redisKeyPrefix = `app:notifications:${jobId}`;
const lockKey = `lock:consolidate:${jobId}`;
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);
devDebugLogger(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);
if (lockAcquired) {
try {
const allNotifications = {};
const uniqueUsers = [...new Set(recipients.map((r) => r.user))];
logger.logger.debug(`Unique users for jobId ${jobId}: ${uniqueUsers}`);
devDebugLogger(`Unique users for jobId ${jobId}: ${uniqueUsers}`);
for (const user of uniqueUsers) {
const userKey = `${redisKeyPrefix}:${user}`;
const notifications = await pubClient.get(userKey);
logger.logger.debug(`Retrieved notifications for ${user}: ${notifications}`);
devDebugLogger(`Retrieved notifications for ${user}: ${notifications}`);
if (notifications) {
const parsedNotifications = JSON.parse(notifications);
@@ -136,13 +142,13 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
allNotifications[user][bodyShopId] = parsedNotifications;
}
await pubClient.del(userKey);
logger.logger.debug(`Deleted Redis key ${userKey}`);
devDebugLogger(`Deleted Redis key ${userKey}`);
} else {
logger.logger.debug(`No notifications found for ${user} under ${userKey}`);
devDebugLogger(`No notifications found for ${user} under ${userKey}`);
}
}
logger.logger.debug(`Consolidated notifications: ${JSON.stringify(allNotifications)}`);
devDebugLogger(`Consolidated notifications: ${JSON.stringify(allNotifications)}`);
// Insert notifications into the database and collect IDs
const notificationInserts = [];
@@ -169,7 +175,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, {
objects: notificationInserts
});
logger.logger.debug(
devDebugLogger(
`Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}`
);
@@ -203,16 +209,16 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
associationId
});
});
logger.logger.debug(
devDebugLogger(
`Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}`
);
} else {
logger.logger.debug(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
devDebugLogger(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
}
}
}
await pubClient.del(`app:consolidate:${jobId}`);
await pubClient.del(`app:${devKey}:consolidate:${jobId}`);
} catch (err) {
logger.log(`app-queue-consolidation-error`, "ERROR", "notifications", "api", {
message: err?.message,
@@ -223,19 +229,20 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
await pubClient.del(lockKey);
}
} else {
logger.logger.debug(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
}
},
{
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 1,
limiter: { max: 1, duration: RATE_LIMITER_DURATION }
}
);
addWorker.on("completed", (job) => logger.logger.debug(`Add job ${job.id} completed`));
consolidateWorker.on("completed", (job) => logger.logger.debug(`Consolidate job ${job.id} completed`));
addWorker.on("completed", (job) => devDebugLogger(`Add job ${job.id} completed`));
consolidateWorker.on("completed", (job) => devDebugLogger(`Consolidate job ${job.id} completed`));
addWorker.on("failed", (job, err) =>
logger.log(`app-queue-notification-error`, "ERROR", "notifications", "api", {
message: err?.message,
@@ -251,9 +258,9 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
// Register cleanup task instead of direct process listeners
const shutdown = async () => {
logger.logger.debug("Closing app queue workers...");
devDebugLogger("Closing app queue workers...");
await Promise.all([addWorker.close(), consolidateWorker.close()]);
logger.logger.debug("App queue workers closed");
devDebugLogger("App queue workers closed");
};
registerCleanupTask(shutdown);
@@ -283,7 +290,7 @@ const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => {
{ jobId, bodyShopId, key, variables, recipients, body, jobRoNumber },
{ jobId: `${jobId}:${Date.now()}` }
);
logger.logger.debug(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
devDebugLogger(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
}
};

View File

@@ -3,6 +3,9 @@ const { sendTaskEmail } = require("../../email/sendemail");
const generateEmailTemplate = require("../../email/generateTemplate");
const { InstanceEndpoints } = require("../../utils/instanceMgr");
const { registerCleanupTask } = require("../../utils/cleanupManager");
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
const devDebugLogger = require("../../utils/devDebugLogger");
const moment = require("moment-timezone");
const EMAIL_CONSOLIDATION_DELAY_IN_MINS = (() => {
const envValue = process.env?.EMAIL_CONSOLIDATION_DELAY_IN_MINS;
@@ -34,19 +37,22 @@ let emailConsolidateWorker;
*/
const loadEmailQueue = async ({ pubClient, logger }) => {
if (!emailAddQueue || !emailConsolidateQueue) {
logger.logger.debug("Initializing Email Notification Queues");
const prefix = getBullMQPrefix();
const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";
devDebugLogger(`Initializing Email Notification Queues with prefix: ${prefix}`);
// Queue for adding email notifications
emailAddQueue = new Queue("emailAdd", {
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
// Queue for consolidating and sending emails
emailConsolidateQueue = new Queue("emailConsolidate", {
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
@@ -54,29 +60,31 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
emailAddWorker = new Worker(
"emailAdd",
async (job) => {
const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data;
logger.logger.debug(`Adding email notifications for jobId ${jobId}`);
const { jobId, jobRoNumber, bodyShopName, bodyShopTimezone, body, recipients } = job.data;
devDebugLogger(`Adding email notifications for jobId ${jobId}`);
const redisKeyPrefix = `email:${devKey}:notifications:${jobId}`;
const redisKeyPrefix = `email:notifications:${jobId}`;
for (const recipient of recipients) {
const { user, firstName, lastName } = recipient;
const userKey = `${redisKeyPrefix}:${user}`;
await pubClient.rpush(userKey, body);
await pubClient.expire(userKey, NOTIFICATION_EXPIRATION / 1000);
const detailsKey = `email:recipientDetails:${jobId}:${user}`;
const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${user}`;
await pubClient.hsetnx(detailsKey, "firstName", firstName || "");
await pubClient.hsetnx(detailsKey, "lastName", lastName || "");
await pubClient.hsetnx(detailsKey, "bodyShopTimezone", bodyShopTimezone);
await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000);
await pubClient.sadd(`email:recipients:${jobId}`, user);
logger.logger.debug(`Stored message for ${user} under ${userKey}: ${body}`);
await pubClient.sadd(`email:${devKey}:recipients:${jobId}`, user);
devDebugLogger(`Stored message for ${user} under ${userKey}: ${body}`);
}
const consolidateKey = `email:consolidate:${jobId}`;
const consolidateKey = `email:${devKey}:consolidate:${jobId}`;
const flagSet = await pubClient.setnx(consolidateKey, "pending");
if (flagSet) {
await emailConsolidateQueue.add(
"consolidate-emails",
{ jobId, jobRoNumber, bodyShopName },
{ jobId, jobRoNumber, bodyShopName, bodyShopTimezone },
{
jobId: `consolidate:${jobId}`,
delay: EMAIL_CONSOLIDATION_DELAY,
@@ -84,15 +92,15 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
backoff: LOCK_EXPIRATION
}
);
logger.logger.debug(`Scheduled email consolidation for jobId ${jobId}`);
devDebugLogger(`Scheduled email consolidation for jobId ${jobId}`);
await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000);
} else {
logger.logger.debug(`Email consolidation already scheduled for jobId ${jobId}`);
devDebugLogger(`Email consolidation already scheduled for jobId ${jobId}`);
}
},
{
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 5
}
);
@@ -102,26 +110,28 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
"emailConsolidate",
async (job) => {
const { jobId, jobRoNumber, bodyShopName } = job.data;
logger.logger.debug(`Consolidating emails for jobId ${jobId}`);
devDebugLogger(`Consolidating emails for jobId ${jobId}`);
const lockKey = `lock:emailConsolidate:${jobId}`;
const lockKey = `lock:${devKey}:emailConsolidate:${jobId}`;
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
if (lockAcquired) {
try {
const recipientsSet = `email:recipients:${jobId}`;
const recipientsSet = `email:${devKey}:recipients:${jobId}`;
const recipients = await pubClient.smembers(recipientsSet);
for (const recipient of recipients) {
const userKey = `email:notifications:${jobId}:${recipient}`;
const detailsKey = `email:recipientDetails:${jobId}:${recipient}`;
const userKey = `email:${devKey}:notifications:${jobId}:${recipient}`;
const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${recipient}`;
const messages = await pubClient.lrange(userKey, 0, -1);
if (messages.length > 0) {
const details = await pubClient.hgetall(detailsKey);
const firstName = details.firstName || "User";
const multipleUpdateString = messages.length > 1 ? "Updates" : "Update";
const subject = `${multipleUpdateString} for job ${jobRoNumber || "N/A"} at ${bodyShopName}`;
const timezone = moment.tz.zone(details?.bodyShopTimezone) ? details.bodyShopTimezone : "UTC";
const emailBody = generateEmailTemplate({
header: `${multipleUpdateString} for Job ${jobRoNumber || "N/A"}`,
subHeader: `Dear ${firstName},`,
dateLine: moment().tz(timezone).format("MM/DD/YYYY hh:mm a"),
body: `
<p>There have been updates to job ${jobRoNumber || "N/A"} at ${bodyShopName}:</p><br/>
<ul>
@@ -136,7 +146,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
type: "html",
html: emailBody
});
logger.logger.debug(
devDebugLogger(
`Sent consolidated email to ${recipient} for jobId ${jobId} with ${messages.length} updates`
);
await pubClient.del(userKey);
@@ -144,7 +154,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
}
}
await pubClient.del(recipientsSet);
await pubClient.del(`email:consolidate:${jobId}`);
await pubClient.del(`email:${devKey}:consolidate:${jobId}`);
} catch (err) {
logger.log(`email-queue-consolidation-error`, "ERROR", "notifications", "api", {
message: err?.message,
@@ -155,20 +165,21 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
await pubClient.del(lockKey);
}
} else {
logger.logger.debug(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`);
devDebugLogger(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`);
}
},
{
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 1,
limiter: { max: 1, duration: RATE_LIMITER_DURATION }
}
);
// Event handlers for workers
emailAddWorker.on("completed", (job) => logger.logger.debug(`Email add job ${job.id} completed`));
emailConsolidateWorker.on("completed", (job) => logger.logger.debug(`Email consolidate job ${job.id} completed`));
emailAddWorker.on("completed", (job) => devDebugLogger(`Email add job ${job.id} completed`));
emailConsolidateWorker.on("completed", (job) => devDebugLogger(`Email consolidate job ${job.id} completed`));
emailAddWorker.on("failed", (job, err) =>
logger.log(`add-email-queue-failed`, "ERROR", "notifications", "api", {
message: err?.message,
@@ -184,9 +195,9 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
// Register cleanup task instead of direct process listeners
const shutdown = async () => {
logger.logger.debug("Closing email queue workers...");
devDebugLogger("Closing email queue workers...");
await Promise.all([emailAddWorker.close(), emailConsolidateWorker.close()]);
logger.logger.debug("Email queue workers closed");
devDebugLogger("Email queue workers closed");
};
registerCleanupTask(shutdown);
}
@@ -219,10 +230,10 @@ const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => {
const emailAddQueue = getQueue();
for (const email of emailsToDispatch) {
const { jobId, jobRoNumber, bodyShopName, body, recipients } = email;
const { jobId, jobRoNumber, bodyShopName, bodyShopTimezone, body, recipients } = email;
if (!jobId || !jobRoNumber || !bodyShopName || !body || !recipients.length) {
logger.logger.warn(
devDebugLogger(
`Skipping email dispatch for jobId ${jobId} due to missing data: ` +
`jobRoNumber=${jobRoNumber || "N/A"}, bodyShopName=${bodyShopName}, body=${body}, recipients=${recipients.length}`
);
@@ -231,10 +242,10 @@ const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => {
await emailAddQueue.add(
"add-email-notification",
{ jobId, jobRoNumber, bodyShopName, body, recipients },
{ jobId, jobRoNumber, bodyShopName, bodyShopTimezone, body, recipients },
{ jobId: `${jobId}:${Date.now()}` }
);
logger.logger.debug(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
devDebugLogger(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
}
};

View File

@@ -6,12 +6,36 @@ const Dinero = require("dinero.js");
Dinero.globalRoundingMode = "HALF_EVEN";
/**
* Populates the recipients for app, email, and FCM notifications based on scenario watchers.
*
* @param {Object} data - The data object containing scenarioWatchers and bodyShopId.
* @param {Object} result - The result object to populate with recipients for app, email, and FCM notifications.
* Creates a standard notification object with app, email, and FCM properties and populates recipients.
* @param {Object} data - Input data containing jobId, jobRoNumber, bodyShopId, bodyShopName, and scenarioWatchers
* @param {string} key - Notification key for the app
* @param {string} body - Notification body text
* @param {Object} [variables={}] - Variables for the app notification
* @returns {Object} Notification object with populated recipients
*/
const populateWatchers = (data, result) => {
const buildNotification = (data, key, body, variables = {}) => {
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key,
body,
variables,
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
bodyShopTimezone: data.bodyShopTimezone,
body,
recipients: []
},
fcm: { recipients: [] }
};
// Populate recipients from scenarioWatchers
data.scenarioWatchers.forEach((recipients) => {
const { user, app, fcm, email, firstName, lastName, employeeId, associationId } = recipients;
if (app === true)
@@ -24,287 +48,153 @@ const populateWatchers = (data, result) => {
if (fcm === true) result.fcm.recipients.push(user);
if (email === true) result.email.recipients.push({ user, firstName, lastName });
});
};
/**
* Builds notification data for changes to alternate transport.
*/
const alternateTransportChangedBuilder = (data) => {
const body = `The alternate transportation has been changed from ${data.changedFields.alt_transport?.old || "unset"} to ${data?.changedFields?.alt_transport?.new || "unset"}.`;
const result = {
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
jobRoNumber: data.jobRoNumber,
key: "notifications.job.alternateTransportChanged",
body,
variables: {
alternateTransport: data?.changedFields?.alt_transport?.new,
oldAlternateTransport: data?.changedFields?.alt_transport?.old
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
};
/**
* Builds notification data for bill posted events.
* Creates a notification for when the alternate transport is changed.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
const billPostedHandler = (data) => {
const alternateTransportChangedBuilder = (data) => {
const oldTransport = data?.changedFields?.alt_transport?.old;
const newTransport = data?.changedFields?.alt_transport?.new;
let body;
if (oldTransport && newTransport)
body = `The alternate transportation has been changed from ${oldTransport} to ${newTransport}.`;
else if (!oldTransport && newTransport) body = `The alternate transportation has been set to ${newTransport}.`;
else if (oldTransport && !newTransport)
body = `The alternate transportation has been canceled (previously ${oldTransport}).`;
else body = `The alternate transportation has been updated.`;
return buildNotification(data, "notifications.job.alternateTransportChanged", body, {
alternateTransport: newTransport,
oldAlternateTransport: oldTransport
});
};
/**
* Creates a notification for when a bill is posted.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
const billPostedBuilder = (data) => {
const facing = data?.data?.isinhouse ? "in-house" : "vendor";
const body = `An ${facing} ${data?.data?.is_credit_memo ? "credit memo" : "bill"} has been posted.`.trim();
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.billPosted",
body,
variables: {
isInHouse: data?.data?.isinhouse,
isCreditMemo: data?.data?.is_credit_memo
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
return buildNotification(data, "notifications.job.billPosted", body, {
isInHouse: data?.data?.isinhouse,
isCreditMemo: data?.data?.is_credit_memo
});
};
/**
* Builds notification data for changes to critical parts status.
* Creates a notification for when the status of critical parts changes.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
//
const criticalPartsStatusChangedBuilder = (data) => {
const body = `The status on a critical part line (${data?.data?.line_desc}) has changed to ${data?.data?.status || "unset"}.`;
const lineDesc = data?.data?.line_desc;
const status = data?.data?.status;
const body = status
? `The status on a critical part line (${lineDesc}) has been set to ${status}.`
: `The status on a critical part line (${lineDesc}) has been cleared.`;
const result = {
app: {
jobId: data.jobId,
bodyShopId: data.bodyShopId,
jobRoNumber: data.jobRoNumber,
key: "notifications.job.criticalPartsStatusChanged",
body,
variables: {
joblineId: data?.data?.id, // If we want to deeplink to the jobline
status: data?.data?.status,
line_desc: data?.data?.line_desc
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
return buildNotification(data, "notifications.job.criticalPartsStatusChanged", body, {
joblineId: data?.data?.id,
status: data?.data?.status,
line_desc: lineDesc
});
};
/**
* Builds notification data for completed intake or delivery checklists.
* Creates a notification for when the intake or delivery checklist is completed.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
const intakeDeliveryChecklistCompletedBuilder = (data) => {
const checklistType = data?.changedFields?.intakechecklist ? "intake" : "delivery";
const body = `The ${checklistType.charAt(0).toUpperCase() + checklistType.slice(1)} checklist has been completed.`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.checklistCompleted",
body,
variables: {
checklistType,
completed: true
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
return buildNotification(data, "notifications.job.checklistCompleted", body, {
checklistType,
completed: true
});
};
/**
* Builds notification data for job assignment events.
* Creates a notification for when a job is assigned to the user.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
const jobAssignedToMeBuilder = (data) => {
const body = `You have been assigned to ${getJobAssignmentType(data.scenarioFields?.[0])}.`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.assigned",
body,
variables: {
type: data.scenarioFields?.[0]
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
return buildNotification(data, "notifications.job.assigned", body, {
type: data.scenarioFields?.[0]
});
};
/**
* Builds notification data for jobs added to production.
* Creates a notification for when jobs are added to production.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
const jobsAddedToProductionBuilder = (data) => {
const body = `Job is now in production.`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.addedToProduction",
body,
variables: {},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
return buildNotification(data, "notifications.job.addedToProduction", body);
};
/**
* Builds notification data for job status changes.
* Creates a notification for when the job status changes.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
const jobStatusChangeBuilder = (data) => {
const body = `The status has changed from ${data?.changedFields?.status?.old || "unset"} to ${data?.changedFields?.status?.new || "unset"}`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.statusChanged",
body,
variables: {
status: data.changedFields.status.new,
oldStatus: data.changedFields.status.old
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
const oldStatus = data?.changedFields?.status?.old;
const newStatus = data?.changedFields?.status?.new;
let body;
populateWatchers(data, result);
return result;
if (oldStatus && newStatus) body = `The status has been changed from ${oldStatus} to ${newStatus}.`;
else if (!oldStatus && newStatus) body = `The status has been set to ${newStatus}.`;
else if (oldStatus && !newStatus) body = `The status has been cleared (previously ${oldStatus}).`;
else body = `The status has been updated.`;
return buildNotification(data, "notifications.job.statusChanged", body, {
status: newStatus,
oldStatus: oldStatus
});
};
/**
* Builds notification data for new media added or reassigned events.
* Creates a notification for when new media is added or reassigned.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
const newMediaAddedReassignedBuilder = (data) => {
// Determine if it's an image or document
const mediaType = data?.data?.type?.startsWith("image") ? "Image" : "Document";
// Determine the action
let action;
if (data?.data?._documentMoved) {
action = "moved to another job"; // Special case for document moved from this job
} else if (data.isNew) {
action = "added"; // New media
} else if (data.changedFields?.jobid && data.changedFields.jobid.old !== data.changedFields.jobid.new) {
action = "moved to this job";
} else {
action = "updated";
}
// Construct the body string
const action = data?.data?._documentMoved
? "moved to another job"
: data.isNew
? "added"
: data.changedFields?.jobid && data.changedFields.jobid.old !== data.changedFields.jobid.new
? "moved to this job"
: "updated";
const body = `An ${mediaType} has been ${action}.`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.newMediaAdded",
body,
variables: {
mediaType,
action,
movedToJob: data?.data?._movedToJob
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
return buildNotification(data, "notifications.job.newMediaAdded", body, {
mediaType,
action,
movedToJob: data?.data?._movedToJob
});
};
/**
* Builds notification data for new notes added to a job.
* Creates a notification for when a new note is added.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
const newNoteAddedBuilder = (data) => {
const body = [
@@ -318,240 +208,119 @@ const newNoteAddedBuilder = (data) => {
.filter(Boolean)
.join(" ");
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.newNoteAdded",
body,
variables: {
createdBy: data?.data?.created_by,
critical: data?.data?.critical,
type: data?.data?.type,
private: data?.data?.private
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
return buildNotification(data, "notifications.job.newNoteAdded", body, {
createdBy: data?.data?.created_by,
critical: data?.data?.critical,
type: data?.data?.type,
private: data?.data?.private
});
};
/**
* Builds notification data for new time tickets posted.
* Creates a notification for when a new time ticket is posted.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
const newTimeTicketPostedBuilder = (data) => {
const type = data?.data?.cost_center;
const body = `A ${startCase(type.toLowerCase())} time ticket for ${data?.data?.date} has been posted.`.trim();
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.newTimeTicketPosted",
body,
variables: {
type,
date: data?.data?.date
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
return buildNotification(data, "notifications.job.newTimeTicketPosted", body, {
type,
date: data?.data?.date
});
};
/**
* Builds notification data for parts marked as back-ordered.
* Creates a notification for when a part is marked as back-ordered.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
const partMarkedBackOrderedBuilder = (data) => {
const body = `A part ${data?.data?.line_desc} has been marked as back-ordered.`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.partBackOrdered",
body,
variables: {
line_desc: data?.data?.line_desc
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
return buildNotification(data, "notifications.job.partBackOrdered", body, {
line_desc: data?.data?.line_desc
});
};
/**
* Builds notification data for payment collection events.
* Creates a notification for when payment is collected or completed.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
const paymentCollectedCompletedBuilder = (data) => {
const momentFormat = "MM/DD/YYYY";
// Format amount using Dinero.js
const amountDinero = Dinero({
amount: Math.round((data.data.amount || 0) * 100) // Convert to cents, default to 0 if missing
});
const amountDinero = Dinero({ amount: Math.round((data.data.amount || 0) * 100) });
const amountFormatted = amountDinero.toFormat();
const payer = data.data.payer;
const paymentType = data.data.type;
const paymentDate = moment(data.data.date).format(momentFormat);
const body = `Payment of ${amountFormatted} has been collected from ${payer} via ${paymentType} on ${paymentDate}`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.paymentCollected",
body,
variables: {
amount: data.data.amount,
payer: data.data.payer,
type: data.data.type,
date: data.data.date
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
return buildNotification(data, "notifications.job.paymentCollected", body, {
amount: data.data.amount,
payer: data.data.payer,
type: data.data.type,
date: data.data.date
});
};
/**
* Builds notification data for changes to scheduled dates.
* Creates a notification for when scheduled dates are changed.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
const scheduledDatesChangedBuilder = (data) => {
const changedFields = data.changedFields;
// Define field configurations
const fieldConfigs = {
scheduled_in: "Scheduled In",
scheduled_completion: "Scheduled Completion",
scheduled_delivery: "Scheduled Delivery"
};
// Helper function to format date and time with "at"
const formatDateTime = (date) => {
if (!date) return "unset";
if (!date) return "(no date set)";
const formatted = moment(date).tz(data.bodyShopTimezone);
const datePart = formatted.format("MM/DD/YYYY");
const timePart = formatted.format("hh:mm a");
return `${datePart} at ${timePart}`;
return `${formatted.format("MM/DD/YYYY")} at ${formatted.format("hh:mm a")}`;
};
// Build field messages dynamically
const fieldMessages = Object.entries(fieldConfigs)
.filter(([field]) => changedFields[field]) // Only include changed fields
.filter(([field]) => changedFields[field])
.map(([field, label]) => {
const { old, new: newValue } = changedFields[field];
// Case 1: Scheduled date cancelled (from value to null)
if (old && !newValue) {
return `${label} was cancelled (previously ${formatDateTime(old)}).`;
}
// Case 2: Scheduled date set (from null to value)
else if (!old && newValue) {
return `${label} was set to ${formatDateTime(newValue)}.`;
}
// Case 3: Scheduled date changed (from value to value)
else if (old && newValue) {
return `${label} changed from ${formatDateTime(old)} to ${formatDateTime(newValue)}.`;
}
return ""; // Fallback, though this shouldn't happen with the filter
if (old && !newValue) return `${label} was cancelled (previously ${formatDateTime(old)}).`;
else if (!old && newValue) return `${label} was set to ${formatDateTime(newValue)}.`;
else if (old && newValue) return `${label} changed from ${formatDateTime(old)} to ${formatDateTime(newValue)}.`;
return "";
})
.filter(Boolean); // Remove any empty strings
.filter(Boolean);
const body = fieldMessages.length > 0 ? fieldMessages.join(" ") : "Scheduled dates have been updated.";
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.scheduledDatesChanged",
body,
variables: {
scheduledIn: changedFields.scheduled_in?.new,
oldScheduledIn: changedFields.scheduled_in?.old,
scheduledCompletion: changedFields.scheduled_completion?.new,
oldScheduledCompletion: changedFields.scheduled_completion?.old,
scheduledDelivery: changedFields.scheduled_delivery?.new,
oldScheduledDelivery: changedFields.scheduled_delivery?.old
},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
return buildNotification(data, "notifications.job.scheduledDatesChanged", body, {
scheduledIn: changedFields.scheduled_in?.new,
oldScheduledIn: changedFields.scheduled_in?.old,
scheduledCompletion: changedFields.scheduled_completion?.new,
oldScheduledCompletion: changedFields.scheduled_completion?.old,
scheduledDelivery: changedFields.scheduled_delivery?.new,
oldScheduledDelivery: changedFields.scheduled_delivery?.old
});
};
/**
* Builds notification data for tasks updated or created.
* Creates a notification for when tasks are updated or created.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
const tasksUpdatedCreatedBuilder = (data) => {
const momentFormat = "MM/DD/YYYY hh:mm a";
const timezone = data.bodyShopTimezone;
const taskTitle = data?.data?.title ? `"${data.data.title}"` : "Unnamed Task";
let body;
let variables;
let body, variables;
if (data.isNew) {
// Created case
const priority = formatTaskPriority(data?.data?.priority);
const createdBy = data?.data?.created_by || "Unknown"; // Fallback for undefined created_by
const createdBy = data?.data?.created_by || "Unknown";
const dueDate = data.data.due_date ? ` due on ${moment(data.data.due_date).tz(timezone).format(momentFormat)}` : "";
const completedOnCreation = data.data.completed === true;
body = `A ${priority} task ${taskTitle} has been created${completedOnCreation ? " and marked completed" : ""} by ${createdBy}${dueDate}.`;
@@ -562,119 +331,73 @@ const tasksUpdatedCreatedBuilder = (data) => {
priority: data?.data?.priority,
createdBy: data?.data?.created_by,
dueDate: data?.data?.due_date,
completed: completedOnCreation ? data?.data?.completed : undefined // Only include if true
completed: completedOnCreation ? data?.data?.completed : undefined
};
} else {
// Updated case
const changedFields = data.changedFields;
const fieldNames = Object.keys(changedFields);
const oldTitle = changedFields.title ? `"${changedFields.title.old || "Unnamed Task"}"` : taskTitle;
// Special case: Only 'completed' changed
if (fieldNames.length === 1 && changedFields.completed) {
body = `Task ${oldTitle} was marked ${changedFields.completed.new ? "complete" : "incomplete"}`;
variables = {
isNew: data.isNew,
roNumber: data.jobRoNumber,
title: data?.data?.title,
changedCompleted: data?.changedFields?.completed?.new
changedCompleted: changedFields.completed.new
};
} else {
// General update case
const fieldMessages = [];
if (changedFields.title) {
if (changedFields.title)
fieldMessages.push(`Task ${oldTitle} changed title to "${changedFields.title.new || "unnamed task"}".`);
}
if (changedFields.description) {
fieldMessages.push("Description updated.");
}
if (changedFields.priority) {
if (changedFields.description) fieldMessages.push("Description updated.");
if (changedFields.priority)
fieldMessages.push(`Priority changed to ${formatTaskPriority(changedFields.priority.new)}.`);
}
if (changedFields.due_date) {
if (changedFields.due_date)
fieldMessages.push(`Due date set to ${moment(changedFields.due_date.new).tz(timezone).format(momentFormat)}.`);
}
if (changedFields.completed) {
if (changedFields.completed)
fieldMessages.push(`Status changed to ${changedFields.completed.new ? "complete" : "incomplete"}.`);
}
body =
fieldMessages.length > 0
? fieldMessages.length === 1 && changedFields.title
? fieldMessages[0] // If only title changed, use it standalone
? fieldMessages[0]
: `Task ${oldTitle} updated: ${fieldMessages.join(", ")}`
: `Task ${oldTitle} has been updated.`;
variables = {
isNew: data.isNew,
roNumber: data.jobRoNumber,
title: data?.data?.title,
changedTitleOld: data?.changedFields?.title?.old,
changedTitleNew: data?.changedFields?.title?.new,
changedPriority: data?.changedFields?.priority?.new,
changedDueDate: data?.changedFields?.due_date?.new,
changedCompleted: data?.changedFields?.completed?.new
changedTitleOld: changedFields.title?.old,
changedTitleNew: changedFields.title?.new,
changedPriority: changedFields.priority?.new,
changedDueDate: changedFields.due_date?.new,
changedCompleted: changedFields.completed?.new
};
}
}
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: data.isNew ? "notifications.job.taskCreated" : "notifications.job.taskUpdated",
body,
variables,
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
return buildNotification(
data,
data.isNew ? "notifications.job.taskCreated" : "notifications.job.taskUpdated",
body,
variables
);
};
/**
* Builds notification data for supplement imported events.
* TODO: This is an advanced case and will be done later
* Creates a notification for when a supplement is imported.
* @param data
* @returns {{app: {jobId, jobRoNumber: *, bodyShopId: *, key: string, body: string, variables: Object, recipients: *[]}, email: {jobId, jobRoNumber: *, bodyShopName: *, body: string, recipients: *[]}, fcm: {recipients: *[]}}}
*/
const supplementImportedBuilder = (data) => {
const body = `A supplement has been imported.`;
const result = {
app: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopId: data.bodyShopId,
key: "notifications.job.supplementImported",
body,
variables: {},
recipients: []
},
email: {
jobId: data.jobId,
jobRoNumber: data.jobRoNumber,
bodyShopName: data.bodyShopName,
body,
recipients: []
},
fcm: { recipients: [] }
};
populateWatchers(data, result);
return result;
return buildNotification(data, "notifications.job.supplementImported", body);
};
module.exports = {
alternateTransportChangedBuilder,
billPostedHandler,
billPostedBuilder,
criticalPartsStatusChangedBuilder,
intakeDeliveryChecklistCompletedBuilder,
jobAssignedToMeBuilder,

View File

@@ -1,6 +1,6 @@
const {
jobAssignedToMeBuilder,
billPostedHandler,
billPostedBuilder,
newNoteAddedBuilder,
scheduledDatesChangedBuilder,
tasksUpdatedCreatedBuilder,
@@ -30,10 +30,12 @@ const { isFunction } = require("lodash");
* - builder {Function}: A function to handle the scenario.
* - onlyTruthyValues {boolean|Array<string>}: Specifies fields that must have truthy values for the scenario to match.
* - filterCallback {Function}: Optional callback (sync or async) to further filter the scenario based on event data (returns boolean).
* - enabled {boolean}: If true, the scenario is active; if false or omitted, the scenario is skipped.
*/
const notificationScenarios = [
{
key: "job-assigned-to-me",
enabled: true,
table: "jobs",
fields: ["employee_prep", "employee_body", "employee_csr", "employee_refinish"],
matchToUserFields: ["employee_prep", "employee_body", "employee_csr", "employee_refinish"],
@@ -41,24 +43,28 @@ const notificationScenarios = [
},
{
key: "bill-posted",
enabled: true,
table: "bills",
builder: billPostedHandler,
builder: billPostedBuilder,
onNew: true
},
{
key: "new-note-added",
enabled: true,
table: "notes",
builder: newNoteAddedBuilder,
onNew: true
},
{
key: "schedule-dates-changed",
enabled: true,
table: "jobs",
fields: ["scheduled_in", "scheduled_completion", "scheduled_delivery"],
builder: scheduledDatesChangedBuilder
},
{
key: "tasks-updated-created",
enabled: true,
table: "tasks",
fields: ["updated_at"],
// onNew: true,
@@ -66,12 +72,14 @@ const notificationScenarios = [
},
{
key: "job-status-change",
enabled: true,
table: "jobs",
fields: ["status"],
builder: jobStatusChangeBuilder
},
{
key: "job-added-to-production",
enabled: true,
table: "jobs",
fields: ["inproduction"],
onlyTruthyValues: ["inproduction"],
@@ -79,36 +87,42 @@ const notificationScenarios = [
},
{
key: "alternate-transport-changed",
enabled: true,
table: "jobs",
fields: ["alt_transport"],
builder: alternateTransportChangedBuilder
},
{
key: "new-time-ticket-posted",
enabled: true,
table: "timetickets",
builder: newTimeTicketPostedBuilder
},
{
key: "intake-delivery-checklist-completed",
enabled: true,
table: "jobs",
fields: ["intakechecklist", "deliverchecklist"],
builder: intakeDeliveryChecklistCompletedBuilder
},
{
key: "payment-collected-completed",
enabled: true,
table: "payments",
onNew: true,
builder: paymentCollectedCompletedBuilder
},
{
// MAKE SURE YOU ARE NOT ON A LMS ENVIRONMENT
// Only works on a non LMS ENV
key: "new-media-added-reassigned",
enabled: true,
table: "documents",
fields: ["jobid"],
builder: newMediaAddedReassignedBuilder
},
{
key: "critical-parts-status-changed",
enabled: true,
table: "joblines",
fields: ["status"],
onlyTruthyValues: ["status"],
@@ -117,6 +131,7 @@ const notificationScenarios = [
},
{
key: "part-marked-back-ordered",
enabled: true,
table: "joblines",
fields: ["status"],
builder: partMarkedBackOrderedBuilder,
@@ -133,12 +148,11 @@ const notificationScenarios = [
}
}
},
// -------------- Difficult ---------------
// Holding off on this one for now
// Holding off on this one for now, spans multiple tables
{
key: "supplement-imported",
enabled: false,
builder: supplementImportedBuilder
// spans multiple tables,
}
];
@@ -159,6 +173,11 @@ const notificationScenarios = [
const getMatchingScenarios = async (eventData, getBodyshopFromRedis) => {
const matches = [];
for (const scenario of notificationScenarios) {
// Check if the scenario is enabled; skip if not explicitly true
if (scenario.enabled !== true) {
continue;
}
// If eventData has a table, then only scenarios with a table property that matches should be considered.
if (eventData.table) {
if (!scenario.table || eventData.table.name !== scenario.table) {

View File

@@ -35,7 +35,6 @@ const scenarioParser = async (req, jobIdField) => {
} = req;
// Step 1: Validate we know what user committed the action that fired the parser
// console.log("Step 1");
const hasuraUserRole = event?.session_variables?.["x-hasura-role"];
const hasuraUserId = event?.session_variables?.["x-hasura-user-id"];
@@ -52,7 +51,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 2: Extract just the jobId using the provided jobIdField
// console.log("Step 2");
let jobId = null;
if (jobIdField) {
@@ -70,7 +68,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 3: Query job watchers associated with the job ID using GraphQL
// console.log("Step 3");
const watcherData = await gqlClient.request(queries.GET_JOB_WATCHERS, {
jobid: jobId
@@ -96,7 +93,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 5: Perform the full event diff now that we know there are watchers
// console.log("Step 5");
const eventData = await eventParser({
newData: event.data.new,
@@ -107,7 +103,6 @@ const scenarioParser = async (req, jobIdField) => {
});
// Step 6: Extract body shop information from the job data
// console.log("Step 6");
const bodyShopId = watcherData?.job?.bodyshop?.id;
const bodyShopName = watcherData?.job?.bodyshop?.shopname;
@@ -122,7 +117,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 7: Identify scenarios that match the event data and job context
// console.log("Step 7");
const matchingScenarios = await getMatchingScenarios(
{
@@ -155,7 +149,6 @@ const scenarioParser = async (req, jobIdField) => {
};
// Step 8: Query notification settings for the job watchers
// console.log("Step 8");
const associationsData = await gqlClient.request(queries.GET_NOTIFICATION_ASSOCIATIONS, {
emails: jobWatchers.map((x) => x.email),
@@ -173,7 +166,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 9: Filter scenario watchers based on their enabled notification methods
// console.log("Step 9");
finalScenarioData.matchingScenarios = finalScenarioData.matchingScenarios.map((scenario) => ({
...scenario,
@@ -213,7 +205,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 10: Build and collect scenarios to dispatch notifications for
// console.log("Step 10");
const scenariosToDispatch = [];
@@ -240,7 +231,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 11: Filter scenario fields to include only those that changed
// console.log("Step 11");
const filteredScenarioFields =
scenario.fields?.filter((field) => eventData.changedFieldNames.includes(field)) || [];
@@ -274,7 +264,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 12: Dispatch email notifications to the email queue
// console.log("Step 12");
const emailsToDispatch = scenariosToDispatch.map((scenario) => scenario?.email);
if (!isEmpty(emailsToDispatch)) {
@@ -287,7 +276,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 13: Dispatch app notifications to the app queue
// console.log("Step 13");
const appsToDispatch = scenariosToDispatch.map((scenario) => scenario?.app);
if (!isEmpty(appsToDispatch)) {

View File

@@ -0,0 +1,10 @@
const logger = require("./logger");
const devDebugLogger = (message, meta) => {
if (process.env?.NODE_ENV === "production") {
return;
}
logger.logger.debug(message, meta);
};
module.exports = devDebugLogger;

View File

@@ -0,0 +1,3 @@
const getBullMQPrefix = () => (process.env?.NODE_ENV === "production" ? "{PROD-BULLMQ}" : "{DEV-BULLMQ}");
module.exports = getBullMQPrefix;

View File

@@ -1,4 +1,5 @@
const { GET_BODYSHOP_BY_ID } = require("../graphql-client/queries");
const devDebugLogger = require("./devDebugLogger");
const client = require("../graphql-client/graphql-client").client;
const BODYSHOP_CACHE_TTL = 3600; // 1 hour
@@ -10,6 +11,14 @@ const BODYSHOP_CACHE_TTL = 3600; // 1 hour
*/
const getBodyshopCacheKey = (bodyshopId) => `bodyshop-cache:${bodyshopId}`;
/**
* Generate a cache key for a user socket mapping
* @param email
* @returns {`user:${string}:${string}:socketMapping`}
*/
const getUserSocketMappingKey = (email) =>
`user:${process.env?.NODE_ENV === "production" ? "prod" : "dev"}:${email}:socketMapping`;
/**
* Fetch bodyshop data from the database
* @param bodyshopId
@@ -69,112 +78,17 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
// Store multiple session data in Redis
const setMultipleSessionData = async (socketId, keyValues) => {
try {
// keyValues is expected to be an object { key1: value1, key2: value2, ... }
const entries = Object.entries(keyValues).map(([key, value]) => [key, JSON.stringify(value)]);
await pubClient.hset(`socket:${socketId}`, ...entries.flat());
} catch (error) {
logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
// Retrieve multiple session data from Redis
const getMultipleSessionData = async (socketId, keys) => {
try {
const data = await pubClient.hmget(`socket:${socketId}`, keys);
// Redis returns an object with null values for missing keys, so we parse the non-null ones
return Object.fromEntries(keys.map((key, index) => [key, data[index] ? JSON.parse(data[index]) : null]));
} catch (error) {
logger.log(`Error Getting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
const setMultipleFromArraySessionData = async (socketId, keyValueArray) => {
try {
// Use Redis multi/pipeline to batch the commands
const multi = pubClient.multi();
keyValueArray.forEach(([key, value]) => {
multi.hset(`socket:${socketId}`, key, JSON.stringify(value));
});
await multi.exec(); // Execute all queued commands
} catch (error) {
logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
// Helper function to add an item to the end of the Redis list
const addItemToEndOfList = async (socketId, key, newItem) => {
try {
await pubClient.rpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
} catch (error) {
let userEmail = "unknown";
let socketMappings = {};
try {
const userData = await getSessionData(socketId, "user");
if (userData && userData.email) {
userEmail = userData.email;
socketMappings = await getUserSocketMapping(userEmail);
}
} catch (sessionError) {
logger.log(`Failed to fetch session data for socket ${socketId}: ${sessionError}`, "ERROR", "redis");
}
const mappingString = JSON.stringify(socketMappings, null, 2);
const errorMessage = `Error adding item to the end of the list for socket ${socketId}: ${error}. User: ${userEmail}, Socket Mappings: ${mappingString}`;
logger.log(errorMessage, "ERROR", "redis");
}
};
// Helper function to add an item to the beginning of the Redis list
const addItemToBeginningOfList = async (socketId, key, newItem) => {
try {
await pubClient.lpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
} catch (error) {
logger.log(`Error adding item to the beginning of the list for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
// Helper function to clear a list in Redis
const clearList = async (socketId, key) => {
try {
await pubClient.del(`socket:${socketId}:${key}`);
} catch (error) {
logger.log(`Error clearing list for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
// Add methods to manage room users
const addUserToRoom = async (room, user) => {
try {
await pubClient.sadd(room, JSON.stringify(user));
} catch (error) {
logger.log(`Error adding user to room ${room}: ${error}`, "ERROR", "redis");
}
};
const removeUserFromRoom = async (room, user) => {
try {
await pubClient.srem(room, JSON.stringify(user));
} catch (error) {
logger.log(`Error removing user to room ${room}: ${error}`, "ERROR", "redis");
}
};
const getUsersInRoom = async (room) => {
try {
const users = await pubClient.smembers(room);
return users.map((user) => JSON.parse(user));
} catch (error) {
logger.log(`Error getting users in room ${room}: ${error}`, "ERROR", "redis");
}
};
/**
* Add a socket mapping for a user
* @param email
* @param socketId
* @param bodyshopId
* @returns {Promise<void>}
*/
const addUserSocketMapping = async (email, socketId, bodyshopId) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
logger.log(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`, "debug", "redis");
devDebugLogger(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`);
// Save the mapping: socketId -> bodyshopId
await pubClient.hset(socketMappingKey, socketId, bodyshopId);
// Set TTL (24 hours) for the mapping hash
@@ -184,38 +98,45 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
/**
* Refresh the TTL for a user's socket mapping
* @param email
* @returns {Promise<void>}
*/
const refreshUserSocketTTL = async (email) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
const exists = await pubClient.exists(socketMappingKey);
if (exists) {
await pubClient.expire(socketMappingKey, 86400);
logger.log(`Refreshed TTL for ${email} socket mapping`, "debug", "redis");
devDebugLogger(`Refreshed TTL for ${email} socket mapping`);
}
} catch (error) {
logger.log(`Error refreshing TTL for ${email}: ${error}`, "ERROR", "redis");
}
};
/**
* Remove a socket mapping for a user
* @param email
* @param socketId
* @returns {Promise<void>}
*/
const removeUserSocketMapping = async (email, socketId) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
logger.log(`Removing socket ${socketId} mapping for user ${email}`, "DEBUG", "redis");
devDebugLogger(`Removing socket ${socketId} mapping for user ${email}`);
// Look up the bodyshopId associated with this socket
const bodyshopId = await pubClient.hget(socketMappingKey, socketId);
if (!bodyshopId) {
logger.log(`Socket ${socketId} not found for user ${email}`, "DEBUG", "redis");
devDebugLogger(`Socket ${socketId} not found for user ${email}`);
return;
}
// Remove the socket mapping
await pubClient.hdel(socketMappingKey, socketId);
logger.log(
`Removed socket ${socketId} (associated with bodyshop ${bodyshopId}) for user ${email}`,
"DEBUG",
"redis"
);
devDebugLogger(`Removed socket ${socketId} (associated with bodyshop ${bodyshopId}) for user ${email}`);
// Refresh TTL if any socket mappings remain
const remainingSockets = await pubClient.hlen(socketMappingKey);
@@ -227,9 +148,14 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
/**
* Get all socket mappings for a user
* @param email
* @returns {Promise<{}>}
*/
const getUserSocketMapping = async (email) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
// Retrieve all socket mappings for the user
const mapping = await pubClient.hgetall(socketMappingKey);
@@ -249,9 +175,15 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
/**
* Get socket IDs for a user by bodyshopId
* @param email
* @param bodyshopId
* @returns {Promise<{socketIds: [string, string], ttl: *}>}
*/
const getUserSocketMappingByBodyshop = async (email, bodyshopId) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
// Retrieve all socket mappings for the user
const mapping = await pubClient.hgetall(socketMappingKey);
@@ -270,7 +202,11 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
// Get bodyshop data from Redis or fetch from DB if missing
/**
* Get bodyshop data from Redis
* @param bodyshopId
* @returns {Promise<*>}
*/
const getBodyshopFromRedis = async (bodyshopId) => {
const key = getBodyshopCacheKey(bodyshopId);
try {
@@ -288,7 +224,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
await pubClient.set(key, jsonData);
await pubClient.expire(key, BODYSHOP_CACHE_TTL);
logger.log("bodyshop-cache-miss", "DEBUG", "redis", null, {
devDebugLogger("bodyshop-cache-miss", {
bodyshopId,
action: "Fetched from DB and cached"
});
@@ -303,14 +239,19 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
// Update or invalidate bodyshop data in Redis
/**
* Update or invalidate bodyshop data in Redis
* @param bodyshopId
* @param values
* @returns {Promise<void>}
*/
const updateOrInvalidateBodyshopFromRedis = async (bodyshopId, values = null) => {
const key = getBodyshopCacheKey(bodyshopId);
try {
if (!values) {
// Invalidate cache by deleting the key
await pubClient.del(key);
logger.log("bodyshop-cache-invalidate", "DEBUG", "api", "redis", {
devDebugLogger("bodyshop-cache-invalidate", {
bodyshopId,
action: "Cache invalidated"
});
@@ -319,7 +260,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
const jsonData = JSON.stringify(values);
await pubClient.set(key, jsonData);
await pubClient.expire(key, BODYSHOP_CACHE_TTL);
logger.log("bodyshop-cache-update", "DEBUG", "api", "redis", {
devDebugLogger("bodyshop-cache-update", {
bodyshopId,
action: "Cache updated",
values
@@ -335,19 +276,118 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
// NOTE: The following code was written for an abandoned branch and things have changes since the,
// Leaving it here for demonstration purposes, commenting it out so it does not get used
// Store multiple session data in Redis
// const setMultipleSessionData = async (socketId, keyValues) => {
// try {
// // keyValues is expected to be an object { key1: value1, key2: value2, ... }
// const entries = Object.entries(keyValues).map(([key, value]) => [key, JSON.stringify(value)]);
// await pubClient.hset(`socket:${socketId}`, ...entries.flat());
// } catch (error) {
// logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Retrieve multiple session data from Redis
// const getMultipleSessionData = async (socketId, keys) => {
// try {
// const data = await pubClient.hmget(`socket:${socketId}`, keys);
// // Redis returns an object with null values for missing keys, so we parse the non-null ones
// return Object.fromEntries(keys.map((key, index) => [key, data[index] ? JSON.parse(data[index]) : null]));
// } catch (error) {
// logger.log(`Error Getting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// const setMultipleFromArraySessionData = async (socketId, keyValueArray) => {
// try {
// // Use Redis multi/pipeline to batch the commands
// const multi = pubClient.multi();
// keyValueArray.forEach(([key, value]) => {
// multi.hset(`socket:${socketId}`, key, JSON.stringify(value));
// });
// await multi.exec(); // Execute all queued commands
// } catch (error) {
// logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Helper function to add an item to the end of the Redis list
// const addItemToEndOfList = async (socketId, key, newItem) => {
// try {
// await pubClient.rpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
// } catch (error) {
// let userEmail = "unknown";
// let socketMappings = {};
// try {
// const userData = await getSessionData(socketId, "user");
// if (userData && userData.email) {
// userEmail = userData.email;
// socketMappings = await getUserSocketMapping(userEmail);
// }
// } catch (sessionError) {
// logger.log(`Failed to fetch session data for socket ${socketId}: ${sessionError}`, "ERROR", "redis");
// }
// const mappingString = JSON.stringify(socketMappings, null, 2);
// const errorMessage = `Error adding item to the end of the list for socket ${socketId}: ${error}. User: ${userEmail}, Socket Mappings: ${mappingString}`;
// logger.log(errorMessage, "ERROR", "redis");
// }
// };
// Helper function to add an item to the beginning of the Redis list
// const addItemToBeginningOfList = async (socketId, key, newItem) => {
// try {
// await pubClient.lpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
// } catch (error) {
// logger.log(`Error adding item to the beginning of the list for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Helper function to clear a list in Redis
// const clearList = async (socketId, key) => {
// try {
// await pubClient.del(`socket:${socketId}:${key}`);
// } catch (error) {
// logger.log(`Error clearing list for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Add methods to manage room users
// const addUserToRoom = async (room, user) => {
// try {
// await pubClient.sadd(room, JSON.stringify(user));
// } catch (error) {
// logger.log(`Error adding user to room ${room}: ${error}`, "ERROR", "redis");
// }
// };
// Remove users from room
// const removeUserFromRoom = async (room, user) => {
// try {
// await pubClient.srem(room, JSON.stringify(user));
// } catch (error) {
// logger.log(`Error removing user to room ${room}: ${error}`, "ERROR", "redis");
// }
// };
// Get Users in room
// const getUsersInRoom = async (room) => {
// try {
// const users = await pubClient.smembers(room);
// return users.map((user) => JSON.parse(user));
// } catch (error) {
// logger.log(`Error getting users in room ${room}: ${error}`, "ERROR", "redis");
// }
// };
const api = {
getUserSocketMappingKey,
getBodyshopCacheKey,
setSessionData,
getSessionData,
clearSessionData,
setMultipleSessionData,
getMultipleSessionData,
setMultipleFromArraySessionData,
addItemToEndOfList,
addItemToBeginningOfList,
clearList,
addUserToRoom,
removeUserFromRoom,
getUsersInRoom,
addUserSocketMapping,
removeUserSocketMapping,
getUserSocketMappingByBodyshop,
@@ -355,6 +395,15 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
refreshUserSocketTTL,
getBodyshopFromRedis,
updateOrInvalidateBodyshopFromRedis
// setMultipleSessionData,
// getMultipleSessionData,
// setMultipleFromArraySessionData,
// addItemToEndOfList,
// addItemToBeginningOfList,
// clearList,
// addUserToRoom,
// removeUserFromRoom,
// getUsersInRoom,
};
Object.assign(module.exports, api);