From c1ea8e8a3ddbdc48862e4fffabbcc953996f8930 Mon Sep 17 00:00:00 2001 From: Dave Richer Date: Tue, 18 Feb 2025 13:38:57 -0500 Subject: [PATCH] feature/IO-3096-GlobalNotifications - Checkpoint, App Queue --- server.js | 6 +- server/notifications/queues/appQueue.js | 148 ++++++++++++++++++++++- server/notifications/scenarioBuilders.js | 2 +- server/notifications/scenarioParser.js | 31 +++-- 4 files changed, 171 insertions(+), 16 deletions(-) diff --git a/server.js b/server.js index 81f6771c2..ef7ccdafc 100644 --- a/server.js +++ b/server.js @@ -292,8 +292,8 @@ const applySocketIO = async ({ server, app }) => { /** * Load Queues for Email and App */ -const loadQueues = async ({ pubClient, logger, redisHelpers }) => { - const queueSettings = { pubClient, logger, redisHelpers }; +const loadQueues = async ({ pubClient, logger, redisHelpers, ioRedis }) => { + const queueSettings = { pubClient, logger, redisHelpers, ioRedis }; // Assuming loadEmailQueue and loadAppQueue return Promises const [notificationsEmailsQueue, notificationsAppQueue] = await Promise.all([ @@ -329,7 +329,7 @@ const main = async () => { require("./server/web-sockets/web-socket"); // Initialize Queues - await loadQueues({ pubClient: pubClient, logger, redisHelpers }); + await loadQueues({ pubClient: pubClient, logger, redisHelpers, ioRedis }); applyMiddleware({ app }); applyRoutes({ app }); diff --git a/server/notifications/queues/appQueue.js b/server/notifications/queues/appQueue.js index 0f9ca3584..27e83a6b6 100644 --- a/server/notifications/queues/appQueue.js +++ b/server/notifications/queues/appQueue.js @@ -1,15 +1,143 @@ -const { Queue } = require("bullmq"); +const { Queue, Worker } = require("bullmq"); let appQueue; -const loadAppQueue = async ({ pubClient, logger, redisHelpers }) => { +const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { if (!appQueue) { logger.logger.info("Initializing Notifications App Queue"); appQueue = new Queue("notificationsApp", { connection: pubClient, prefix: "{BULLMQ}" }); + + const worker = new Worker( + "notificationsApp", + async (job) => { + const { jobId, bodyShopId, key, variables, recipients } = job.data; + logger.logger.info(`Processing app job ${job.id} for jobId ${jobId}`); + + const redisKey = `app:notifications:${jobId}`; + const lastSentKey = `${redisKey}:lastSent`; + + if (job.name === "add-notification") { + const notification = { key, variables, timestamp: Date.now() }; + for (const recipient of recipients) { + const { user } = recipient; + const userKey = `${redisKey}:${user}`; + const existingNotifications = await pubClient.get(userKey); + const notifications = existingNotifications ? JSON.parse(existingNotifications) : []; + notifications.push(notification); + await pubClient.set(userKey, JSON.stringify(notifications), "EX", 40); + } + } else if (job.name === "send-notifications") { + let hasNewNotifications = false; + const lastSent = parseInt((await pubClient.get(lastSentKey)) || "0", 10); + + for (const recipient of recipients) { + const { user, bodyShopId: recipientBodyShopId } = recipient; + const userKey = `${redisKey}:${user}`; + const notifications = await pubClient.get(userKey); + if (notifications) { + const parsedNotifications = JSON.parse(notifications); + const newNotifications = parsedNotifications.filter((n) => n.timestamp > lastSent); + if (newNotifications.length > 0) { + hasNewNotifications = true; + const socketIds = await redisHelpers.getUserSocketMapping(user); + if (socketIds && socketIds[bodyShopId]?.socketIds) { + socketIds[bodyShopId].socketIds.forEach((socketId) => { + ioRedis.to(socketId).emit("notification", { + jobId, + bodyShopId: recipientBodyShopId, + notifications: newNotifications + }); + }); + logger.logger.info(`Sent ${newNotifications.length} new notifications to ${user} for jobId ${jobId}`); + } else { + logger.logger.warn(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`); + } + await pubClient.del(userKey); + } + } + } + + if (hasNewNotifications) { + await pubClient.set(lastSentKey, Date.now(), "EX", 300); + } else { + // Only remove if no active "add-notification" jobs are pending + const activeJobs = await appQueue.getActive(); + const hasPendingAdds = activeJobs.some((j) => j.name === "add-notification" && j.data.jobId === jobId); + if (!hasPendingAdds) { + const recurringJobKey = `send-notifications:${jobId}`; + const removed = await appQueue.removeRepeatable("send-notifications", { + every: 30 * 1000, + jobId: recurringJobKey + }); + if (removed) { + logger.logger.info( + `Successfully removed recurring send-notifications job for jobId ${jobId} due to no new notifications` + ); + } else { + logger.logger.warn( + `Failed to remove recurring send-notifications job for jobId ${jobId} - may already be removed` + ); + } + } else { + logger.logger.info( + `Skipping removal of send-notifications for jobId ${jobId} - pending add-notification jobs exist` + ); + } + } + } + }, + { + connection: pubClient, + prefix: "{BULLMQ}", + concurrency: 5 + } + ); + + worker.on("completed", async (job) => { + if (job.name === "add-notification") { + const { jobId } = job.data; + const recurringJobKey = `send-notifications:${jobId}`; + const existingJobs = await appQueue.getRepeatableJobs(); + if (!existingJobs.some((j) => j.key === recurringJobKey)) { + await appQueue.add( + "send-notifications", + { jobId, bodyShopId: job.data.bodyShopId, recipients: job.data.recipients }, + { + repeat: { + every: 30 * 1000, // Every 30 seconds + limit: 10 // 5 minutes + }, + jobId: recurringJobKey + } + ); + logger.logger.info(`Scheduled 30s notification send for jobId ${jobId}`); + } + } + logger.logger.info(`Job ${job.id} completed`); + }); + + worker.on("failed", (job, err) => { + logger.logger.error(`Job ${job.id} failed: ${err.message}`, { error: err }); + }); + + worker.on("error", (err) => { + logger.logger.error("Worker error:", { error: err }); + }); + + const shutdown = async () => { + if (worker) { + logger.logger.info("Closing app queue worker..."); + await worker.close(); + logger.logger.info("App queue worker closed"); + } + }; + process.on("SIGTERM", shutdown); + process.on("SIGINT", shutdown); } + return appQueue; }; @@ -20,4 +148,18 @@ const getQueue = () => { return appQueue; }; -module.exports = { loadAppQueue, getQueue }; +const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => { + const appQueue = getQueue(); + + for (const app of appsToDispatch) { + const { jobId, bodyShopId, key, variables, recipients } = app; + await appQueue.add( + "add-notification", + { jobId, bodyShopId, key, variables, recipients }, + { jobId: `${jobId}:${Date.now()}` } + ); + logger.logger.info(`Added app notification to queue for jobId ${jobId} with ${recipients.length} recipients`); + } +}; + +module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue }; diff --git a/server/notifications/scenarioBuilders.js b/server/notifications/scenarioBuilders.js index d4648f261..90aaf40ff 100644 --- a/server/notifications/scenarioBuilders.js +++ b/server/notifications/scenarioBuilders.js @@ -343,7 +343,7 @@ const tasksUpdatedCreatedBuilder = (data) => { app: { jobId: data.jobId, bodyShopId: data.bodyShopId, - key: "notifications.job.tasksUpdated", + key: data.isNew ? "notifications.job.taskCreated" : "notifications.job.taskUpdated", variables: { type: data.isNew ? "created" : "updated", roNumber: data.jobRoNumber diff --git a/server/notifications/scenarioParser.js b/server/notifications/scenarioParser.js index cb9f0fc2e..ae111fe9c 100644 --- a/server/notifications/scenarioParser.js +++ b/server/notifications/scenarioParser.js @@ -11,6 +11,7 @@ const { isEmpty, isFunction } = require("lodash"); const { getMatchingScenarios } = require("./scenarioMapperr"); const consoleDir = require("../utils/consoleDir"); const { dispatchEmailsToQueue } = require("./queues/emailQueue"); +const { dispatchAppsToQueue } = require("./queues/appQueue"); /** * Parses an event and determines matching scenarios for notifications. @@ -171,18 +172,30 @@ const scenarioParser = async (req, jobIdField) => { } // Step 9: Dispatch Email Notifications to the Email Notification Queue - dispatchEmailsToQueue({ - emailsToDispatch: scenariosToDispatch.map((scenario) => scenario?.email), - logger - }).catch((e) => - logger.log("Something went wrong dispatching emails to the Email Notification Queue", "error", "queue", null, { - message: e?.message - }) - ); + const emailsToDispatch = scenariosToDispatch.map((scenario) => scenario?.email); + if (!isEmpty(emailsToDispatch)) { + dispatchEmailsToQueue({ + emailsToDispatch, + logger + }).catch((e) => + logger.log("Something went wrong dispatching emails to the Email Notification Queue", "error", "queue", null, { + message: e?.message + }) + ); + } // Step 10: Dispatch App Notifications to the App Notification Queue const appsToDispatch = scenariosToDispatch.map((scenario) => scenario?.app); - consoleDir({ appsToDispatch }); + if (!isEmpty(appsToDispatch)) { + dispatchAppsToQueue({ + appsToDispatch, + logger + }).catch((e) => + logger.log("Something went wrong dispatching apps to the App Notification Queue", "error", "queue", null, { + message: e?.message + }) + ); + } }; module.exports = scenarioParser;