diff --git a/server/notifications/queues/appQueue.js b/server/notifications/queues/appQueue.js index 47663c23e..fcc8b9e30 100644 --- a/server/notifications/queues/appQueue.js +++ b/server/notifications/queues/appQueue.js @@ -47,6 +47,7 @@ const buildNotificationContent = (notifications) => { const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { if (!addQueue || !consolidateQueue) { const prefix = getBullMQPrefix(); + const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev"; logger.logger.debug(`Initializing Notifications Queues with prefix: ${prefix}`); @@ -68,7 +69,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const { jobId, key, variables, recipients, body, jobRoNumber } = job.data; logger.logger.debug(`Adding notifications for jobId ${jobId}`); - const redisKeyPrefix = `app:notifications:${jobId}`; + const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`; const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() }; for (const recipient of recipients) { @@ -81,7 +82,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`); } - const consolidateKey = `app:consolidate:${jobId}`; + const consolidateKey = `app:${devKey}:consolidate:${jobId}`; const flagSet = await pubClient.setnx(consolidateKey, "pending"); logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`); @@ -115,8 +116,9 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const { jobId, recipients } = job.data; logger.logger.debug(`Consolidating notifications for jobId ${jobId}`); - const redisKeyPrefix = `app:notifications:${jobId}`; - const lockKey = `lock:consolidate:${jobId}`; + const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`; + const lockKey = `lock:${devKey}:consolidate:${jobId}`; + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`); @@ -215,7 +217,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { } } - await pubClient.del(`app:consolidate:${jobId}`); + await pubClient.del(`app:${devKey}:consolidate:${jobId}`); } catch (err) { logger.log(`app-queue-consolidation-error`, "ERROR", "notifications", "api", { message: err?.message, diff --git a/server/notifications/queues/emailQueue.js b/server/notifications/queues/emailQueue.js index c595082cd..cd0275b4e 100644 --- a/server/notifications/queues/emailQueue.js +++ b/server/notifications/queues/emailQueue.js @@ -36,6 +36,7 @@ let emailConsolidateWorker; const loadEmailQueue = async ({ pubClient, logger }) => { if (!emailAddQueue || !emailConsolidateQueue) { const prefix = getBullMQPrefix(); + const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev"; logger.logger.debug(`Initializing Email Notification Queues with prefix: ${prefix}`); @@ -60,21 +61,22 @@ const loadEmailQueue = async ({ pubClient, logger }) => { const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data; logger.logger.debug(`Adding email notifications for jobId ${jobId}`); - const redisKeyPrefix = `email:notifications:${jobId}`; + const redisKeyPrefix = `email:${devKey}:notifications:${jobId}`; + for (const recipient of recipients) { const { user, firstName, lastName } = recipient; const userKey = `${redisKeyPrefix}:${user}`; await pubClient.rpush(userKey, body); await pubClient.expire(userKey, NOTIFICATION_EXPIRATION / 1000); - const detailsKey = `email:recipientDetails:${jobId}:${user}`; + const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${user}`; await pubClient.hsetnx(detailsKey, "firstName", firstName || ""); await pubClient.hsetnx(detailsKey, "lastName", lastName || ""); await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000); - await pubClient.sadd(`email:recipients:${jobId}`, user); + await pubClient.sadd(`email:${devKey}:recipients:${jobId}`, user); logger.logger.debug(`Stored message for ${user} under ${userKey}: ${body}`); } - const consolidateKey = `email:consolidate:${jobId}`; + const consolidateKey = `email:${devKey}:consolidate:${jobId}`; const flagSet = await pubClient.setnx(consolidateKey, "pending"); if (flagSet) { await emailConsolidateQueue.add( @@ -107,15 +109,15 @@ const loadEmailQueue = async ({ pubClient, logger }) => { const { jobId, jobRoNumber, bodyShopName } = job.data; logger.logger.debug(`Consolidating emails for jobId ${jobId}`); - const lockKey = `lock:emailConsolidate:${jobId}`; + const lockKey = `lock:${devKey}:emailConsolidate:${jobId}`; const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); if (lockAcquired) { try { - const recipientsSet = `email:recipients:${jobId}`; + const recipientsSet = `email:${devKey}:recipients:${jobId}`; const recipients = await pubClient.smembers(recipientsSet); for (const recipient of recipients) { - const userKey = `email:notifications:${jobId}:${recipient}`; - const detailsKey = `email:recipientDetails:${jobId}:${recipient}`; + const userKey = `email:${devKey}:notifications:${jobId}:${recipient}`; + const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${recipient}`; const messages = await pubClient.lrange(userKey, 0, -1); if (messages.length > 0) { const details = await pubClient.hgetall(detailsKey); @@ -147,7 +149,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { } } await pubClient.del(recipientsSet); - await pubClient.del(`email:consolidate:${jobId}`); + await pubClient.del(`email:${devKey}:consolidate:${jobId}`); } catch (err) { logger.log(`email-queue-consolidation-error`, "ERROR", "notifications", "api", { message: err?.message, diff --git a/server/utils/redisHelpers.js b/server/utils/redisHelpers.js index 763981962..73349c2cd 100644 --- a/server/utils/redisHelpers.js +++ b/server/utils/redisHelpers.js @@ -10,6 +10,14 @@ const BODYSHOP_CACHE_TTL = 3600; // 1 hour */ const getBodyshopCacheKey = (bodyshopId) => `bodyshop-cache:${bodyshopId}`; +/** + * Generate a cache key for a user socket mapping + * @param email + * @returns {`user:${string}:${string}:socketMapping`} + */ +const getUserSocketMappingKey = (email) => + `user:${process.env?.NODE_ENV === "production" ? "prod" : "dev"}:${email}:socketMapping`; + /** * Fetch bodyshop data from the database * @param bodyshopId @@ -69,110 +77,15 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { } }; - // Store multiple session data in Redis - const setMultipleSessionData = async (socketId, keyValues) => { - try { - // keyValues is expected to be an object { key1: value1, key2: value2, ... } - const entries = Object.entries(keyValues).map(([key, value]) => [key, JSON.stringify(value)]); - await pubClient.hset(`socket:${socketId}`, ...entries.flat()); - } catch (error) { - logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis"); - } - }; - - // Retrieve multiple session data from Redis - const getMultipleSessionData = async (socketId, keys) => { - try { - const data = await pubClient.hmget(`socket:${socketId}`, keys); - // Redis returns an object with null values for missing keys, so we parse the non-null ones - return Object.fromEntries(keys.map((key, index) => [key, data[index] ? JSON.parse(data[index]) : null])); - } catch (error) { - logger.log(`Error Getting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis"); - } - }; - - const setMultipleFromArraySessionData = async (socketId, keyValueArray) => { - try { - // Use Redis multi/pipeline to batch the commands - const multi = pubClient.multi(); - keyValueArray.forEach(([key, value]) => { - multi.hset(`socket:${socketId}`, key, JSON.stringify(value)); - }); - await multi.exec(); // Execute all queued commands - } catch (error) { - logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis"); - } - }; - - // Helper function to add an item to the end of the Redis list - const addItemToEndOfList = async (socketId, key, newItem) => { - try { - await pubClient.rpush(`socket:${socketId}:${key}`, JSON.stringify(newItem)); - } catch (error) { - let userEmail = "unknown"; - let socketMappings = {}; - try { - const userData = await getSessionData(socketId, "user"); - if (userData && userData.email) { - userEmail = userData.email; - socketMappings = await getUserSocketMapping(userEmail); - } - } catch (sessionError) { - logger.log(`Failed to fetch session data for socket ${socketId}: ${sessionError}`, "ERROR", "redis"); - } - const mappingString = JSON.stringify(socketMappings, null, 2); - const errorMessage = `Error adding item to the end of the list for socket ${socketId}: ${error}. User: ${userEmail}, Socket Mappings: ${mappingString}`; - logger.log(errorMessage, "ERROR", "redis"); - } - }; - - // Helper function to add an item to the beginning of the Redis list - const addItemToBeginningOfList = async (socketId, key, newItem) => { - try { - await pubClient.lpush(`socket:${socketId}:${key}`, JSON.stringify(newItem)); - } catch (error) { - logger.log(`Error adding item to the beginning of the list for socket ${socketId}: ${error}`, "ERROR", "redis"); - } - }; - - // Helper function to clear a list in Redis - const clearList = async (socketId, key) => { - try { - await pubClient.del(`socket:${socketId}:${key}`); - } catch (error) { - logger.log(`Error clearing list for socket ${socketId}: ${error}`, "ERROR", "redis"); - } - }; - - // Add methods to manage room users - const addUserToRoom = async (room, user) => { - try { - await pubClient.sadd(room, JSON.stringify(user)); - } catch (error) { - logger.log(`Error adding user to room ${room}: ${error}`, "ERROR", "redis"); - } - }; - - const removeUserFromRoom = async (room, user) => { - try { - await pubClient.srem(room, JSON.stringify(user)); - } catch (error) { - logger.log(`Error removing user to room ${room}: ${error}`, "ERROR", "redis"); - } - }; - - const getUsersInRoom = async (room) => { - try { - const users = await pubClient.smembers(room); - return users.map((user) => JSON.parse(user)); - } catch (error) { - logger.log(`Error getting users in room ${room}: ${error}`, "ERROR", "redis"); - } - }; - + /** + * Add a socket mapping for a user + * @param email + * @param socketId + * @param bodyshopId + * @returns {Promise} + */ const addUserSocketMapping = async (email, socketId, bodyshopId) => { - const userKey = `user:${email}`; - const socketMappingKey = `${userKey}:socketMapping`; + const socketMappingKey = getUserSocketMappingKey(email); try { logger.log(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`, "debug", "redis"); // Save the mapping: socketId -> bodyshopId @@ -184,9 +97,14 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { } }; + /** + * Refresh the TTL for a user's socket mapping + * @param email + * @returns {Promise} + */ const refreshUserSocketTTL = async (email) => { - const userKey = `user:${email}`; - const socketMappingKey = `${userKey}:socketMapping`; + const socketMappingKey = getUserSocketMappingKey(email); + try { const exists = await pubClient.exists(socketMappingKey); if (exists) { @@ -198,9 +116,15 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { } }; + /** + * Remove a socket mapping for a user + * @param email + * @param socketId + * @returns {Promise} + */ const removeUserSocketMapping = async (email, socketId) => { - const userKey = `user:${email}`; - const socketMappingKey = `${userKey}:socketMapping`; + const socketMappingKey = getUserSocketMappingKey(email); + try { logger.log(`Removing socket ${socketId} mapping for user ${email}`, "DEBUG", "redis"); // Look up the bodyshopId associated with this socket @@ -227,9 +151,14 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { } }; + /** + * Get all socket mappings for a user + * @param email + * @returns {Promise<{}>} + */ const getUserSocketMapping = async (email) => { - const userKey = `user:${email}`; - const socketMappingKey = `${userKey}:socketMapping`; + const socketMappingKey = getUserSocketMappingKey(email); + try { // Retrieve all socket mappings for the user const mapping = await pubClient.hgetall(socketMappingKey); @@ -249,9 +178,15 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { } }; + /** + * Get socket IDs for a user by bodyshopId + * @param email + * @param bodyshopId + * @returns {Promise<{socketIds: [string, string], ttl: *}>} + */ const getUserSocketMappingByBodyshop = async (email, bodyshopId) => { - const userKey = `user:${email}`; - const socketMappingKey = `${userKey}:socketMapping`; + const socketMappingKey = getUserSocketMappingKey(email); + try { // Retrieve all socket mappings for the user const mapping = await pubClient.hgetall(socketMappingKey); @@ -270,7 +205,11 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { } }; - // Get bodyshop data from Redis or fetch from DB if missing + /** + * Get bodyshop data from Redis + * @param bodyshopId + * @returns {Promise<*>} + */ const getBodyshopFromRedis = async (bodyshopId) => { const key = getBodyshopCacheKey(bodyshopId); try { @@ -303,7 +242,12 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { } }; - // Update or invalidate bodyshop data in Redis + /** + * Update or invalidate bodyshop data in Redis + * @param bodyshopId + * @param values + * @returns {Promise} + */ const updateOrInvalidateBodyshopFromRedis = async (bodyshopId, values = null) => { const key = getBodyshopCacheKey(bodyshopId); try { @@ -335,19 +279,118 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { } }; + // NOTE: The following code was written for an abandoned branch and things have changes since the, + // Leaving it here for demonstration purposes, commenting it out so it does not get used + + // Store multiple session data in Redis + // const setMultipleSessionData = async (socketId, keyValues) => { + // try { + // // keyValues is expected to be an object { key1: value1, key2: value2, ... } + // const entries = Object.entries(keyValues).map(([key, value]) => [key, JSON.stringify(value)]); + // await pubClient.hset(`socket:${socketId}`, ...entries.flat()); + // } catch (error) { + // logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis"); + // } + // }; + + // Retrieve multiple session data from Redis + // const getMultipleSessionData = async (socketId, keys) => { + // try { + // const data = await pubClient.hmget(`socket:${socketId}`, keys); + // // Redis returns an object with null values for missing keys, so we parse the non-null ones + // return Object.fromEntries(keys.map((key, index) => [key, data[index] ? JSON.parse(data[index]) : null])); + // } catch (error) { + // logger.log(`Error Getting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis"); + // } + // }; + + // const setMultipleFromArraySessionData = async (socketId, keyValueArray) => { + // try { + // // Use Redis multi/pipeline to batch the commands + // const multi = pubClient.multi(); + // keyValueArray.forEach(([key, value]) => { + // multi.hset(`socket:${socketId}`, key, JSON.stringify(value)); + // }); + // await multi.exec(); // Execute all queued commands + // } catch (error) { + // logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis"); + // } + // }; + + // Helper function to add an item to the end of the Redis list + // const addItemToEndOfList = async (socketId, key, newItem) => { + // try { + // await pubClient.rpush(`socket:${socketId}:${key}`, JSON.stringify(newItem)); + // } catch (error) { + // let userEmail = "unknown"; + // let socketMappings = {}; + // try { + // const userData = await getSessionData(socketId, "user"); + // if (userData && userData.email) { + // userEmail = userData.email; + // socketMappings = await getUserSocketMapping(userEmail); + // } + // } catch (sessionError) { + // logger.log(`Failed to fetch session data for socket ${socketId}: ${sessionError}`, "ERROR", "redis"); + // } + // const mappingString = JSON.stringify(socketMappings, null, 2); + // const errorMessage = `Error adding item to the end of the list for socket ${socketId}: ${error}. User: ${userEmail}, Socket Mappings: ${mappingString}`; + // logger.log(errorMessage, "ERROR", "redis"); + // } + // }; + + // Helper function to add an item to the beginning of the Redis list + // const addItemToBeginningOfList = async (socketId, key, newItem) => { + // try { + // await pubClient.lpush(`socket:${socketId}:${key}`, JSON.stringify(newItem)); + // } catch (error) { + // logger.log(`Error adding item to the beginning of the list for socket ${socketId}: ${error}`, "ERROR", "redis"); + // } + // }; + + // Helper function to clear a list in Redis + // const clearList = async (socketId, key) => { + // try { + // await pubClient.del(`socket:${socketId}:${key}`); + // } catch (error) { + // logger.log(`Error clearing list for socket ${socketId}: ${error}`, "ERROR", "redis"); + // } + // }; + + // Add methods to manage room users + // const addUserToRoom = async (room, user) => { + // try { + // await pubClient.sadd(room, JSON.stringify(user)); + // } catch (error) { + // logger.log(`Error adding user to room ${room}: ${error}`, "ERROR", "redis"); + // } + // }; + + // Remove users from room + // const removeUserFromRoom = async (room, user) => { + // try { + // await pubClient.srem(room, JSON.stringify(user)); + // } catch (error) { + // logger.log(`Error removing user to room ${room}: ${error}`, "ERROR", "redis"); + // } + // }; + + // Get Users in room + // const getUsersInRoom = async (room) => { + // try { + // const users = await pubClient.smembers(room); + // return users.map((user) => JSON.parse(user)); + // } catch (error) { + // logger.log(`Error getting users in room ${room}: ${error}`, "ERROR", "redis"); + // } + // }; + const api = { + getUserSocketMappingKey, + getBodyshopCacheKey, setSessionData, getSessionData, clearSessionData, - setMultipleSessionData, - getMultipleSessionData, - setMultipleFromArraySessionData, - addItemToEndOfList, - addItemToBeginningOfList, - clearList, - addUserToRoom, - removeUserFromRoom, - getUsersInRoom, addUserSocketMapping, removeUserSocketMapping, getUserSocketMappingByBodyshop, @@ -355,6 +398,15 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { refreshUserSocketTTL, getBodyshopFromRedis, updateOrInvalidateBodyshopFromRedis + // setMultipleSessionData, + // getMultipleSessionData, + // setMultipleFromArraySessionData, + // addItemToEndOfList, + // addItemToBeginningOfList, + // clearList, + // addUserToRoom, + // removeUserFromRoom, + // getUsersInRoom, }; Object.assign(module.exports, api);