From f4a3b75a863d28afc9e0aff9454bbf3961ff6d72 Mon Sep 17 00:00:00 2001 From: Dave Richer Date: Fri, 28 Feb 2025 17:33:46 -0500 Subject: [PATCH] feature/IO-3096-GlobalNotifications - Checkpoint - Header finalized, scenarioParser now uses ENV var for FILTER_SELF from watchers. --- server.js | 41 +++--------------- server/notifications/queues/appQueue.js | 5 ++- server/notifications/queues/emailQueue.js | 21 +++++---- server/utils/cleanupManager.js | 52 +++++++++++++++++++++++ 4 files changed, 70 insertions(+), 49 deletions(-) create mode 100644 server/utils/cleanupManager.js diff --git a/server.js b/server.js index ac80f6576..ca126a917 100644 --- a/server.js +++ b/server.js @@ -31,11 +31,11 @@ const { redisSocketEvents } = require("./server/web-sockets/redisSocketEvents"); const { ElastiCacheClient, DescribeCacheClustersCommand } = require("@aws-sdk/client-elasticache"); const { InstanceRegion } = require("./server/utils/instanceMgr"); const StartStatusReporter = require("./server/utils/statusReporter"); +const { registerCleanupTask, initializeCleanupManager } = require("./server/utils/cleanupManager"); + const { loadEmailQueue } = require("./server/notifications/queues/emailQueue"); const { loadAppQueue } = require("./server/notifications/queues/appQueue"); -const cleanupTasks = []; -let isShuttingDown = false; const CLUSTER_RETRY_BASE_DELAY = 100; const CLUSTER_RETRY_MAX_DELAY = 5000; const CLUSTER_RETRY_JITTER = 100; @@ -332,6 +332,9 @@ const main = async () => { const server = http.createServer(app); + // Initialize cleanup manager with signal handlers + initializeCleanupManager(); + const { pubClient, ioRedis } = await applySocketIO({ server, app }); const redisHelpers = applyRedisHelpers({ pubClient, app, logger }); const ioHelpers = applyIOHelpers({ app, redisHelpers, ioRedis, logger }); @@ -351,10 +354,6 @@ const main = async () => { StatusReporter.end(); }); - // Add SIGTERM signal handler - process.on("SIGTERM", handleSigterm); - process.on("SIGINT", handleSigterm); // Optional: Handle Ctrl+C - try { await server.listen(port); logger.log(`Server started on port ${port}`, "INFO", "api"); @@ -373,33 +372,3 @@ main().catch((error) => { // Note: If we want the app to crash on all uncaught async operations, we would // need to put a `process.exit(1);` here }); - -// Register a cleanup task -function registerCleanupTask(task) { - cleanupTasks.push(task); -} - -// SIGTERM handler -async function handleSigterm() { - if (isShuttingDown) { - logger.log("sigterm-api", "WARN", null, null, { message: "Shutdown already in progress, ignoring signal." }); - return; - } - - isShuttingDown = true; - - logger.log("sigterm-api", "WARN", null, null, { message: "SIGTERM Received. Starting graceful shutdown." }); - - try { - for (const task of cleanupTasks) { - logger.log("sigterm-api", "WARN", null, null, { message: `Running cleanup task: ${task.name}` }); - - await task(); - } - logger.log("sigterm-api", "WARN", null, null, { message: `All cleanup tasks completed.` }); - } catch (error) { - logger.log("sigterm-api-error", "ERROR", null, null, { message: error.message, stack: error.stack }); - } - - process.exit(0); -} diff --git a/server/notifications/queues/appQueue.js b/server/notifications/queues/appQueue.js index a2e4f609b..b595380aa 100644 --- a/server/notifications/queues/appQueue.js +++ b/server/notifications/queues/appQueue.js @@ -1,5 +1,6 @@ const { Queue, Worker } = require("bullmq"); const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries"); +const { registerCleanupTask } = require("../../utils/cleanupManager"); const graphQLClient = require("../../graphql-client/graphql-client").client; // Base time-related constant in minutes, sourced from environment variable or defaulting to 1 @@ -239,14 +240,14 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { logger.logger.error(`Consolidate job ${job.id} failed: ${err.message}`, { error: err }) ); + // Register cleanup task instead of direct process listeners const shutdown = async () => { logger.logger.info("Closing app queue workers..."); await Promise.all([addWorker.close(), consolidateWorker.close()]); logger.logger.info("App queue workers closed"); }; - process.on("SIGTERM", shutdown); - process.on("SIGINT", shutdown); + registerCleanupTask(shutdown); } return addQueue; diff --git a/server/notifications/queues/emailQueue.js b/server/notifications/queues/emailQueue.js index ef34196cb..231c3679b 100644 --- a/server/notifications/queues/emailQueue.js +++ b/server/notifications/queues/emailQueue.js @@ -2,6 +2,7 @@ const { Queue, Worker } = require("bullmq"); const { sendTaskEmail } = require("../../email/sendemail"); const generateEmailTemplate = require("../../email/generateTemplate"); const { InstanceEndpoints } = require("../../utils/instanceMgr"); +const { registerCleanupTask } = require("../../utils/cleanupManager"); const EMAIL_CONSOLIDATION_DELAY_IN_MINS = (() => { const envValue = process.env?.APP_CONSOLIDATION_DELAY_IN_MINS; @@ -61,11 +62,11 @@ const loadEmailQueue = async ({ pubClient, logger }) => { const { user, firstName, lastName } = recipient; const userKey = `${redisKeyPrefix}:${user}`; await pubClient.rpush(userKey, body); - await pubClient.expire(userKey, NOTIFICATION_EXPIRATION / 1000); // Set expiration + await pubClient.expire(userKey, NOTIFICATION_EXPIRATION / 1000); const detailsKey = `email:recipientDetails:${jobId}:${user}`; await pubClient.hsetnx(detailsKey, "firstName", firstName || ""); await pubClient.hsetnx(detailsKey, "lastName", lastName || ""); - await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000); // Set expiration + await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000); await pubClient.sadd(`email:recipients:${jobId}`, user); logger.logger.debug(`Stored message for ${user} under ${userKey}: ${body}`); } @@ -79,12 +80,12 @@ const loadEmailQueue = async ({ pubClient, logger }) => { { jobId: `consolidate:${jobId}`, delay: EMAIL_CONSOLIDATION_DELAY, - attempts: 3, // Retry up to 3 times - backoff: LOCK_EXPIRATION // Retry delay matches lock expiration (15s) + attempts: 3, + backoff: LOCK_EXPIRATION } ); logger.logger.info(`Scheduled email consolidation for jobId ${jobId}`); - await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000); // Convert to seconds + await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000); } else { logger.logger.debug(`Email consolidation already scheduled for jobId ${jobId}`); } @@ -104,7 +105,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { logger.logger.info(`Consolidating emails for jobId ${jobId}`); const lockKey = `lock:emailConsolidate:${jobId}`; - const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); // Convert to seconds + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); if (lockAcquired) { try { const recipientsSet = `email:recipients:${jobId}`; @@ -118,7 +119,6 @@ const loadEmailQueue = async ({ pubClient, logger }) => { const firstName = details.firstName || "User"; const multipleUpdateString = messages.length > 1 ? "Updates" : "Update"; const subject = `${multipleUpdateString} for job ${jobRoNumber} at ${bodyShopName}`; - // Use the template instead of inline HTML const emailBody = generateEmailTemplate({ header: `${multipleUpdateString} for Job ${jobRoNumber}`, subHeader: `Dear ${firstName},`, @@ -147,7 +147,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { await pubClient.del(`email:consolidate:${jobId}`); } catch (err) { logger.logger.error(`Email consolidation error for jobId ${jobId}: ${err.message}`, { error: err }); - throw err; // Trigger retry if attempts remain + throw err; } finally { await pubClient.del(lockKey); } @@ -173,14 +173,13 @@ const loadEmailQueue = async ({ pubClient, logger }) => { logger.logger.error(`Email consolidate job ${job.id} failed: ${err.message}`, { error: err }) ); - // Graceful shutdown + // Register cleanup task instead of direct process listeners const shutdown = async () => { logger.logger.info("Closing email queue workers..."); await Promise.all([emailAddWorker.close(), emailConsolidateWorker.close()]); logger.logger.info("Email queue workers closed"); }; - process.on("SIGTERM", shutdown); - process.on("SIGINT", shutdown); + registerCleanupTask(shutdown); } return emailAddQueue; diff --git a/server/utils/cleanupManager.js b/server/utils/cleanupManager.js new file mode 100644 index 000000000..e2923b46a --- /dev/null +++ b/server/utils/cleanupManager.js @@ -0,0 +1,52 @@ +// server/utils/cleanupManager.js +const logger = require("./logger"); + +let cleanupTasks = []; +let isShuttingDown = false; + +/** + * Register a cleanup task to be executed during shutdown + * @param {Function} task - The cleanup task to register + */ +function registerCleanupTask(task) { + cleanupTasks.push(task); +} + +/** + * Handle SIGTERM signal for graceful shutdown + */ +async function handleSigterm() { + if (isShuttingDown) { + logger.log("sigterm-api", "WARN", null, null, { message: "Shutdown already in progress, ignoring signal." }); + return; + } + + isShuttingDown = true; + + logger.log("sigterm-api", "WARN", null, null, { message: "SIGTERM Received. Starting graceful shutdown." }); + + try { + for (const task of cleanupTasks) { + logger.log("sigterm-api", "WARN", null, null, { message: `Running cleanup task: ${task.name}` }); + await task(); + } + logger.log("sigterm-api", "WARN", null, null, { message: `All cleanup tasks completed.` }); + } catch (error) { + logger.log("sigterm-api-error", "ERROR", null, null, { message: error.message, stack: error.stack }); + } + + process.exit(0); +} + +/** + * Initialize cleanup manager with process event listeners + */ +function initializeCleanupManager() { + process.on("SIGTERM", handleSigterm); + process.on("SIGINT", handleSigterm); // Handle Ctrl+C +} + +module.exports = { + registerCleanupTask, + initializeCleanupManager +};