Merged in feature/IO-3166-Global-Notifications-Part-2 (pull request #2193)
IO-3166-Global-Notifications-Part-2: Make sure BULLMQ prefixes do not collide
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
const { Queue, Worker } = require("bullmq");
|
||||
const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries");
|
||||
const { registerCleanupTask } = require("../../utils/cleanupManager");
|
||||
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
|
||||
const graphQLClient = require("../../graphql-client/graphql-client").client;
|
||||
|
||||
// Base time-related constant in minutes, sourced from environment variable or defaulting to 1
|
||||
@@ -45,17 +46,19 @@ const buildNotificationContent = (notifications) => {
|
||||
*/
|
||||
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||
if (!addQueue || !consolidateQueue) {
|
||||
logger.logger.debug("Initializing Notifications Queues");
|
||||
const prefix = getBullMQPrefix();
|
||||
|
||||
logger.logger.debug(`Initializing Notifications Queues with prefix: ${prefix}`);
|
||||
|
||||
addQueue = new Queue("notificationsAdd", {
|
||||
prefix,
|
||||
connection: pubClient,
|
||||
prefix: "{BULLMQ}",
|
||||
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
|
||||
});
|
||||
|
||||
consolidateQueue = new Queue("notificationsConsolidate", {
|
||||
prefix,
|
||||
connection: pubClient,
|
||||
prefix: "{BULLMQ}",
|
||||
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
|
||||
});
|
||||
|
||||
@@ -100,8 +103,8 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||
}
|
||||
},
|
||||
{
|
||||
prefix,
|
||||
connection: pubClient,
|
||||
prefix: "{BULLMQ}",
|
||||
concurrency: 5
|
||||
}
|
||||
);
|
||||
@@ -227,8 +230,8 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||
}
|
||||
},
|
||||
{
|
||||
prefix,
|
||||
connection: pubClient,
|
||||
prefix: "{BULLMQ}",
|
||||
concurrency: 1,
|
||||
limiter: { max: 1, duration: RATE_LIMITER_DURATION }
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ const { sendTaskEmail } = require("../../email/sendemail");
|
||||
const generateEmailTemplate = require("../../email/generateTemplate");
|
||||
const { InstanceEndpoints } = require("../../utils/instanceMgr");
|
||||
const { registerCleanupTask } = require("../../utils/cleanupManager");
|
||||
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
|
||||
|
||||
const EMAIL_CONSOLIDATION_DELAY_IN_MINS = (() => {
|
||||
const envValue = process.env?.EMAIL_CONSOLIDATION_DELAY_IN_MINS;
|
||||
@@ -34,19 +35,21 @@ let emailConsolidateWorker;
|
||||
*/
|
||||
const loadEmailQueue = async ({ pubClient, logger }) => {
|
||||
if (!emailAddQueue || !emailConsolidateQueue) {
|
||||
logger.logger.debug("Initializing Email Notification Queues");
|
||||
const prefix = getBullMQPrefix();
|
||||
|
||||
logger.logger.debug(`Initializing Email Notification Queues with prefix: ${prefix}`);
|
||||
|
||||
// Queue for adding email notifications
|
||||
emailAddQueue = new Queue("emailAdd", {
|
||||
prefix,
|
||||
connection: pubClient,
|
||||
prefix: "{BULLMQ}",
|
||||
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
|
||||
});
|
||||
|
||||
// Queue for consolidating and sending emails
|
||||
emailConsolidateQueue = new Queue("emailConsolidate", {
|
||||
prefix,
|
||||
connection: pubClient,
|
||||
prefix: "{BULLMQ}",
|
||||
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
|
||||
});
|
||||
|
||||
@@ -91,8 +94,8 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
||||
}
|
||||
},
|
||||
{
|
||||
prefix,
|
||||
connection: pubClient,
|
||||
prefix: "{BULLMQ}",
|
||||
concurrency: 5
|
||||
}
|
||||
);
|
||||
@@ -159,8 +162,8 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
||||
}
|
||||
},
|
||||
{
|
||||
prefix,
|
||||
connection: pubClient,
|
||||
prefix: "{BULLMQ}",
|
||||
concurrency: 1,
|
||||
limiter: { max: 1, duration: RATE_LIMITER_DURATION }
|
||||
}
|
||||
|
||||
3
server/utils/getBullMQPrefix.js
Normal file
3
server/utils/getBullMQPrefix.js
Normal file
@@ -0,0 +1,3 @@
|
||||
const getBullMQPrefix = () => (process.env?.NODE_ENV === "production" ? "{PROD-BULLMQ}" : "{DEV-BULLMQ}");
|
||||
|
||||
module.exports = getBullMQPrefix;
|
||||
Reference in New Issue
Block a user