From 360a1954f4a3ec2a0e197b8a831dcce392455cfc Mon Sep 17 00:00:00 2001 From: Dave Richer Date: Wed, 12 Mar 2025 20:00:53 -0400 Subject: [PATCH] IO-3166-Global-Notifications-Part-2: Make sure BULLMQ prefixes do not collide --- server/notifications/queues/appQueue.js | 13 ++++++++----- server/notifications/queues/emailQueue.js | 13 ++++++++----- server/utils/getBullMQPrefix.js | 3 +++ 3 files changed, 19 insertions(+), 10 deletions(-) create mode 100644 server/utils/getBullMQPrefix.js diff --git a/server/notifications/queues/appQueue.js b/server/notifications/queues/appQueue.js index 12f6f6fd8..47663c23e 100644 --- a/server/notifications/queues/appQueue.js +++ b/server/notifications/queues/appQueue.js @@ -1,6 +1,7 @@ const { Queue, Worker } = require("bullmq"); const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries"); const { registerCleanupTask } = require("../../utils/cleanupManager"); +const getBullMQPrefix = require("../../utils/getBullMQPrefix"); const graphQLClient = require("../../graphql-client/graphql-client").client; // Base time-related constant in minutes, sourced from environment variable or defaulting to 1 @@ -45,17 +46,19 @@ const buildNotificationContent = (notifications) => { */ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { if (!addQueue || !consolidateQueue) { - logger.logger.debug("Initializing Notifications Queues"); + const prefix = getBullMQPrefix(); + + logger.logger.debug(`Initializing Notifications Queues with prefix: ${prefix}`); addQueue = new Queue("notificationsAdd", { + prefix, connection: pubClient, - prefix: "{BULLMQ}", defaultJobOptions: { removeOnComplete: true, removeOnFail: true } }); consolidateQueue = new Queue("notificationsConsolidate", { + prefix, connection: pubClient, - prefix: "{BULLMQ}", defaultJobOptions: { removeOnComplete: true, removeOnFail: true } }); @@ -100,8 +103,8 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { } }, { + prefix, connection: pubClient, - prefix: "{BULLMQ}", concurrency: 5 } ); @@ -227,8 +230,8 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { } }, { + prefix, connection: pubClient, - prefix: "{BULLMQ}", concurrency: 1, limiter: { max: 1, duration: RATE_LIMITER_DURATION } } diff --git a/server/notifications/queues/emailQueue.js b/server/notifications/queues/emailQueue.js index 9a07a0db1..c595082cd 100644 --- a/server/notifications/queues/emailQueue.js +++ b/server/notifications/queues/emailQueue.js @@ -3,6 +3,7 @@ const { sendTaskEmail } = require("../../email/sendemail"); const generateEmailTemplate = require("../../email/generateTemplate"); const { InstanceEndpoints } = require("../../utils/instanceMgr"); const { registerCleanupTask } = require("../../utils/cleanupManager"); +const getBullMQPrefix = require("../../utils/getBullMQPrefix"); const EMAIL_CONSOLIDATION_DELAY_IN_MINS = (() => { const envValue = process.env?.EMAIL_CONSOLIDATION_DELAY_IN_MINS; @@ -34,19 +35,21 @@ let emailConsolidateWorker; */ const loadEmailQueue = async ({ pubClient, logger }) => { if (!emailAddQueue || !emailConsolidateQueue) { - logger.logger.debug("Initializing Email Notification Queues"); + const prefix = getBullMQPrefix(); + + logger.logger.debug(`Initializing Email Notification Queues with prefix: ${prefix}`); // Queue for adding email notifications emailAddQueue = new Queue("emailAdd", { + prefix, connection: pubClient, - prefix: "{BULLMQ}", defaultJobOptions: { removeOnComplete: true, removeOnFail: true } }); // Queue for consolidating and sending emails emailConsolidateQueue = new Queue("emailConsolidate", { + prefix, connection: pubClient, - prefix: "{BULLMQ}", defaultJobOptions: { removeOnComplete: true, removeOnFail: true } }); @@ -91,8 +94,8 @@ const loadEmailQueue = async ({ pubClient, logger }) => { } }, { + prefix, connection: pubClient, - prefix: "{BULLMQ}", concurrency: 5 } ); @@ -159,8 +162,8 @@ const loadEmailQueue = async ({ pubClient, logger }) => { } }, { + prefix, connection: pubClient, - prefix: "{BULLMQ}", concurrency: 1, limiter: { max: 1, duration: RATE_LIMITER_DURATION } } diff --git a/server/utils/getBullMQPrefix.js b/server/utils/getBullMQPrefix.js new file mode 100644 index 000000000..a7531523a --- /dev/null +++ b/server/utils/getBullMQPrefix.js @@ -0,0 +1,3 @@ +const getBullMQPrefix = () => (process.env?.NODE_ENV === "production" ? "{PROD-BULLMQ}" : "{DEV-BULLMQ}"); + +module.exports = getBullMQPrefix;