IO-3166-Global-Notifications-Part-2: add additional key prefixes for dev v prod

This commit is contained in:
Dave Richer
2025-03-12 21:10:42 -04:00
parent 360a1954f4
commit f21ba8e087
3 changed files with 192 additions and 136 deletions

View File

@@ -47,6 +47,7 @@ const buildNotificationContent = (notifications) => {
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
if (!addQueue || !consolidateQueue) {
const prefix = getBullMQPrefix();
const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";
logger.logger.debug(`Initializing Notifications Queues with prefix: ${prefix}`);
@@ -68,7 +69,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
const { jobId, key, variables, recipients, body, jobRoNumber } = job.data;
logger.logger.debug(`Adding notifications for jobId ${jobId}`);
const redisKeyPrefix = `app:notifications:${jobId}`;
const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`;
const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() };
for (const recipient of recipients) {
@@ -81,7 +82,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
}
const consolidateKey = `app:consolidate:${jobId}`;
const consolidateKey = `app:${devKey}:consolidate:${jobId}`;
const flagSet = await pubClient.setnx(consolidateKey, "pending");
logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);
@@ -115,8 +116,9 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
const { jobId, recipients } = job.data;
logger.logger.debug(`Consolidating notifications for jobId ${jobId}`);
const redisKeyPrefix = `app:notifications:${jobId}`;
const lockKey = `lock:consolidate:${jobId}`;
const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`;
const lockKey = `lock:${devKey}:consolidate:${jobId}`;
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);
@@ -215,7 +217,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
}
}
await pubClient.del(`app:consolidate:${jobId}`);
await pubClient.del(`app:${devKey}:consolidate:${jobId}`);
} catch (err) {
logger.log(`app-queue-consolidation-error`, "ERROR", "notifications", "api", {
message: err?.message,

View File

@@ -36,6 +36,7 @@ let emailConsolidateWorker;
const loadEmailQueue = async ({ pubClient, logger }) => {
if (!emailAddQueue || !emailConsolidateQueue) {
const prefix = getBullMQPrefix();
const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";
logger.logger.debug(`Initializing Email Notification Queues with prefix: ${prefix}`);
@@ -60,21 +61,22 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data;
logger.logger.debug(`Adding email notifications for jobId ${jobId}`);
const redisKeyPrefix = `email:notifications:${jobId}`;
const redisKeyPrefix = `email:${devKey}:notifications:${jobId}`;
for (const recipient of recipients) {
const { user, firstName, lastName } = recipient;
const userKey = `${redisKeyPrefix}:${user}`;
await pubClient.rpush(userKey, body);
await pubClient.expire(userKey, NOTIFICATION_EXPIRATION / 1000);
const detailsKey = `email:recipientDetails:${jobId}:${user}`;
const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${user}`;
await pubClient.hsetnx(detailsKey, "firstName", firstName || "");
await pubClient.hsetnx(detailsKey, "lastName", lastName || "");
await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000);
await pubClient.sadd(`email:recipients:${jobId}`, user);
await pubClient.sadd(`email:${devKey}:recipients:${jobId}`, user);
logger.logger.debug(`Stored message for ${user} under ${userKey}: ${body}`);
}
const consolidateKey = `email:consolidate:${jobId}`;
const consolidateKey = `email:${devKey}:consolidate:${jobId}`;
const flagSet = await pubClient.setnx(consolidateKey, "pending");
if (flagSet) {
await emailConsolidateQueue.add(
@@ -107,15 +109,15 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
const { jobId, jobRoNumber, bodyShopName } = job.data;
logger.logger.debug(`Consolidating emails for jobId ${jobId}`);
const lockKey = `lock:emailConsolidate:${jobId}`;
const lockKey = `lock:${devKey}:emailConsolidate:${jobId}`;
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
if (lockAcquired) {
try {
const recipientsSet = `email:recipients:${jobId}`;
const recipientsSet = `email:${devKey}:recipients:${jobId}`;
const recipients = await pubClient.smembers(recipientsSet);
for (const recipient of recipients) {
const userKey = `email:notifications:${jobId}:${recipient}`;
const detailsKey = `email:recipientDetails:${jobId}:${recipient}`;
const userKey = `email:${devKey}:notifications:${jobId}:${recipient}`;
const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${recipient}`;
const messages = await pubClient.lrange(userKey, 0, -1);
if (messages.length > 0) {
const details = await pubClient.hgetall(detailsKey);
@@ -147,7 +149,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
}
}
await pubClient.del(recipientsSet);
await pubClient.del(`email:consolidate:${jobId}`);
await pubClient.del(`email:${devKey}:consolidate:${jobId}`);
} catch (err) {
logger.log(`email-queue-consolidation-error`, "ERROR", "notifications", "api", {
message: err?.message,

View File

@@ -10,6 +10,14 @@ const BODYSHOP_CACHE_TTL = 3600; // 1 hour
*/
// Build the Redis key under which a bodyshop's cached data is stored.
const getBodyshopCacheKey = (bodyshopId) => {
  return `bodyshop-cache:${bodyshopId}`;
};
/**
 * Build the Redis key that maps a user's email to their socket mappings.
 * The key is namespaced by environment ("prod" when NODE_ENV is "production",
 * otherwise "dev") so dev and prod keys never collide on a shared Redis.
 * @param email
 * @returns {`user:${string}:${string}:socketMapping`}
 */
const getUserSocketMappingKey = (email) => {
  const envSegment = process.env?.NODE_ENV === "production" ? "prod" : "dev";
  return `user:${envSegment}:${email}:socketMapping`;
};
/**
* Fetch bodyshop data from the database
* @param bodyshopId
@@ -69,110 +77,15 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
// Persist several session fields for a socket in a single Redis HSET call.
const setMultipleSessionData = async (socketId, keyValues) => {
  try {
    // keyValues is a plain object { key1: value1, ... }; each value is
    // JSON-serialized so Redis stores strings.
    const fieldArgs = [];
    for (const [field, value] of Object.entries(keyValues)) {
      fieldArgs.push(field, JSON.stringify(value));
    }
    await pubClient.hset(`socket:${socketId}`, ...fieldArgs);
  } catch (error) {
    logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
  }
};
// Retrieve multiple session fields for a socket from Redis.
// Returns an object keyed by the requested field names (null for missing
// fields), or undefined if the Redis read itself fails.
const getMultipleSessionData = async (socketId, keys) => {
try {
const data = await pubClient.hmget(`socket:${socketId}`, keys);
// HMGET returns an ARRAY positionally matching `keys` (null for missing
// fields); parse the non-null entries and pass nulls through as-is.
// NOTE(review): assumes all stored values were written via JSON.stringify
// (as the sibling set* helpers do) — confirm no other writers exist.
return Object.fromEntries(keys.map((key, index) => [key, data[index] ? JSON.parse(data[index]) : null]));
} catch (error) {
logger.log(`Error Getting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
}
};
// Store multiple session fields for a socket from an array of [key, value]
// pairs, batching one HSET per pair inside a single Redis MULTI transaction.
const setMultipleFromArraySessionData = async (socketId, keyValueArray) => {
  try {
    const hashKey = `socket:${socketId}`;
    const multi = pubClient.multi();
    for (const [field, value] of keyValueArray) {
      multi.hset(hashKey, field, JSON.stringify(value));
    }
    // Flush every queued command in one round trip.
    await multi.exec();
  } catch (error) {
    logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
  }
};
// Helper function to add an item to the end of the Redis list for this
// socket/key pair. On failure, it enriches the error log with the user's
// email and socket mappings (looked up best-effort) to aid debugging.
const addItemToEndOfList = async (socketId, key, newItem) => {
try {
await pubClient.rpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
} catch (error) {
// Best-effort diagnostics: defaults used if the session lookup also fails.
let userEmail = "unknown";
let socketMappings = {};
try {
const userData = await getSessionData(socketId, "user");
if (userData && userData.email) {
userEmail = userData.email;
socketMappings = await getUserSocketMapping(userEmail);
}
} catch (sessionError) {
// Log the secondary failure separately so the original error is not masked.
logger.log(`Failed to fetch session data for socket ${socketId}: ${sessionError}`, "ERROR", "redis");
}
const mappingString = JSON.stringify(socketMappings, null, 2);
const errorMessage = `Error adding item to the end of the list for socket ${socketId}: ${error}. User: ${userEmail}, Socket Mappings: ${mappingString}`;
logger.log(errorMessage, "ERROR", "redis");
}
};
// Prepend a JSON-serialized item to the Redis list for this socket/key pair.
const addItemToBeginningOfList = async (socketId, key, newItem) => {
  try {
    const listKey = `socket:${socketId}:${key}`;
    await pubClient.lpush(listKey, JSON.stringify(newItem));
  } catch (error) {
    logger.log(`Error adding item to the beginning of the list for socket ${socketId}: ${error}`, "ERROR", "redis");
  }
};
// Remove every item in a socket's list by deleting the backing Redis key.
const clearList = async (socketId, key) => {
  const listKey = `socket:${socketId}:${key}`;
  try {
    await pubClient.del(listKey);
  } catch (error) {
    logger.log(`Error clearing list for socket ${socketId}: ${error}`, "ERROR", "redis");
  }
};
// Add methods to manage room users
// Add the JSON-serialized user to the room's Redis membership set.
const addUserToRoom = async (room, user) => {
  try {
    const member = JSON.stringify(user);
    await pubClient.sadd(room, member);
  } catch (error) {
    logger.log(`Error adding user to room ${room}: ${error}`, "ERROR", "redis");
  }
};
// Remove a user from a room's Redis membership set (the set member is the
// JSON-serialized user, matching what addUserToRoom stores).
const removeUserFromRoom = async (room, user) => {
  try {
    await pubClient.srem(room, JSON.stringify(user));
  } catch (error) {
    // Fixed log wording: previously said "removing user to room".
    logger.log(`Error removing user from room ${room}: ${error}`, "ERROR", "redis");
  }
};
// Fetch all users currently in a room, deserializing each set member.
// Returns an empty array (instead of undefined) when the Redis read fails,
// so callers can always safely iterate or map over the result.
const getUsersInRoom = async (room) => {
  try {
    const users = await pubClient.smembers(room);
    return users.map((user) => JSON.parse(user));
  } catch (error) {
    logger.log(`Error getting users in room ${room}: ${error}`, "ERROR", "redis");
    return [];
  }
};
/**
* Add a socket mapping for a user
* @param email
* @param socketId
* @param bodyshopId
* @returns {Promise<void>}
*/
const addUserSocketMapping = async (email, socketId, bodyshopId) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
logger.log(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`, "debug", "redis");
// Save the mapping: socketId -> bodyshopId
@@ -184,9 +97,14 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
/**
* Refresh the TTL for a user's socket mapping
* @param email
* @returns {Promise<void>}
*/
const refreshUserSocketTTL = async (email) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
const exists = await pubClient.exists(socketMappingKey);
if (exists) {
@@ -198,9 +116,15 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
/**
* Remove a socket mapping for a user
* @param email
* @param socketId
* @returns {Promise<void>}
*/
const removeUserSocketMapping = async (email, socketId) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
logger.log(`Removing socket ${socketId} mapping for user ${email}`, "DEBUG", "redis");
// Look up the bodyshopId associated with this socket
@@ -227,9 +151,14 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
/**
* Get all socket mappings for a user
* @param email
* @returns {Promise<{}>}
*/
const getUserSocketMapping = async (email) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
// Retrieve all socket mappings for the user
const mapping = await pubClient.hgetall(socketMappingKey);
@@ -249,9 +178,15 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
/**
* Get socket IDs for a user by bodyshopId
* @param email
* @param bodyshopId
* @returns {Promise<{socketIds: [string, string], ttl: *}>}
*/
const getUserSocketMappingByBodyshop = async (email, bodyshopId) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
const socketMappingKey = getUserSocketMappingKey(email);
try {
// Retrieve all socket mappings for the user
const mapping = await pubClient.hgetall(socketMappingKey);
@@ -270,7 +205,11 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
// Get bodyshop data from Redis or fetch from DB if missing
/**
* Get bodyshop data from Redis
* @param bodyshopId
* @returns {Promise<*>}
*/
const getBodyshopFromRedis = async (bodyshopId) => {
const key = getBodyshopCacheKey(bodyshopId);
try {
@@ -303,7 +242,12 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
// Update or invalidate bodyshop data in Redis
/**
* Update or invalidate bodyshop data in Redis
* @param bodyshopId
* @param values
* @returns {Promise<void>}
*/
const updateOrInvalidateBodyshopFromRedis = async (bodyshopId, values = null) => {
const key = getBodyshopCacheKey(bodyshopId);
try {
@@ -335,19 +279,118 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
}
};
// NOTE: The following code was written for an abandoned branch and things have changes since the,
// Leaving it here for demonstration purposes, commenting it out so it does not get used
// Store multiple session data in Redis
// const setMultipleSessionData = async (socketId, keyValues) => {
// try {
// // keyValues is expected to be an object { key1: value1, key2: value2, ... }
// const entries = Object.entries(keyValues).map(([key, value]) => [key, JSON.stringify(value)]);
// await pubClient.hset(`socket:${socketId}`, ...entries.flat());
// } catch (error) {
// logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Retrieve multiple session data from Redis
// const getMultipleSessionData = async (socketId, keys) => {
// try {
// const data = await pubClient.hmget(`socket:${socketId}`, keys);
// // Redis returns an object with null values for missing keys, so we parse the non-null ones
// return Object.fromEntries(keys.map((key, index) => [key, data[index] ? JSON.parse(data[index]) : null]));
// } catch (error) {
// logger.log(`Error Getting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// const setMultipleFromArraySessionData = async (socketId, keyValueArray) => {
// try {
// // Use Redis multi/pipeline to batch the commands
// const multi = pubClient.multi();
// keyValueArray.forEach(([key, value]) => {
// multi.hset(`socket:${socketId}`, key, JSON.stringify(value));
// });
// await multi.exec(); // Execute all queued commands
// } catch (error) {
// logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Helper function to add an item to the end of the Redis list
// const addItemToEndOfList = async (socketId, key, newItem) => {
// try {
// await pubClient.rpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
// } catch (error) {
// let userEmail = "unknown";
// let socketMappings = {};
// try {
// const userData = await getSessionData(socketId, "user");
// if (userData && userData.email) {
// userEmail = userData.email;
// socketMappings = await getUserSocketMapping(userEmail);
// }
// } catch (sessionError) {
// logger.log(`Failed to fetch session data for socket ${socketId}: ${sessionError}`, "ERROR", "redis");
// }
// const mappingString = JSON.stringify(socketMappings, null, 2);
// const errorMessage = `Error adding item to the end of the list for socket ${socketId}: ${error}. User: ${userEmail}, Socket Mappings: ${mappingString}`;
// logger.log(errorMessage, "ERROR", "redis");
// }
// };
// Helper function to add an item to the beginning of the Redis list
// const addItemToBeginningOfList = async (socketId, key, newItem) => {
// try {
// await pubClient.lpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
// } catch (error) {
// logger.log(`Error adding item to the beginning of the list for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Helper function to clear a list in Redis
// const clearList = async (socketId, key) => {
// try {
// await pubClient.del(`socket:${socketId}:${key}`);
// } catch (error) {
// logger.log(`Error clearing list for socket ${socketId}: ${error}`, "ERROR", "redis");
// }
// };
// Add methods to manage room users
// const addUserToRoom = async (room, user) => {
// try {
// await pubClient.sadd(room, JSON.stringify(user));
// } catch (error) {
// logger.log(`Error adding user to room ${room}: ${error}`, "ERROR", "redis");
// }
// };
// Remove users from room
// const removeUserFromRoom = async (room, user) => {
// try {
// await pubClient.srem(room, JSON.stringify(user));
// } catch (error) {
// logger.log(`Error removing user to room ${room}: ${error}`, "ERROR", "redis");
// }
// };
// Get Users in room
// const getUsersInRoom = async (room) => {
// try {
// const users = await pubClient.smembers(room);
// return users.map((user) => JSON.parse(user));
// } catch (error) {
// logger.log(`Error getting users in room ${room}: ${error}`, "ERROR", "redis");
// }
// };
const api = {
getUserSocketMappingKey,
getBodyshopCacheKey,
setSessionData,
getSessionData,
clearSessionData,
setMultipleSessionData,
getMultipleSessionData,
setMultipleFromArraySessionData,
addItemToEndOfList,
addItemToBeginningOfList,
clearList,
addUserToRoom,
removeUserFromRoom,
getUsersInRoom,
addUserSocketMapping,
removeUserSocketMapping,
getUserSocketMappingByBodyshop,
@@ -355,6 +398,15 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
refreshUserSocketTTL,
getBodyshopFromRedis,
updateOrInvalidateBodyshopFromRedis
// setMultipleSessionData,
// getMultipleSessionData,
// setMultipleFromArraySessionData,
// addItemToEndOfList,
// addItemToBeginningOfList,
// clearList,
// addUserToRoom,
// removeUserFromRoom,
// getUsersInRoom,
};
Object.assign(module.exports, api);