const { Queue, Worker } = require("bullmq");
const { sendTaskEmail } = require("../../email/sendemail");
const generateEmailTemplate = require("../../email/generateTemplate");
const { InstanceEndpoints } = require("../../utils/instanceMgr");
const { registerCleanupTask } = require("../../utils/cleanupManager");
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
const devDebugLogger = require("../../utils/devDebugLogger");
const moment = require("moment-timezone");

// Consolidation delay in minutes, read from the environment.
const EMAIL_CONSOLIDATION_DELAY_IN_MINS = (() => {
  const envValue = process.env?.EMAIL_CONSOLIDATION_DELAY_IN_MINS;
  const parsedValue = envValue ? parseInt(envValue, 10) : NaN;
  return isNaN(parsedValue) ? 3 : Math.max(1, parsedValue); // Default to 3, ensure at least 1
})();

// Base time-related constant (in milliseconds) / DO NOT TOUCH
const EMAIL_CONSOLIDATION_DELAY = EMAIL_CONSOLIDATION_DELAY_IN_MINS * 60000; // 3 minutes at the default (base timeout)

// Derived time-related constants / DO NOT TOUCH, these are pegged to EMAIL_CONSOLIDATION_DELAY
const CONSOLIDATION_KEY_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 1.5; // 1.5x base (4.5 minutes at the default, buffer for consolidation)
const LOCK_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 0.25; // Quarter of base (45 seconds at the default, lock duration)
const RATE_LIMITER_DURATION = EMAIL_CONSOLIDATION_DELAY * 0.1; // Tenth of base (18 seconds at the default, rate limiting)
const NOTIFICATION_EXPIRATION = EMAIL_CONSOLIDATION_DELAY * 1.5; // 1.5x base (matches consolidation key expiration)

let emailAddQueue;
let emailConsolidateQueue;
let emailAddWorker;
let emailConsolidateWorker;

/**
 * Initializes the email notification queues and workers.
 *
 * @param {Object} options - Configuration options for queue initialization.
 * @param {Object} options.pubClient - Redis client instance for queue communication.
 * @param {Object} options.logger - Logger instance for logging events and debugging.
 * @returns {Promise<Queue>} Resolves to the initialized `emailAddQueue` instance for dispatching notifications.
 */
const loadEmailQueue = async ({ pubClient, logger }) => {
  if (!emailAddQueue || !emailConsolidateQueue) {
    const prefix = getBullMQPrefix();
    const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";

    devDebugLogger(`Initializing Email Notification Queues with prefix: ${prefix}`);

    // Queue for adding email notifications
    emailAddQueue = new Queue("emailAdd", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    // Queue for consolidating and sending emails
    emailConsolidateQueue = new Queue("emailConsolidate", {
      prefix,
      connection: pubClient,
      defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
    });

    // Worker to process adding notifications
    emailAddWorker = new Worker(
      "emailAdd",
      async (job) => {
        const { jobId, jobRoNumber, bodyShopName, bodyShopTimezone, body, recipients } = job.data;
        devDebugLogger(`Adding email notifications for jobId ${jobId}`);

        const redisKeyPrefix = `email:${devKey}:notifications:${jobId}`;

        // Buffer each message per recipient; keys expire if consolidation never runs
        for (const recipient of recipients) {
          const { user, firstName, lastName } = recipient;
          const userKey = `${redisKeyPrefix}:${user}`;
          await pubClient.rpush(userKey, body);
          await pubClient.expire(userKey, NOTIFICATION_EXPIRATION / 1000);

          const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${user}`;
          await pubClient.hsetnx(detailsKey, "firstName", firstName || "");
          await pubClient.hsetnx(detailsKey, "lastName", lastName || "");
          await pubClient.hsetnx(detailsKey, "bodyShopTimezone", bodyShopTimezone);
          await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000);

          await pubClient.sadd(`email:${devKey}:recipients:${jobId}`, user);
          devDebugLogger(`Stored message for ${user} under ${userKey}: ${body}`);
        }

        // Schedule a single delayed consolidation job per jobId
        const consolidateKey = `email:${devKey}:consolidate:${jobId}`;
        const flagSet = await pubClient.setnx(consolidateKey, "pending");
        if (flagSet) {
          await emailConsolidateQueue.add(
            "consolidate-emails",
            { jobId, jobRoNumber, bodyShopName, bodyShopTimezone },
            {
              jobId: `consolidate:${jobId}`,
              delay: EMAIL_CONSOLIDATION_DELAY,
              attempts: 3,
              backoff: LOCK_EXPIRATION
            }
          );
          devDebugLogger(`Scheduled email consolidation for jobId ${jobId}`);
          await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000);
        } else {
          devDebugLogger(`Email consolidation already scheduled for jobId ${jobId}`);
        }
      },
      { prefix, connection: pubClient, concurrency: 5 }
    );

    // Worker to consolidate and send emails
    emailConsolidateWorker = new Worker(
      "emailConsolidate",
      async (job) => {
        const { jobId, jobRoNumber, bodyShopName } = job.data;
        devDebugLogger(`Consolidating emails for jobId ${jobId}`);

        // Short-lived lock so only one worker consolidates a given jobId
        const lockKey = `lock:${devKey}:emailConsolidate:${jobId}`;
        const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
        if (lockAcquired) {
          try {
            const recipientsSet = `email:${devKey}:recipients:${jobId}`;
            const recipients = await pubClient.smembers(recipientsSet);

            for (const recipient of recipients) {
              const userKey = `email:${devKey}:notifications:${jobId}:${recipient}`;
              const detailsKey = `email:${devKey}:recipientDetails:${jobId}:${recipient}`;
              const messages = await pubClient.lrange(userKey, 0, -1);

              if (messages.length > 0) {
                const details = await pubClient.hgetall(detailsKey);
                const firstName = details.firstName || "User";
                const multipleUpdateString = messages.length > 1 ? "Updates" : "Update";
                const subject = `${multipleUpdateString} for job ${jobRoNumber || "N/A"} at ${bodyShopName}`;
                // Fall back to UTC when the stored timezone is missing or unknown
                const timezone = moment.tz.zone(details?.bodyShopTimezone) ? details.bodyShopTimezone : "UTC";
                const emailBody = generateEmailTemplate({
                  header: `${multipleUpdateString} for Job ${jobRoNumber || "N/A"}`,
                  subHeader: `Dear ${firstName},`,
                  dateLine: moment().tz(timezone).format("MM/DD/YYYY hh:mm a"),
                  body: `

                    <p>There have been updates to job ${jobRoNumber || "N/A"} at ${bodyShopName}:</p>
                    <ul>
                      ${messages.map((msg) => `<li>${msg}</li>`).join("")}
                    </ul>
                    <p>Please check the job for more details.</p>

                  `
                });

                await sendTaskEmail({
                  to: recipient,
                  subject,
                  type: "html",
                  html: emailBody
                });
                devDebugLogger(
                  `Sent consolidated email to ${recipient} for jobId ${jobId} with ${messages.length} updates`
                );

                await pubClient.del(userKey);
                await pubClient.del(detailsKey);
              }
            }

            await pubClient.del(recipientsSet);
            await pubClient.del(`email:${devKey}:consolidate:${jobId}`);
          } catch (err) {
            logger.log(`email-queue-consolidation-error`, "ERROR", "notifications", "api", {
              message: err?.message,
              stack: err?.stack
            });
            throw err;
          } finally {
            await pubClient.del(lockKey);
          }
        } else {
          devDebugLogger(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`);
        }
      },
      {
        prefix,
        connection: pubClient,
        concurrency: 1,
        limiter: { max: 1, duration: RATE_LIMITER_DURATION }
      }
    );

    // Event handlers for workers
    emailAddWorker.on("completed", (job) => devDebugLogger(`Email add job ${job.id} completed`));
    emailConsolidateWorker.on("completed", (job) => devDebugLogger(`Email consolidate job ${job.id} completed`));

    emailAddWorker.on("failed", (job, err) =>
      logger.log(`add-email-queue-failed`, "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );

    emailConsolidateWorker.on("failed", (job, err) =>
      logger.log(`email-consolidation-job-failed`, "ERROR", "notifications", "api", {
        message: err?.message,
        stack: err?.stack
      })
    );

    // Register cleanup task instead of direct process listeners
    const shutdown = async () => {
      devDebugLogger("Closing email queue workers...");
      await Promise.all([emailAddWorker.close(), emailConsolidateWorker.close()]);
      devDebugLogger("Email queue workers closed");
    };

    registerCleanupTask(shutdown);
  }

  return emailAddQueue;
};

/**
 * Retrieves the initialized `emailAddQueue` instance.
 *
 * @returns {Queue} The `emailAddQueue` instance for adding notifications.
 * @throws {Error} If `emailAddQueue` is not initialized.
 */
const getQueue = () => {
  if (!emailAddQueue) {
    throw new Error("Email add queue not initialized. Ensure loadEmailQueue is called during bootstrap.");
  }
  return emailAddQueue;
};

/**
 * Dispatches email notifications to the `emailAddQueue` for processing.
 *
 * @param {Object} options - Options for dispatching notifications.
 * @param {Array} options.emailsToDispatch - Array of email notification objects.
 * @param {Object} options.logger - Logger instance for logging dispatch events.
 * @returns {Promise<void>} Resolves when all notifications are added to the queue.
 */
// eslint-disable-next-line no-unused-vars
const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => {
  const emailAddQueue = getQueue();

  for (const email of emailsToDispatch) {
    const { jobId, jobRoNumber, bodyShopName, bodyShopTimezone, body, recipients } = email;

    // Skip incomplete entries; optional chaining guards against a missing recipients array
    if (!jobId || !jobRoNumber || !bodyShopName || !body || !recipients?.length) {
      devDebugLogger(
        `Skipping email dispatch for jobId ${jobId} due to missing data: ` +
          `jobRoNumber=${jobRoNumber || "N/A"}, bodyShopName=${bodyShopName}, body=${body}, recipients=${recipients?.length ?? 0}`
      );
      continue;
    }

    await emailAddQueue.add(
      "add-email-notification",
      { jobId, jobRoNumber, bodyShopName, bodyShopTimezone, body, recipients },
      { jobId: `${jobId}:${Date.now()}` }
    );
    devDebugLogger(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
  }
};

module.exports = { loadEmailQueue, getQueue, dispatchEmailsToQueue };