From 2a8151710439be1bc568d6c6d7df4a4043a8ad25 Mon Sep 17 00:00:00 2001 From: Dave Richer Date: Tue, 18 Feb 2025 17:37:24 -0500 Subject: [PATCH] feature/IO-3096-GlobalNotifications - Checkpoint, App Queue --- server.js | 10 +- server/notifications/queues/appQueue.js | 263 ++++++++++++------------ server/utils/redisHelpers.js | 86 ++++---- server/web-sockets/redisSocketEvents.js | 20 +- 4 files changed, 190 insertions(+), 189 deletions(-) diff --git a/server.js b/server.js index ef7ccdafc..a4f71176a 100644 --- a/server.js +++ b/server.js @@ -195,7 +195,15 @@ const connectToRedisCluster = async () => { return new Promise((resolve, reject) => { redisCluster.on("ready", () => { logger.log(`Redis cluster connection established.`, "INFO", "redis", "api"); - resolve(redisCluster); + if (process.env.NODE_ENV === "development" && process.env?.CLEAR_REDIS_ON_START === "true") { + logger.log("[Development] Flushing Redis Cluster on Service start...", "INFO", "redis", "api"); + const master = redisCluster.nodes("master"); + Promise.all(master.map((node) => node.flushall())).then(() => { + resolve(redisCluster); + }); + } else { + resolve(redisCluster); + } }); redisCluster.on("error", (err) => { diff --git a/server/notifications/queues/appQueue.js b/server/notifications/queues/appQueue.js index bbbf270fd..b165e4e71 100644 --- a/server/notifications/queues/appQueue.js +++ b/server/notifications/queues/appQueue.js @@ -1,103 +1,57 @@ const { Queue, Worker } = require("bullmq"); -let appQueue; +let addQueue; +let consolidateQueue; const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { - if (!appQueue) { - logger.logger.info("Initializing Notifications App Queue"); - appQueue = new Queue("notificationsApp", { + if (!addQueue || !consolidateQueue) { + logger.logger.info("Initializing Notifications Queues"); + + addQueue = new Queue("notificationsAdd", { connection: pubClient, - prefix: "{BULLMQ}" + prefix: "{BULLMQ}", + defaultJobOptions: { removeOnComplete: true, removeOnFail: true } }); - const worker = new Worker( - "notificationsApp", + consolidateQueue = new Queue("notificationsConsolidate", { + connection: pubClient, + prefix: "{BULLMQ}", + defaultJobOptions: { removeOnComplete: true, removeOnFail: true } + }); + + const addWorker = new Worker( + "notificationsAdd", async (job) => { - const { jobId, bodyShopId, key, variables, recipients } = job.data; - logger.logger.info(`Processing app job ${job.id} for jobId ${jobId}`); + const { jobId, key, variables, recipients } = job.data; + logger.logger.info(`Adding notifications for jobId ${jobId}`); - const redisKey = `app:notifications:${jobId}`; - const lastSentKey = `${redisKey}:lastSent`; - const lockKey = `lock:send-notifications:${jobId}`; - const recurringFlagKey = `app:recurring:${jobId}`; + const redisKeyPrefix = `app:notifications:${jobId}`; + const notification = { key, variables, timestamp: Date.now() }; - if (job.name === "add-notification") { - const notification = { key, variables, timestamp: Date.now() }; - for (const recipient of recipients) { - const { user } = recipient; - const userKey = `${redisKey}:${user}`; - const existingNotifications = await pubClient.get(userKey); - const notifications = existingNotifications ? JSON.parse(existingNotifications) : []; - notifications.push(notification); - await pubClient.set(userKey, JSON.stringify(notifications), "EX", 40); - } - } else if (job.name === "send-notifications") { - let hasNewNotifications = false; - const lastSent = parseInt((await pubClient.get(lastSentKey)) || "0", 10); + for (const recipient of recipients) { + const { user } = recipient; + const userKey = `${redisKeyPrefix}:${user}`; + const existingNotifications = await pubClient.get(userKey); + const notifications = existingNotifications ? JSON.parse(existingNotifications) : []; + notifications.push(notification); + await pubClient.set(userKey, JSON.stringify(notifications), "EX", 40); + logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`); + } - for (const recipient of recipients) { - const { user, bodyShopId: recipientBodyShopId } = recipient; - const userKey = `${redisKey}:${user}`; - const notifications = await pubClient.get(userKey); - if (notifications) { - const parsedNotifications = JSON.parse(notifications); - const newNotifications = parsedNotifications.filter((n) => n.timestamp > lastSent); - if (newNotifications.length > 0) { - hasNewNotifications = true; - const socketIds = await redisHelpers.getUserSocketMapping(user); - if (socketIds && socketIds[bodyShopId]?.socketIds) { - socketIds[bodyShopId].socketIds.forEach((socketId) => { - ioRedis.to(socketId).emit("notification", { - jobId, - bodyShopId: recipientBodyShopId, - notifications: newNotifications - }); - }); - logger.logger.info(`Sent ${newNotifications.length} new notifications to ${user} for jobId ${jobId}`); - } else { - logger.logger.warn(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`); - } - await pubClient.del(userKey); - } - } - } + const consolidateKey = `app:consolidate:${jobId}`; + const flagSet = await pubClient.setnx(consolidateKey, "pending"); + logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`); - if (hasNewNotifications) { - await pubClient.set(lastSentKey, Date.now(), "EX", 300); - } else { - const activeJobs = await appQueue.getActive(); - const hasPendingAdds = activeJobs.some((j) => j.name === "add-notification" && j.data.jobId === jobId); - if (!hasPendingAdds) { - const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", 10); - if (lockAcquired) { - const recurringJobKey = `send-notifications:${jobId}`; - const repeatableJobs = await appQueue.getRepeatableJobs(); - const jobExists = repeatableJobs.some((j) => j.key === recurringJobKey); - if (jobExists) { - await appQueue.removeRepeatableByKey(recurringJobKey); - // Drain all remaining send-notifications jobs for this jobId - await appQueue.drain(false); // false to not force removal of active jobs - logger.logger.info( - `Successfully removed recurring send-notifications job and drained queue for jobId ${jobId} with key ${recurringJobKey}` - ); - } else { - logger.logger.info( - `No recurring send-notifications job found for jobId ${jobId} with key ${recurringJobKey} - processing leftover scheduled instance` - ); - } - await pubClient.del(lockKey); - await pubClient.del(recurringFlagKey); - } else { - logger.logger.info( - `Skipped removal of send-notifications for jobId ${jobId} - lock held by another worker` - ); - } - } else { - logger.logger.info( - `Skipping removal of send-notifications for jobId ${jobId} - pending add-notification jobs exist` - ); - } - } + if (flagSet) { + await consolidateQueue.add( + "consolidate-notifications", + { jobId, recipients }, + { jobId: `consolidate:${jobId}`, delay: 5000 } + ); + logger.logger.info(`Scheduled consolidation for jobId ${jobId}`); + await pubClient.expire(consolidateKey, 300); + } else { + logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`); } }, { @@ -107,61 +61,112 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { } ); - worker.on("completed", async (job) => { - if (job.name === "add-notification") { - const { jobId } = job.data; - const recurringJobKey = `send-notifications:${jobId}`; - const recurringFlagKey = `app:recurring:${jobId}`; - const flagSet = await pubClient.setnx(recurringFlagKey, "active"); - if (flagSet) { - const existingJobs = await appQueue.getRepeatableJobs(); - if (!existingJobs.some((j) => j.key === recurringJobKey)) { - await appQueue.add( - "send-notifications", - { jobId, bodyShopId: job.data.bodyShopId, recipients: job.data.recipients }, - { - repeat: { - every: 30 * 1000, - limit: 10 - }, - jobId: recurringJobKey + const consolidateWorker = new Worker( + "notificationsConsolidate", + async (job) => { + const { jobId, recipients } = job.data; + logger.logger.info(`Consolidating notifications for jobId ${jobId}`); + + const redisKeyPrefix = `app:notifications:${jobId}`; + const lockKey = `lock:consolidate:${jobId}`; + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", 10); + logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`); + + if (lockAcquired) { + try { + const allNotifications = {}; + const uniqueUsers = [...new Set(recipients.map((r) => r.user))]; + logger.logger.debug(`Unique users for jobId ${jobId}: ${uniqueUsers}`); + + for (const user of uniqueUsers) { + const userKey = `${redisKeyPrefix}:${user}`; + const notifications = await pubClient.get(userKey); + logger.logger.debug(`Retrieved notifications for ${user}: ${notifications}`); + + if (notifications) { + const parsedNotifications = JSON.parse(notifications); + const userRecipients = recipients.filter((r) => r.user === user); + for (const { bodyShopId } of userRecipients) { + allNotifications[user] = allNotifications[user] || {}; + allNotifications[user][bodyShopId] = parsedNotifications; + } + await pubClient.del(userKey); + logger.logger.debug(`Deleted Redis key ${userKey}`); + } else { + logger.logger.warn(`No notifications found for ${user} under ${userKey}`); } - ); - logger.logger.info(`Scheduled 30s notification send for jobId ${jobId} with key ${recurringJobKey}`); - await pubClient.expire(recurringFlagKey, 300); + } + + logger.logger.debug(`Consolidated notifications: ${JSON.stringify(allNotifications)}`); + + for (const [user, bodyShopData] of Object.entries(allNotifications)) { + const userMapping = await redisHelpers.getUserSocketMapping(user); + logger.logger.debug(`User socket mapping for ${user}: ${JSON.stringify(userMapping)}`); + + for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) { + if (userMapping && userMapping[bodyShopId]?.socketIds) { + userMapping[bodyShopId].socketIds.forEach((socketId) => { + logger.logger.debug( + `Emitting to socket ${socketId}: ${JSON.stringify({ jobId, bodyShopId, notifications })}` + ); + ioRedis.to(socketId).emit("notification", { + jobId, + bodyShopId, + notifications + }); + }); + logger.logger.info( + `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId}` + ); + } else { + logger.logger.warn(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`); + } + } + } + + await pubClient.del(`app:consolidate:${jobId}`); + } catch (err) { + logger.logger.error(`Consolidation error for jobId ${jobId}: ${err.message}`, { error: err }); + throw err; // Re-throw to trigger failed event + } finally { + await pubClient.del(lockKey); } + } else { + logger.logger.info(`Skipped consolidation for jobId ${jobId} - lock held by another worker`); } + }, + { + connection: pubClient, + prefix: "{BULLMQ}", + concurrency: 1, + limiter: { max: 1, duration: 5000 } } - logger.logger.info(`Job ${job.id} completed`); - }); + ); - worker.on("failed", (job, err) => { - logger.logger.error(`Job ${job.id} failed: ${err.message}`, { error: err }); - }); - - worker.on("error", (err) => { - logger.logger.error("Worker error:", { error: err }); - }); + addWorker.on("completed", (job) => logger.logger.info(`Add job ${job.id} completed`)); + consolidateWorker.on("completed", (job) => logger.logger.info(`Consolidate job ${job.id} completed`)); + addWorker.on("failed", (job, err) => + logger.logger.error(`Add job ${job.id} failed: ${err.message}`, { error: err }) + ); + consolidateWorker.on("failed", (job, err) => + logger.logger.error(`Consolidate job ${job.id} failed: ${err.message}`, { error: err }) + ); const shutdown = async () => { - if (worker) { - logger.logger.info("Closing app queue worker..."); - await worker.close(); - logger.logger.info("App queue worker closed"); - } + logger.logger.info("Closing app queue workers..."); + await Promise.all([addWorker.close(), consolidateWorker.close()]); + logger.logger.info("App queue workers closed"); }; process.on("SIGTERM", shutdown); process.on("SIGINT", shutdown); } - return appQueue; + return addQueue; // Return the add queue for dispatching }; const getQueue = () => { - if (!appQueue) { - throw new Error("App queue not initialized. Ensure loadAppQueue is called during bootstrap."); - } - return appQueue; + if (!addQueue) throw new Error("Add queue not initialized. Ensure loadAppQueue is called during bootstrap."); + return addQueue; }; const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => { @@ -174,7 +179,7 @@ const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => { { jobId, bodyShopId, key, variables, recipients }, { jobId: `${jobId}:${Date.now()}` } ); - logger.logger.info(`Added app notification to queue for jobId ${jobId} with ${recipients.length} recipients`); + logger.logger.info(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`); } }; diff --git a/server/utils/redisHelpers.js b/server/utils/redisHelpers.js index 586d89b9e..affd7cdd8 100644 --- a/server/utils/redisHelpers.js +++ b/server/utils/redisHelpers.js @@ -136,75 +136,75 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { const addUserSocketMapping = async (email, socketId, bodyshopId) => { const userKey = `user:${email}`; - const bodyshopKey = `${userKey}:bodyshops:${bodyshopId}`; + const socketMappingKey = `${userKey}:socketMapping`; try { logger.log(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`, "debug", "redis"); - // Mark the bodyshop as associated with the user in the hash - await pubClient.hset(userKey, `bodyshops:${bodyshopId}`, "1"); - // Add the socket ID to the bodyshop-specific set - await pubClient.sadd(bodyshopKey, socketId); - // Set TTL to 24 hours for both keys - await pubClient.expire(userKey, 86400); - await pubClient.expire(bodyshopKey, 86400); + // Save the mapping: socketId -> bodyshopId + await pubClient.hset(socketMappingKey, socketId, bodyshopId); + // Set TTL (24 hours) for the mapping hash + await pubClient.expire(socketMappingKey, 86400); } catch (error) { logger.log(`Error adding socket mapping for ${email} (bodyshop ${bodyshopId}): ${error}`, "ERROR", "redis"); } }; - const refreshUserSocketTTL = async (email, bodyshopId) => { + const refreshUserSocketTTL = async (email) => { const userKey = `user:${email}`; - const bodyshopKey = `${userKey}:bodyshops:${bodyshopId}`; + const socketMappingKey = `${userKey}:socketMapping`; try { - const userExists = await pubClient.exists(userKey); - if (userExists) { - await pubClient.expire(userKey, 86400); - } - const bodyshopExists = await pubClient.exists(bodyshopKey); - if (bodyshopExists) { - await pubClient.expire(bodyshopKey, 86400); - logger.log(`Refreshed TTL for ${email} bodyshop ${bodyshopId} socket mapping`, "debug", "redis"); + const exists = await pubClient.exists(socketMappingKey); + if (exists) { + await pubClient.expire(socketMappingKey, 86400); + logger.log(`Refreshed TTL for ${email} socket mapping`, "debug", "redis"); } } catch (error) { - logger.log(`Error refreshing TTL for ${email} (bodyshop ${bodyshopId}): ${error}`, "ERROR", "redis"); + logger.log(`Error refreshing TTL for ${email}: ${error}`, "ERROR", "redis"); } }; - const removeUserSocketMapping = async (email, socketId, bodyshopId) => { + const removeUserSocketMapping = async (email, socketId) => { const userKey = `user:${email}`; - const bodyshopKey = `${userKey}:bodyshops:${bodyshopId}`; + const socketMappingKey = `${userKey}:socketMapping`; try { - logger.log(`Removing socket ${socketId} from user ${email} for bodyshop ${bodyshopId}`, "DEBUG", "redis"); - await pubClient.srem(bodyshopKey, socketId); - // Refresh TTL if there are still sockets, or let it expire - const remainingSockets = await pubClient.scard(bodyshopKey); - if (remainingSockets > 0) { - await pubClient.expire(bodyshopKey, 86400); - } else { - // Optionally remove the bodyshop field from the hash if no sockets remain - await pubClient.hdel(userKey, `bodyshops:${bodyshopId}`); + logger.log(`Removing socket ${socketId} mapping for user ${email}`, "DEBUG", "redis"); + // Look up the bodyshopId associated with this socket + const bodyshopId = await pubClient.hget(socketMappingKey, socketId); + if (!bodyshopId) { + logger.log(`Socket ${socketId} not found for user ${email}`, "DEBUG", "redis"); + return; } - // Refresh user key TTL if there are still bodyshops - const remainingBodyshops = await pubClient.hlen(userKey); - if (remainingBodyshops > 0) { - await pubClient.expire(userKey, 86400); + // Remove the socket mapping + await pubClient.hdel(socketMappingKey, socketId); + logger.log( + `Removed socket ${socketId} (associated with bodyshop ${bodyshopId}) for user ${email}`, + "DEBUG", + "redis" + ); + + // Refresh TTL if any socket mappings remain + const remainingSockets = await pubClient.hlen(socketMappingKey); + if (remainingSockets > 0) { + await pubClient.expire(socketMappingKey, 86400); } } catch (error) { - logger.log(`Error removing socket mapping for ${email} (bodyshop ${bodyshopId}): ${error}`, "ERROR", "redis"); + logger.log(`Error removing socket mapping for ${email}: ${error}`, "ERROR", "redis"); } }; const getUserSocketMapping = async (email) => { const userKey = `user:${email}`; + const socketMappingKey = `${userKey}:socketMapping`; try { - // Get all bodyshop fields from the hash - const bodyshops = await pubClient.hkeys(userKey); + // Retrieve all socket mappings for the user + const mapping = await pubClient.hgetall(socketMappingKey); + const ttl = await pubClient.ttl(socketMappingKey); + // Group socket IDs by bodyshopId const result = {}; - for (const bodyshopField of bodyshops) { - const bodyshopId = bodyshopField.split("bodyshops:")[1]; - const bodyshopKey = `${userKey}:bodyshops:${bodyshopId}`; - const socketIds = await pubClient.smembers(bodyshopKey); - const ttl = await pubClient.ttl(bodyshopKey); - result[bodyshopId] = { socketIds, ttl }; + for (const [socketId, bodyshopId] of Object.entries(mapping)) { + if (!result[bodyshopId]) { + result[bodyshopId] = { socketIds: [], ttl }; + } + result[bodyshopId].socketIds.push(socketId); } return result; } catch (error) { diff --git a/server/web-sockets/redisSocketEvents.js b/server/web-sockets/redisSocketEvents.js index e20a43a6e..59baf0d0a 100644 --- a/server/web-sockets/redisSocketEvents.js +++ b/server/web-sockets/redisSocketEvents.js @@ -2,14 +2,7 @@ const { admin } = require("../firebase/firebase-handler"); const redisSocketEvents = ({ io, - redisHelpers: { - addUserSocketMapping, - removeUserSocketMapping, - setSessionData, - getSessionData, - clearSessionData, - refreshUserSocketTTL - }, + redisHelpers: { addUserSocketMapping, removeUserSocketMapping, refreshUserSocketTTL }, ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom }, logger }) => { @@ -30,7 +23,6 @@ const redisSocketEvents = ({ try { const user = await admin.auth().verifyIdToken(token); socket.user = user; - await setSessionData(socket.id, "user", { ...user, bodyshopId }); await addUserSocketMapping(user.email, socket.id, bodyshopId); next(); } catch (error) { @@ -60,7 +52,6 @@ const redisSocketEvents = ({ return; } socket.user = user; - await setSessionData(socket.id, "user", { ...user, bodyshopId }); await refreshUserSocketTTL(user.email, bodyshopId); createLogEvent( socket, @@ -124,18 +115,15 @@ const redisSocketEvents = ({ const registerDisconnectEvents = (socket) => { const disconnect = async () => { if (socket.user?.email) { - const userData = await getSessionData(socket.id, "user"); - const bodyshopId = userData?.bodyshopId; - if (bodyshopId) { - await removeUserSocketMapping(socket.user.email, socket.id, bodyshopId); - } - await clearSessionData(socket.id); + await removeUserSocketMapping(socket.user.email, socket.id); } + // Leave all rooms except the default room (socket.id) const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id); for (const room of rooms) { socket.leave(room); } }; + socket.on("disconnect", disconnect); };