diff --git a/server/notifications/queues/appQueue.js b/server/notifications/queues/appQueue.js index b595380aa..890ce73f2 100644 --- a/server/notifications/queues/appQueue.js +++ b/server/notifications/queues/appQueue.js @@ -45,7 +45,7 @@ const buildNotificationContent = (notifications) => { */ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { if (!addQueue || !consolidateQueue) { - logger.logger.info("Initializing Notifications Queues"); + logger.logger.debug("Initializing Notifications Queues"); addQueue = new Queue("notificationsAdd", { connection: pubClient, @@ -63,7 +63,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { "notificationsAdd", async (job) => { const { jobId, key, variables, recipients, body, jobRoNumber } = job.data; - logger.logger.info(`Adding notifications for jobId ${jobId}`); + logger.logger.debug(`Adding notifications for jobId ${jobId}`); const redisKeyPrefix = `app:notifications:${jobId}`; const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() }; @@ -93,7 +93,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { backoff: LOCK_EXPIRATION } ); - logger.logger.info(`Scheduled consolidation for jobId ${jobId}`); + logger.logger.debug(`Scheduled consolidation for jobId ${jobId}`); await pubClient.expire(consolidateKey, CONSOLIDATION_FLAG_EXPIRATION / 1000); } else { logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`); @@ -110,7 +110,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { "notificationsConsolidate", async (job) => { const { jobId, recipients } = job.data; - logger.logger.info(`Consolidating notifications for jobId ${jobId}`); + logger.logger.debug(`Consolidating notifications for jobId ${jobId}`); const redisKeyPrefix = `app:notifications:${jobId}`; const lockKey = `lock:consolidate:${jobId}`; @@ -169,7 +169,7 @@ const loadAppQueue = async ({ pubClient, logger, 
redisHelpers, ioRedis }) => { const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, { objects: notificationInserts }); - logger.logger.info( + logger.logger.debug( `Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}` ); @@ -203,7 +203,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { associationId }); }); - logger.logger.info( + logger.logger.debug( `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}` ); } else { @@ -214,13 +214,16 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { await pubClient.del(`app:consolidate:${jobId}`); } catch (err) { - logger.logger.error(`Consolidation error for jobId ${jobId}: ${err.message}`, { error: err }); + logger.log(`Consolidation error for jobId ${jobId}`, "ERROR", "notifications", "api", { + message: err?.message, + stack: err?.stack + }); throw err; } finally { await pubClient.del(lockKey); } } else { - logger.logger.info(`Skipped consolidation for jobId ${jobId} - lock held by another worker`); + logger.logger.debug(`Skipped consolidation for jobId ${jobId} - lock held by another worker`); } }, { @@ -231,20 +234,26 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { } ); - addWorker.on("completed", (job) => logger.logger.info(`Add job ${job.id} completed`)); - consolidateWorker.on("completed", (job) => logger.logger.info(`Consolidate job ${job.id} completed`)); + addWorker.on("completed", (job) => logger.logger.debug(`Add job ${job.id} completed`)); + consolidateWorker.on("completed", (job) => logger.logger.debug(`Consolidate job ${job.id} completed`)); addWorker.on("failed", (job, err) => - logger.logger.error(`Add job ${job.id} failed: ${err.message}`, { error: err }) + logger.log(`Add job ${job.id} failed`, "ERROR", "notifications", "api", { + message: err?.message, + stack: 
err?.stack + }) ); consolidateWorker.on("failed", (job, err) => - logger.logger.error(`Consolidate job ${job.id} failed: ${err.message}`, { error: err }) + logger.log(`Consolidate job ${job.id} failed`, "ERROR", "notifications", "api", { + message: err?.message, + stack: err?.stack + }) ); // Register cleanup task instead of direct process listeners const shutdown = async () => { - logger.logger.info("Closing app queue workers..."); + logger.logger.debug("Closing app queue workers..."); await Promise.all([addWorker.close(), consolidateWorker.close()]); - logger.logger.info("App queue workers closed"); + logger.logger.debug("App queue workers closed"); }; registerCleanupTask(shutdown); @@ -274,7 +283,7 @@ const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => { { jobId, bodyShopId, key, variables, recipients, body, jobRoNumber }, { jobId: `${jobId}:${Date.now()}` } ); - logger.logger.info(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`); + logger.logger.debug(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`); } }; diff --git a/server/notifications/queues/emailQueue.js b/server/notifications/queues/emailQueue.js index 231c3679b..22814e6a7 100644 --- a/server/notifications/queues/emailQueue.js +++ b/server/notifications/queues/emailQueue.js @@ -34,7 +34,7 @@ let emailConsolidateWorker; */ const loadEmailQueue = async ({ pubClient, logger }) => { if (!emailAddQueue || !emailConsolidateQueue) { - logger.logger.info("Initializing Email Notification Queues"); + logger.logger.debug("Initializing Email Notification Queues"); // Queue for adding email notifications emailAddQueue = new Queue("emailAdd", { @@ -55,7 +55,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { "emailAdd", async (job) => { const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data; - logger.logger.info(`Adding email notifications for jobId ${jobId}`); + logger.logger.debug(`Adding email 
notifications for jobId ${jobId}`); const redisKeyPrefix = `email:notifications:${jobId}`; for (const recipient of recipients) { @@ -84,7 +84,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { backoff: LOCK_EXPIRATION } ); - logger.logger.info(`Scheduled email consolidation for jobId ${jobId}`); + logger.logger.debug(`Scheduled email consolidation for jobId ${jobId}`); await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000); } else { logger.logger.debug(`Email consolidation already scheduled for jobId ${jobId}`); } @@ -102,7 +102,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { "emailConsolidate", async (job) => { const { jobId, jobRoNumber, bodyShopName } = job.data; - logger.logger.info(`Consolidating emails for jobId ${jobId}`); + logger.logger.debug(`Consolidating emails for jobId ${jobId}`); const lockKey = `lock:emailConsolidate:${jobId}`; const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); @@ -136,7 +136,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => { type: "html", html: emailBody }); - logger.logger.info( + logger.logger.debug( `Sent consolidated email to ${recipient} for jobId ${jobId} with ${messages.length} updates` ); await pubClient.del(userKey); @@ -146,13 +146,16 @@ const loadEmailQueue = async ({ pubClient, logger }) => { await pubClient.del(recipientsSet); await pubClient.del(`email:consolidate:${jobId}`); } catch (err) { - logger.logger.error(`Email consolidation error for jobId ${jobId}: ${err.message}`, { error: err }); + logger.log(`Email consolidation error for jobId ${jobId}`, "ERROR", "notifications", "api", { + message: err?.message, + stack: err?.stack + }); throw err; } finally { await pubClient.del(lockKey); } } else { - logger.logger.info(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`); + logger.logger.debug(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`); } }, { @@ 
-164,20 +167,26 @@ const loadEmailQueue = async ({ pubClient, logger }) => { ); // Event handlers for workers - emailAddWorker.on("completed", (job) => logger.logger.info(`Email add job ${job.id} completed`)); - emailConsolidateWorker.on("completed", (job) => logger.logger.info(`Email consolidate job ${job.id} completed`)); + emailAddWorker.on("completed", (job) => logger.logger.debug(`Email add job ${job.id} completed`)); + emailConsolidateWorker.on("completed", (job) => logger.logger.debug(`Email consolidate job ${job.id} completed`)); emailAddWorker.on("failed", (job, err) => - logger.logger.error(`Email add job ${job.id} failed: ${err.message}`, { error: err }) + logger.log(`Email add job ${job.id} failed`, "ERROR", "notifications", "api", { + message: err?.message, + stack: err?.stack + }) ); emailConsolidateWorker.on("failed", (job, err) => - logger.logger.error(`Email consolidate job ${job.id} failed: ${err.message}`, { error: err }) + logger.log(`Email consolidate job ${job.id} failed`, "ERROR", "notifications", "api", { + message: err?.message, + stack: err?.stack + }) ); // Register cleanup task instead of direct process listeners const shutdown = async () => { - logger.logger.info("Closing email queue workers..."); + logger.logger.debug("Closing email queue workers..."); await Promise.all([emailAddWorker.close(), emailConsolidateWorker.close()]); - logger.logger.info("Email queue workers closed"); + logger.logger.debug("Email queue workers closed"); }; registerCleanupTask(shutdown); } @@ -225,7 +234,7 @@ const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => { { jobId, jobRoNumber, bodyShopName, body, recipients }, { jobId: `${jobId}:${Date.now()}` } ); - logger.logger.info(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`); + logger.logger.debug(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`); } };