diff --git a/server/notifications/queues/appQueue.js b/server/notifications/queues/appQueue.js index fcc8b9e30..9bf35de54 100644 --- a/server/notifications/queues/appQueue.js +++ b/server/notifications/queues/appQueue.js @@ -2,6 +2,7 @@ const { Queue, Worker } = require("bullmq"); const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries"); const { registerCleanupTask } = require("../../utils/cleanupManager"); const getBullMQPrefix = require("../../utils/getBullMQPrefix"); +const devDebugLogger = require("../../utils/devDebugLogger"); const graphQLClient = require("../../graphql-client/graphql-client").client; // Base time-related constant in minutes, sourced from environment variable or defaulting to 1 @@ -49,7 +50,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const prefix = getBullMQPrefix(); const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev"; - logger.logger.debug(`Initializing Notifications Queues with prefix: ${prefix}`); + devDebugLogger(`Initializing Notifications Queues with prefix: ${prefix}`); addQueue = new Queue("notificationsAdd", { prefix, @@ -67,7 +68,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { "notificationsAdd", async (job) => { const { jobId, key, variables, recipients, body, jobRoNumber } = job.data; - logger.logger.debug(`Adding notifications for jobId ${jobId}`); + devDebugLogger(`Adding notifications for jobId ${jobId}`); const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`; const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() }; @@ -79,12 +80,12 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const notifications = existingNotifications ? JSON.parse(existingNotifications) : []; notifications.push(notification); await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000); - logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`); + devDebugLogger(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`); } const consolidateKey = `app:${devKey}:consolidate:${jobId}`; const flagSet = await pubClient.setnx(consolidateKey, "pending"); - logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`); + devDebugLogger(`Consolidation flag set for jobId ${jobId}: ${flagSet}`); if (flagSet) { await consolidateQueue.add( @@ -97,10 +98,10 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { backoff: LOCK_EXPIRATION } ); - logger.logger.debug(`Scheduled consolidation for jobId ${jobId}`); + devDebugLogger(`Scheduled consolidation for jobId ${jobId}`); await pubClient.expire(consolidateKey, CONSOLIDATION_FLAG_EXPIRATION / 1000); } else { - logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`); + devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`); } }, { @@ -114,24 +115,24 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { "notificationsConsolidate", async (job) => { const { jobId, recipients } = job.data; - logger.logger.debug(`Consolidating notifications for jobId ${jobId}`); + devDebugLogger(`Consolidating notifications for jobId ${jobId}`); const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`; const lockKey = `lock:${devKey}:consolidate:${jobId}`; const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); - logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`); + devDebugLogger(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`); if (lockAcquired) { try { const allNotifications = {}; const uniqueUsers = [...new Set(recipients.map((r) => r.user))]; - logger.logger.debug(`Unique users for jobId ${jobId}: ${uniqueUsers}`); + devDebugLogger(`Unique users for jobId ${jobId}: ${uniqueUsers}`); for (const user of uniqueUsers) { const userKey = `${redisKeyPrefix}:${user}`; const notifications = await pubClient.get(userKey); - logger.logger.debug(`Retrieved notifications for ${user}: ${notifications}`); + devDebugLogger(`Retrieved notifications for ${user}: ${notifications}`); if (notifications) { const parsedNotifications = JSON.parse(notifications); @@ -141,13 +142,13 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { allNotifications[user][bodyShopId] = parsedNotifications; } await pubClient.del(userKey); - logger.logger.debug(`Deleted Redis key ${userKey}`); + devDebugLogger(`Deleted Redis key ${userKey}`); } else { - logger.logger.debug(`No notifications found for ${user} under ${userKey}`); + devDebugLogger(`No notifications found for ${user} under ${userKey}`); } } - logger.logger.debug(`Consolidated notifications: ${JSON.stringify(allNotifications)}`); + devDebugLogger(`Consolidated notifications: ${JSON.stringify(allNotifications)}`); // Insert notifications into the database and collect IDs const notificationInserts = []; @@ -174,7 +175,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, { objects: notificationInserts }); - logger.logger.debug( + devDebugLogger( `Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}` ); @@ -208,11 +209,11 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { associationId }); }); - logger.logger.debug( + devDebugLogger( `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}` ); } else { - logger.logger.debug(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`); + devDebugLogger(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`); } } } @@ -228,7 +229,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { await pubClient.del(lockKey); } } else { - logger.logger.debug(`Skipped consolidation for jobId ${jobId} - lock held by another worker`); + devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`); } }, { @@ -239,8 +240,9 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { } ); - addWorker.on("completed", (job) => logger.logger.debug(`Add job ${job.id} completed`)); - consolidateWorker.on("completed", (job) => logger.logger.debug(`Consolidate job ${job.id} completed`)); + addWorker.on("completed", (job) => devDebugLogger(`Add job ${job.id} completed`)); + consolidateWorker.on("completed", (job) => devDebugLogger(`Consolidate job ${job.id} completed`)); + addWorker.on("failed", (job, err) => logger.log(`app-queue-notification-error`, "ERROR", "notifications", "api", { message: err?.message, @@ -256,9 +258,9 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { // Register cleanup task instead of direct process listeners const shutdown = async () => { - logger.logger.debug("Closing app queue workers..."); + devDebugLogger("Closing app queue workers..."); await Promise.all([addWorker.close(), consolidateWorker.close()]); - logger.logger.debug("App queue workers closed"); + devDebugLogger("App queue workers closed"); }; registerCleanupTask(shutdown); @@ -288,7 +290,7 @@ const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => { { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber }, { jobId: `${jobId}:${Date.now()}` } ); - logger.logger.debug(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`); + devDebugLogger(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`); } }; diff --git a/server/notifications/queues/emailQueue.js b/server/notifications/queues/emailQueue.js index cd0275b4e..823dff5ea 100644 --- a/server/notifications/queues/emailQueue.js +++ b/server/notifications/queues/emailQueue.js @@ -4,6 +4,7 @@ const generateEmailTemplate = require("../../email/generateTemplate"); const { InstanceEndpoints } = require("../../utils/instanceMgr"); const { registerCleanupTask } = require("../../utils/cleanupManager"); const getBullMQPrefix = require("../../utils/getBullMQPrefix"); +const devDebugLogger = require("../../utils/devDebugLogger"); const EMAIL_CONSOLIDATION_DELAY_IN_MINS = (() => { const envValue = process.env?.EMAIL_CONSOLIDATION_DELAY_IN_MINS; @@ -38,7 +39,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { const prefix = getBullMQPrefix(); const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev"; - logger.logger.debug(`Initializing Email Notification Queues with prefix: ${prefix}`); + devDebugLogger(`Initializing Email Notification Queues with prefix: ${prefix}`); // Queue for adding email notifications emailAddQueue = new Queue("emailAdd", { @@ -59,7 +60,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { "emailAdd", async (job) => { const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data; - logger.logger.debug(`Adding email notifications for jobId ${jobId}`); + devDebugLogger(`Adding email notifications for jobId ${jobId}`); const redisKeyPrefix = `email:${devKey}:notifications:${jobId}`; @@ -73,7 +74,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { await pubClient.hsetnx(detailsKey, "lastName", lastName || ""); await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000); await pubClient.sadd(`email:${devKey}:recipients:${jobId}`, user); - logger.logger.debug(`Stored message for ${user} under ${userKey}: ${body}`); + devDebugLogger(`Stored message for ${user} under ${userKey}: ${body}`); } const consolidateKey = `email:${devKey}:consolidate:${jobId}`; @@ -89,10 +90,10 @@ const loadEmailQueue = async ({ pubClient, logger }) => { backoff: LOCK_EXPIRATION } ); - logger.logger.debug(`Scheduled email consolidation for jobId ${jobId}`); + devDebugLogger(`Scheduled email consolidation for jobId ${jobId}`); await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000); } else { - logger.logger.debug(`Email consolidation already scheduled for jobId ${jobId}`); + devDebugLogger(`Email consolidation already scheduled for jobId ${jobId}`); } }, { @@ -107,7 +108,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { "emailConsolidate", async (job) => { const { jobId, jobRoNumber, bodyShopName } = job.data; - logger.logger.debug(`Consolidating emails for jobId ${jobId}`); + devDebugLogger(`Consolidating emails for jobId ${jobId}`); const lockKey = `lock:${devKey}:emailConsolidate:${jobId}`; const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); @@ -141,7 +142,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { type: "html", html: emailBody }); - logger.logger.debug( + devDebugLogger( `Sent consolidated email to ${recipient} for jobId ${jobId} with ${messages.length} updates` ); await pubClient.del(userKey); @@ -160,7 +161,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { await pubClient.del(lockKey); } } else { - logger.logger.debug(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`); + devDebugLogger(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`); } }, { @@ -172,8 +173,9 @@ const loadEmailQueue = async ({ pubClient, logger }) => { ); // Event handlers for workers - emailAddWorker.on("completed", (job) => logger.logger.debug(`Email add job ${job.id} completed`)); - emailConsolidateWorker.on("completed", (job) => logger.logger.debug(`Email consolidate job ${job.id} completed`)); + emailAddWorker.on("completed", (job) => devDebugLogger(`Email add job ${job.id} completed`)); + emailConsolidateWorker.on("completed", (job) => devDebugLogger(`Email consolidate job ${job.id} completed`)); + emailAddWorker.on("failed", (job, err) => logger.log(`add-email-queue-failed`, "ERROR", "notifications", "api", { message: err?.message, @@ -189,9 +191,9 @@ const loadEmailQueue = async ({ pubClient, logger }) => { // Register cleanup task instead of direct process listeners const shutdown = async () => { - logger.logger.debug("Closing email queue workers..."); + devDebugLogger("Closing email queue workers..."); await Promise.all([emailAddWorker.close(), emailConsolidateWorker.close()]); - logger.logger.debug("Email queue workers closed"); + devDebugLogger("Email queue workers closed"); }; registerCleanupTask(shutdown); } @@ -239,7 +241,7 @@ const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => { { jobId, jobRoNumber, bodyShopName, body, recipients }, { jobId: `${jobId}:${Date.now()}` } ); - logger.logger.debug(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`); + devDebugLogger(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`); } }; diff --git a/server/utils/devDebugLogger.js b/server/utils/devDebugLogger.js new file mode 100644 index 000000000..fad0a05fa --- /dev/null +++ b/server/utils/devDebugLogger.js @@ -0,0 +1,10 @@ +const logger = require("./logger"); + +const devDebugLogger = (message, meta) => { + if (process.env?.NODE_ENV === "production") { + return; + } + logger.logger.debug(message, meta); +}; + +module.exports = devDebugLogger; diff --git a/server/utils/redisHelpers.js b/server/utils/redisHelpers.js index 73349c2cd..593ce13a6 100644 --- a/server/utils/redisHelpers.js +++ b/server/utils/redisHelpers.js @@ -1,4 +1,5 @@ const { GET_BODYSHOP_BY_ID } = require("../graphql-client/queries"); +const devDebugLogger = require("./devDebugLogger"); const client = require("../graphql-client/graphql-client").client; const BODYSHOP_CACHE_TTL = 3600; // 1 hour @@ -87,7 +88,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { const addUserSocketMapping = async (email, socketId, bodyshopId) => { const socketMappingKey = getUserSocketMappingKey(email); try { - logger.log(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`, "debug", "redis"); + devDebugLogger(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`); // Save the mapping: socketId -> bodyshopId await pubClient.hset(socketMappingKey, socketId, bodyshopId); // Set TTL (24 hours) for the mapping hash @@ -109,7 +110,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { const exists = await pubClient.exists(socketMappingKey); if (exists) { await pubClient.expire(socketMappingKey, 86400); - logger.log(`Refreshed TTL for ${email} socket mapping`, "debug", "redis"); + devDebugLogger(`Refreshed TTL for ${email} socket mapping`); } } catch (error) { logger.log(`Error refreshing TTL for ${email}: ${error}`, "ERROR", "redis"); @@ -126,20 +127,16 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { const socketMappingKey = getUserSocketMappingKey(email); try { - logger.log(`Removing socket ${socketId} mapping for user ${email}`, "DEBUG", "redis"); + devDebugLogger(`Removing socket ${socketId} mapping for user ${email}`); // Look up the bodyshopId associated with this socket const bodyshopId = await pubClient.hget(socketMappingKey, socketId); if (!bodyshopId) { - logger.log(`Socket ${socketId} not found for user ${email}`, "DEBUG", "redis"); + devDebugLogger(`Socket ${socketId} not found for user ${email}`); return; } // Remove the socket mapping await pubClient.hdel(socketMappingKey, socketId); - logger.log( - `Removed socket ${socketId} (associated with bodyshop ${bodyshopId}) for user ${email}`, - "DEBUG", - "redis" - ); + devDebugLogger(`Removed socket ${socketId} (associated with bodyshop ${bodyshopId}) for user ${email}`); // Refresh TTL if any socket mappings remain const remainingSockets = await pubClient.hlen(socketMappingKey); @@ -227,7 +224,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { await pubClient.set(key, jsonData); await pubClient.expire(key, BODYSHOP_CACHE_TTL); - logger.log("bodyshop-cache-miss", "DEBUG", "redis", null, { + devDebugLogger("bodyshop-cache-miss", { bodyshopId, action: "Fetched from DB and cached" }); @@ -254,7 +251,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { if (!values) { // Invalidate cache by deleting the key await pubClient.del(key); - logger.log("bodyshop-cache-invalidate", "DEBUG", "api", "redis", { + devDebugLogger("bodyshop-cache-invalidate", { bodyshopId, action: "Cache invalidated" }); @@ -263,7 +260,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { const jsonData = JSON.stringify(values); await pubClient.set(key, jsonData); await pubClient.expire(key, BODYSHOP_CACHE_TTL); - logger.log("bodyshop-cache-update", "DEBUG", "api", "redis", { + devDebugLogger("bodyshop-cache-update", { bodyshopId, action: "Cache updated", values