Compare commits

...

6 Commits

10 changed files with 622 additions and 847 deletions

View File

@@ -1,10 +1,13 @@
/** Notification Scenarios
* @description This file contains the scenarios for job notifications.
* @type {string[]}
*/
const notificationScenarios = [
"job-assigned-to-me",
"bill-posted",
"critical-parts-status-changed",
"part-marked-back-ordered",
"new-note-added",
"supplement-imported",
"schedule-dates-changed",
"tasks-updated-created",
"new-media-added-reassigned",
@@ -14,6 +17,7 @@ const notificationScenarios = [
"job-status-change",
"payment-collected-completed",
"alternate-transport-changed"
// "supplement-imported", // Disabled for now
];
export { notificationScenarios };

View File

@@ -5266,32 +5266,6 @@
- active:
_eq: true
check: null
event_triggers:
- name: notifications_parts_dispatch
definition:
enable_manual: false
insert:
columns: '*'
retry_conf:
interval_sec: 10
num_retries: 0
timeout_sec: 60
webhook_from_env: HASURA_API_URL
headers:
- name: event-secret
value_from_env: EVENT_SECRET
request_transform:
body:
action: transform
template: |-
{
"success": true
}
method: POST
query_params: {}
template_engine: Kriti
url: '{{$base_url}}/notifications/events/handlePartsDispatchChange'
version: 2
- table:
name: parts_dispatch_lines
schema: public

View File

@@ -134,24 +134,6 @@ const handleJobLinesChange = async (req, res) =>
const handleNotesChange = async (req, res) =>
processNotificationEvent(req, res, "req.body.event.new.jobid", "Notes Changed Notification Event Handled.");
/**
* Handle parts dispatch change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Object} JSON response with a success message.
*/
const handlePartsDispatchChange = (req, res) => res.status(200).json({ message: "Parts Dispatch change handled." });
/**
* Handle parts order change notifications.
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Object} JSON response with a success message.
*/
const handlePartsOrderChange = (req, res) => res.status(200).json({ message: "Parts Order change handled." });
/**
* Handle payments change notifications.
*
@@ -182,6 +164,27 @@ const handleTasksChange = async (req, res) =>
const handleTimeTicketsChange = async (req, res) =>
processNotificationEvent(req, res, "req.body.event.new.jobid", "Time Tickets Changed Notification Event Handled.");
/**
* Handle parts dispatch change notifications.
* Note: Placeholder
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Object} JSON response with a success message.
*
*/
const handlePartsDispatchChange = (req, res) => res.status(200).json({ message: "Parts Dispatch change handled." });
/**
* Handle parts order change notifications.
* Note: Placeholder
*
* @param {Object} req - Express request object.
* @param {Object} res - Express response object.
* @returns {Object} JSON response with a success message.
*/
const handlePartsOrderChange = (req, res) => res.status(200).json({ message: "Parts Order change handled." });
module.exports = {
handleJobsChange,
handleBillsChange,

View File

@@ -1,6 +1,7 @@
const { Queue, Worker } = require("bullmq");
const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries");
const { registerCleanupTask } = require("../../utils/cleanupManager");
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
const graphQLClient = require("../../graphql-client/graphql-client").client;
// Base time-related constant in minutes, sourced from environment variable or defaulting to 1
@@ -45,17 +46,20 @@ const buildNotificationContent = (notifications) => {
*/
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
if (!addQueue || !consolidateQueue) {
logger.logger.debug("Initializing Notifications Queues");
const prefix = getBullMQPrefix();
const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";
logger.logger.debug(`Initializing Notifications Queues with prefix: ${prefix}`);
addQueue = new Queue("notificationsAdd", {
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
consolidateQueue = new Queue("notificationsConsolidate", {
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
@@ -65,7 +69,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
const { jobId, key, variables, recipients, body, jobRoNumber } = job.data;
logger.logger.debug(`Adding notifications for jobId ${jobId}`);
const redisKeyPrefix = `app:notifications:${jobId}`;
const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`;
const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() };
for (const recipient of recipients) {
@@ -78,7 +82,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
}
const consolidateKey = `app:consolidate:${jobId}`;
const consolidateKey = `app:${devKey}:consolidate:${jobId}`;
const flagSet = await pubClient.setnx(consolidateKey, "pending");
logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);
@@ -100,8 +104,8 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
}
},
{
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 5
}
);
@@ -112,8 +116,9 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
const { jobId, recipients } = job.data;
logger.logger.debug(`Consolidating notifications for jobId ${jobId}`);
const redisKeyPrefix = `app:notifications:${jobId}`;
const lockKey = `lock:consolidate:${jobId}`;
const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`;
const lockKey = `lock:${devKey}:consolidate:${jobId}`;
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);
@@ -212,7 +217,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
}
}
await pubClient.del(`app:consolidate:${jobId}`);
await pubClient.del(`app:${devKey}:consolidate:${jobId}`);
} catch (err) {
logger.log(`app-queue-consolidation-error`, "ERROR", "notifications", "api", {
message: err?.message,
@@ -227,8 +232,8 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
}
},
{
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 1,
limiter: { max: 1, duration: RATE_LIMITER_DURATION }
}

View File

@@ -3,6 +3,7 @@ const { sendTaskEmail } = require("../../email/sendemail");
const generateEmailTemplate = require("../../email/generateTemplate");
const { InstanceEndpoints } = require("../../utils/instanceMgr");
const { registerCleanupTask } = require("../../utils/cleanupManager");
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
const EMAIL_CONSOLIDATION_DELAY_IN_MINS = (() => {
const envValue = process.env?.EMAIL_CONSOLIDATION_DELAY_IN_MINS;
@@ -34,19 +35,22 @@ let emailConsolidateWorker;
*/
const loadEmailQueue = async ({ pubClient, logger }) => {
if (!emailAddQueue || !emailConsolidateQueue) {
logger.logger.debug("Initializing Email Notification Queues");
const prefix = getBullMQPrefix();
const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";
logger.logger.debug(`Initializing Email Notification Queues with prefix: ${prefix}`);
// Queue for adding email notifications
emailAddQueue = new Queue("emailAdd", {
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
// Queue for consolidating and sending emails
emailConsolidateQueue = new Queue("emailConsolidate", {
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
@@ -57,21 +61,22 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data;
logger.logger.debug(`Adding email notifications for jobId ${jobId}`);
const redisKeyPrefix = `email:notifications:${jobId}`;
const redisKeyPrefix = `email:${devKey}:notifications:${jobId}`;
for (const recipient of recipients) {
const { user, firstName, lastName } = recipient;
const userKey = `${redisKeyPrefix}:${user}`;
await pubClient.rpush(userKey, body);
await pubClient.expire(userKey, NOTIFICATION_EXPIRATION / 1000);
const detailsKey = `email:recipientDetails:${jobId}:${user}`;
const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${user}`;
await pubClient.hsetnx(detailsKey, "firstName", firstName || "");
await pubClient.hsetnx(detailsKey, "lastName", lastName || "");
await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000);
await pubClient.sadd(`email:recipients:${jobId}`, user);
await pubClient.sadd(`email:${devKey}:recipients:${jobId}`, user);
logger.logger.debug(`Stored message for ${user} under ${userKey}: ${body}`);
}
const consolidateKey = `email:consolidate:${jobId}`;
const consolidateKey = `email:${devKey}:consolidate:${jobId}`;
const flagSet = await pubClient.setnx(consolidateKey, "pending");
if (flagSet) {
await emailConsolidateQueue.add(
@@ -91,8 +96,8 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
}
},
{
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 5
}
);
@@ -104,15 +109,15 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
const { jobId, jobRoNumber, bodyShopName } = job.data;
logger.logger.debug(`Consolidating emails for jobId ${jobId}`);
const lockKey = `lock:emailConsolidate:${jobId}`;
const lockKey = `lock:${devKey}:emailConsolidate:${jobId}`;
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
if (lockAcquired) {
try {
const recipientsSet = `email:recipients:${jobId}`;
const recipientsSet = `email:${devKey}:recipients:${jobId}`;
const recipients = await pubClient.smembers(recipientsSet);
for (const recipient of recipients) {
const userKey = `email:notifications:${jobId}:${recipient}`;
const detailsKey = `email:recipientDetails:${jobId}:${recipient}`;
const userKey = `email:${devKey}:notifications:${jobId}:${recipient}`;
const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${recipient}`;
const messages = await pubClient.lrange(userKey, 0, -1);
if (messages.length > 0) {
const details = await pubClient.hgetall(detailsKey);
@@ -144,7 +149,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
}
}
await pubClient.del(recipientsSet);
await pubClient.del(`email:consolidate:${jobId}`);
await pubClient.del(`email:${devKey}:consolidate:${jobId}`);
} catch (err) {
logger.log(`email-queue-consolidation-error`, "ERROR", "notifications", "api", {
message: err?.message,
@@ -159,8 +164,8 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
}
},
{
prefix,
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 1,
limiter: { max: 1, duration: RATE_LIMITER_DURATION }
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
const {
jobAssignedToMeBuilder,
billPostedHandler,
billPostedBuilder,
newNoteAddedBuilder,
scheduledDatesChangedBuilder,
tasksUpdatedCreatedBuilder,
@@ -30,10 +30,12 @@ const { isFunction } = require("lodash");
* - builder {Function}: A function to handle the scenario.
* - onlyTruthyValues {boolean|Array<string>}: Specifies fields that must have truthy values for the scenario to match.
* - filterCallback {Function}: Optional callback (sync or async) to further filter the scenario based on event data (returns boolean).
* - enabled {boolean}: If true, the scenario is active; if false or omitted, the scenario is skipped.
*/
const notificationScenarios = [
{
key: "job-assigned-to-me",
enabled: true,
table: "jobs",
fields: ["employee_prep", "employee_body", "employee_csr", "employee_refinish"],
matchToUserFields: ["employee_prep", "employee_body", "employee_csr", "employee_refinish"],
@@ -41,24 +43,28 @@ const notificationScenarios = [
},
{
key: "bill-posted",
enabled: true,
table: "bills",
builder: billPostedHandler,
builder: billPostedBuilder,
onNew: true
},
{
key: "new-note-added",
enabled: true,
table: "notes",
builder: newNoteAddedBuilder,
onNew: true
},
{
key: "schedule-dates-changed",
enabled: true,
table: "jobs",
fields: ["scheduled_in", "scheduled_completion", "scheduled_delivery"],
builder: scheduledDatesChangedBuilder
},
{
key: "tasks-updated-created",
enabled: true,
table: "tasks",
fields: ["updated_at"],
// onNew: true,
@@ -66,12 +72,14 @@ const notificationScenarios = [
},
{
key: "job-status-change",
enabled: true,
table: "jobs",
fields: ["status"],
builder: jobStatusChangeBuilder
},
{
key: "job-added-to-production",
enabled: true,
table: "jobs",
fields: ["inproduction"],
onlyTruthyValues: ["inproduction"],
@@ -79,36 +87,42 @@ const notificationScenarios = [
},
{
key: "alternate-transport-changed",
enabled: true,
table: "jobs",
fields: ["alt_transport"],
builder: alternateTransportChangedBuilder
},
{
key: "new-time-ticket-posted",
enabled: true,
table: "timetickets",
builder: newTimeTicketPostedBuilder
},
{
key: "intake-delivery-checklist-completed",
enabled: true,
table: "jobs",
fields: ["intakechecklist", "deliverchecklist"],
builder: intakeDeliveryChecklistCompletedBuilder
},
{
key: "payment-collected-completed",
enabled: true,
table: "payments",
onNew: true,
builder: paymentCollectedCompletedBuilder
},
{
// MAKE SURE YOU ARE NOT ON A LMS ENVIRONMENT
// Only works on a non LMS ENV
key: "new-media-added-reassigned",
enabled: true,
table: "documents",
fields: ["jobid"],
builder: newMediaAddedReassignedBuilder
},
{
key: "critical-parts-status-changed",
enabled: true,
table: "joblines",
fields: ["status"],
onlyTruthyValues: ["status"],
@@ -117,6 +131,7 @@ const notificationScenarios = [
},
{
key: "part-marked-back-ordered",
enabled: true,
table: "joblines",
fields: ["status"],
builder: partMarkedBackOrderedBuilder,
@@ -133,12 +148,11 @@ const notificationScenarios = [
}
}
},
// -------------- Difficult ---------------
// Holding off on this one for now
// Holding off on this one for now, spans multiple tables
{
key: "supplement-imported",
enabled: false,
builder: supplementImportedBuilder
// spans multiple tables,
}
];
@@ -159,6 +173,11 @@ const notificationScenarios = [
const getMatchingScenarios = async (eventData, getBodyshopFromRedis) => {
const matches = [];
for (const scenario of notificationScenarios) {
// Check if the scenario is enabled; skip if not explicitly true
if (scenario.enabled !== true) {
continue;
}
// If eventData has a table, then only scenarios with a table property that matches should be considered.
if (eventData.table) {
if (!scenario.table || eventData.table.name !== scenario.table) {

View File

@@ -35,7 +35,6 @@ const scenarioParser = async (req, jobIdField) => {
} = req;
// Step 1: Validate we know what user committed the action that fired the parser
// console.log("Step 1");
const hasuraUserRole = event?.session_variables?.["x-hasura-role"];
const hasuraUserId = event?.session_variables?.["x-hasura-user-id"];
@@ -52,7 +51,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 2: Extract just the jobId using the provided jobIdField
// console.log("Step 2");
let jobId = null;
if (jobIdField) {
@@ -70,7 +68,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 3: Query job watchers associated with the job ID using GraphQL
// console.log("Step 3");
const watcherData = await gqlClient.request(queries.GET_JOB_WATCHERS, {
jobid: jobId
@@ -96,7 +93,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 5: Perform the full event diff now that we know there are watchers
// console.log("Step 5");
const eventData = await eventParser({
newData: event.data.new,
@@ -107,7 +103,6 @@ const scenarioParser = async (req, jobIdField) => {
});
// Step 6: Extract body shop information from the job data
// console.log("Step 6");
const bodyShopId = watcherData?.job?.bodyshop?.id;
const bodyShopName = watcherData?.job?.bodyshop?.shopname;
@@ -122,7 +117,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 7: Identify scenarios that match the event data and job context
// console.log("Step 7");
const matchingScenarios = await getMatchingScenarios(
{
@@ -155,7 +149,6 @@ const scenarioParser = async (req, jobIdField) => {
};
// Step 8: Query notification settings for the job watchers
// console.log("Step 8");
const associationsData = await gqlClient.request(queries.GET_NOTIFICATION_ASSOCIATIONS, {
emails: jobWatchers.map((x) => x.email),
@@ -173,7 +166,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 9: Filter scenario watchers based on their enabled notification methods
// console.log("Step 9");
finalScenarioData.matchingScenarios = finalScenarioData.matchingScenarios.map((scenario) => ({
...scenario,
@@ -213,7 +205,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 10: Build and collect scenarios to dispatch notifications for
// console.log("Step 10");
const scenariosToDispatch = [];
@@ -240,7 +231,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 11: Filter scenario fields to include only those that changed
// console.log("Step 11");
const filteredScenarioFields =
scenario.fields?.filter((field) => eventData.changedFieldNames.includes(field)) || [];
@@ -274,7 +264,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 12: Dispatch email notifications to the email queue
// console.log("Step 12");
const emailsToDispatch = scenariosToDispatch.map((scenario) => scenario?.email);
if (!isEmpty(emailsToDispatch)) {
@@ -287,7 +276,6 @@ const scenarioParser = async (req, jobIdField) => {
}
// Step 13: Dispatch app notifications to the app queue
// console.log("Step 13");
const appsToDispatch = scenariosToDispatch.map((scenario) => scenario?.app);
if (!isEmpty(appsToDispatch)) {

View File

@@ -0,0 +1,3 @@
const getBullMQPrefix = () => (process.env?.NODE_ENV === "production" ? "{PROD-BULLMQ}" : "{DEV-BULLMQ}");
module.exports = getBullMQPrefix;

View File

@@ -10,6 +10,14 @@ const BODYSHOP_CACHE_TTL = 3600; // 1 hour
*/
const getBodyshopCacheKey = (bodyshopId) => `bodyshop-cache:${bodyshopId}`;
/**
* Generate a cache key for a user socket mapping
* @param email
* @returns {`user:${string}:${string}:socketMapping`}
*/
const getUserSocketMappingKey = (email) =>
`user:${process.env?.NODE_ENV === "production" ? "prod" : "dev"}:${email}:socketMapping`;
/**
* Fetch bodyshop data from the database
* @param bodyshopId
@@ -69,110 +77,15 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
// Store multiple session data in Redis
const setMultipleSessionData = async (socketId, keyValues) => {
try {
// keyValues is expected to be an object { key1: value1, key2: value2, ... }
const entries = Object.entries(keyValues).map(([key, value]) => [key, JSON.stringify(value)]);
await pubClient.hset(`socket:${socketId}`, ...entries.flat());
} catch (error) {
logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
// Retrieve multiple session data from Redis
const getMultipleSessionData = async (socketId, keys) => {
try {
const data = await pubClient.hmget(`socket:${socketId}`, keys);
// Redis returns an object with null values for missing keys, so we parse the non-null ones
return Object.fromEntries(keys.map((key, index) => [key, data[index] ? JSON.parse(data[index]) : null]));
} catch (error) {
logger.log(`Error Getting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
const setMultipleFromArraySessionData = async (socketId, keyValueArray) => {
try {
// Use Redis multi/pipeline to batch the commands
const multi = pubClient.multi();
keyValueArray.forEach(([key, value]) => {
multi.hset(`socket:${socketId}`, key, JSON.stringify(value));
});
await multi.exec(); // Execute all queued commands
} catch (error) {
logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
// Helper function to add an item to the end of the Redis list
const addItemToEndOfList = async (socketId, key, newItem) => {
try {
await pubClient.rpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
} catch (error) {
let userEmail = "unknown";
let socketMappings = {};
try {
const userData = await getSessionData(socketId, "user");
if (userData && userData.email) {
userEmail = userData.email;
socketMappings = await getUserSocketMapping(userEmail);
}
} catch (sessionError) {
logger.log(`Failed to fetch session data for socket ${socketId}: ${sessionError}`, "ERROR", "redis");
}
const mappingString = JSON.stringify(socketMappings, null, 2);
const errorMessage = `Error adding item to the end of the list for socket ${socketId}: ${error}. User: ${userEmail}, Socket Mappings: ${mappingString}`;
logger.log(errorMessage, "ERROR", "redis");
}
};
// Helper function to add an item to the beginning of the Redis list
const addItemToBeginningOfList = async (socketId, key, newItem) => {
try {
await pubClient.lpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
} catch (error) {
logger.log(`Error adding item to the beginning of the list for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
// Helper function to clear a list in Redis
const clearList = async (socketId, key) => {
try {
await pubClient.del(`socket:${socketId}:${key}`);
} catch (error) {
logger.log(`Error clearing list for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
// Add methods to manage room users
const addUserToRoom = async (room, user) => {
try {
await pubClient.sadd(room, JSON.stringify(user));
} catch (error) {
logger.log(`Error adding user to room ${room}: ${error}`, "ERROR", "redis");
}
};
const removeUserFromRoom = async (room, user) => {
try {
await pubClient.srem(room, JSON.stringify(user));
} catch (error) {
logger.log(`Error removing user to room ${room}: ${error}`, "ERROR", "redis");
}
};
const getUsersInRoom = async (room) => {
try {
const users = await pubClient.smembers(room);
return users.map((user) => JSON.parse(user));
} catch (error) {
logger.log(`Error getting users in room ${room}: ${error}`, "ERROR", "redis");
}
};
/**
* Add a socket mapping for a user
* @param email
* @param socketId
* @param bodyshopId
* @returns {Promise<void>}
*/
const addUserSocketMapping = async (email, socketId, bodyshopId) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
logger.log(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`, "debug", "redis");
// Save the mapping: socketId -> bodyshopId
@@ -184,9 +97,14 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
/**
* Refresh the TTL for a user's socket mapping
* @param email
* @returns {Promise<void>}
*/
const refreshUserSocketTTL = async (email) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
const exists = await pubClient.exists(socketMappingKey);
if (exists) {
@@ -198,9 +116,15 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
/**
* Remove a socket mapping for a user
* @param email
* @param socketId
* @returns {Promise<void>}
*/
const removeUserSocketMapping = async (email, socketId) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
logger.log(`Removing socket ${socketId} mapping for user ${email}`, "DEBUG", "redis");
// Look up the bodyshopId associated with this socket
@@ -227,9 +151,14 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
/**
* Get all socket mappings for a user
* @param email
* @returns {Promise<{}>}
*/
const getUserSocketMapping = async (email) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
// Retrieve all socket mappings for the user
const mapping = await pubClient.hgetall(socketMappingKey);
@@ -249,9 +178,15 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
/**
* Get socket IDs for a user by bodyshopId
* @param email
* @param bodyshopId
* @returns {Promise<{socketIds: [string, string], ttl: *}>}
*/
const getUserSocketMappingByBodyshop = async (email, bodyshopId) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
// Retrieve all socket mappings for the user
const mapping = await pubClient.hgetall(socketMappingKey);
@@ -270,7 +205,11 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
// Get bodyshop data from Redis or fetch from DB if missing
/**
* Get bodyshop data from Redis
* @param bodyshopId
* @returns {Promise<*>}
*/
const getBodyshopFromRedis = async (bodyshopId) => {
const key = getBodyshopCacheKey(bodyshopId);
try {
@@ -303,7 +242,12 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
// Update or invalidate bodyshop data in Redis
/**
* Update or invalidate bodyshop data in Redis
* @param bodyshopId
* @param values
* @returns {Promise<void>}
*/
const updateOrInvalidateBodyshopFromRedis = async (bodyshopId, values = null) => {
const key = getBodyshopCacheKey(bodyshopId);
try {
@@ -335,19 +279,118 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
// NOTE: The following code was written for an abandoned branch and things have changes since the,
// Leaving it here for demonstration purposes, commenting it out so it does not get used
// Store multiple session data in Redis
// const setMultipleSessionData = async (socketId, keyValues) => {
// try {
// // keyValues is expected to be an object { key1: value1, key2: value2, ... }
// const entries = Object.entries(keyValues).map(([key, value]) => [key, JSON.stringify(value)]);
// await pubClient.hset(`socket:${socketId}`, ...entries.flat());
// } catch (error) {
// logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Retrieve multiple session data from Redis
// const getMultipleSessionData = async (socketId, keys) => {
// try {
// const data = await pubClient.hmget(`socket:${socketId}`, keys);
// // Redis returns an object with null values for missing keys, so we parse the non-null ones
// return Object.fromEntries(keys.map((key, index) => [key, data[index] ? JSON.parse(data[index]) : null]));
// } catch (error) {
// logger.log(`Error Getting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// const setMultipleFromArraySessionData = async (socketId, keyValueArray) => {
// try {
// // Use Redis multi/pipeline to batch the commands
// const multi = pubClient.multi();
// keyValueArray.forEach(([key, value]) => {
// multi.hset(`socket:${socketId}`, key, JSON.stringify(value));
// });
// await multi.exec(); // Execute all queued commands
// } catch (error) {
// logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Helper function to add an item to the end of the Redis list
// const addItemToEndOfList = async (socketId, key, newItem) => {
// try {
// await pubClient.rpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
// } catch (error) {
// let userEmail = "unknown";
// let socketMappings = {};
// try {
// const userData = await getSessionData(socketId, "user");
// if (userData && userData.email) {
// userEmail = userData.email;
// socketMappings = await getUserSocketMapping(userEmail);
// }
// } catch (sessionError) {
// logger.log(`Failed to fetch session data for socket ${socketId}: ${sessionError}`, "ERROR", "redis");
// }
// const mappingString = JSON.stringify(socketMappings, null, 2);
// const errorMessage = `Error adding item to the end of the list for socket ${socketId}: ${error}. User: ${userEmail}, Socket Mappings: ${mappingString}`;
// logger.log(errorMessage, "ERROR", "redis");
// }
// };
// Helper function to add an item to the beginning of the Redis list
// const addItemToBeginningOfList = async (socketId, key, newItem) => {
// try {
// await pubClient.lpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
// } catch (error) {
// logger.log(`Error adding item to the beginning of the list for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Helper function to clear a list in Redis
// const clearList = async (socketId, key) => {
// try {
// await pubClient.del(`socket:${socketId}:${key}`);
// } catch (error) {
// logger.log(`Error clearing list for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Add methods to manage room users
// const addUserToRoom = async (room, user) => {
// try {
// await pubClient.sadd(room, JSON.stringify(user));
// } catch (error) {
// logger.log(`Error adding user to room ${room}: ${error}`, "ERROR", "redis");
// }
// };
// Remove users from room
// const removeUserFromRoom = async (room, user) => {
// try {
// await pubClient.srem(room, JSON.stringify(user));
// } catch (error) {
// logger.log(`Error removing user to room ${room}: ${error}`, "ERROR", "redis");
// }
// };
// Get Users in room
// const getUsersInRoom = async (room) => {
// try {
// const users = await pubClient.smembers(room);
// return users.map((user) => JSON.parse(user));
// } catch (error) {
// logger.log(`Error getting users in room ${room}: ${error}`, "ERROR", "redis");
// }
// };
const api = {
getUserSocketMappingKey,
getBodyshopCacheKey,
setSessionData,
getSessionData,
clearSessionData,
setMultipleSessionData,
getMultipleSessionData,
setMultipleFromArraySessionData,
addItemToEndOfList,
addItemToBeginningOfList,
clearList,
addUserToRoom,
removeUserFromRoom,
getUsersInRoom,
addUserSocketMapping,
removeUserSocketMapping,
getUserSocketMappingByBodyshop,
@@ -355,6 +398,15 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
refreshUserSocketTTL,
getBodyshopFromRedis,
updateOrInvalidateBodyshopFromRedis
// setMultipleSessionData,
// getMultipleSessionData,
// setMultipleFromArraySessionData,
// addItemToEndOfList,
// addItemToBeginningOfList,
// clearList,
// addUserToRoom,
// removeUserFromRoom,
// getUsersInRoom,
};
Object.assign(module.exports, api);