Merged in feature/IO-3096-GlobalNotifications (pull request #2205)
IO-3096-GlobalNotifications - Add in a function to exclude extra logging from production
This commit is contained in:
@@ -2,6 +2,7 @@ const { Queue, Worker } = require("bullmq");
|
||||
const { INSERT_NOTIFICATIONS_MUTATION } = require("../../graphql-client/queries");
|
||||
const { registerCleanupTask } = require("../../utils/cleanupManager");
|
||||
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
|
||||
const devDebugLogger = require("../../utils/devDebugLogger");
|
||||
const graphQLClient = require("../../graphql-client/graphql-client").client;
|
||||
|
||||
// Base time-related constant in minutes, sourced from environment variable or defaulting to 1
|
||||
@@ -49,7 +50,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||
const prefix = getBullMQPrefix();
|
||||
const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";
|
||||
|
||||
logger.logger.debug(`Initializing Notifications Queues with prefix: ${prefix}`);
|
||||
devDebugLogger(`Initializing Notifications Queues with prefix: ${prefix}`);
|
||||
|
||||
addQueue = new Queue("notificationsAdd", {
|
||||
prefix,
|
||||
@@ -67,7 +68,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||
"notificationsAdd",
|
||||
async (job) => {
|
||||
const { jobId, key, variables, recipients, body, jobRoNumber } = job.data;
|
||||
logger.logger.debug(`Adding notifications for jobId ${jobId}`);
|
||||
devDebugLogger(`Adding notifications for jobId ${jobId}`);
|
||||
|
||||
const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`;
|
||||
const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() };
|
||||
@@ -79,12 +80,12 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||
const notifications = existingNotifications ? JSON.parse(existingNotifications) : [];
|
||||
notifications.push(notification);
|
||||
await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000);
|
||||
logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
|
||||
devDebugLogger(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
|
||||
}
|
||||
|
||||
const consolidateKey = `app:${devKey}:consolidate:${jobId}`;
|
||||
const flagSet = await pubClient.setnx(consolidateKey, "pending");
|
||||
logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);
|
||||
devDebugLogger(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);
|
||||
|
||||
if (flagSet) {
|
||||
await consolidateQueue.add(
|
||||
@@ -97,10 +98,10 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||
backoff: LOCK_EXPIRATION
|
||||
}
|
||||
);
|
||||
logger.logger.debug(`Scheduled consolidation for jobId ${jobId}`);
|
||||
devDebugLogger(`Scheduled consolidation for jobId ${jobId}`);
|
||||
await pubClient.expire(consolidateKey, CONSOLIDATION_FLAG_EXPIRATION / 1000);
|
||||
} else {
|
||||
logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`);
|
||||
devDebugLogger(`Consolidation already scheduled for jobId ${jobId}`);
|
||||
}
|
||||
},
|
||||
{
|
||||
@@ -114,24 +115,24 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||
"notificationsConsolidate",
|
||||
async (job) => {
|
||||
const { jobId, recipients } = job.data;
|
||||
logger.logger.debug(`Consolidating notifications for jobId ${jobId}`);
|
||||
devDebugLogger(`Consolidating notifications for jobId ${jobId}`);
|
||||
|
||||
const redisKeyPrefix = `app:${devKey}:notifications:${jobId}`;
|
||||
const lockKey = `lock:${devKey}:consolidate:${jobId}`;
|
||||
|
||||
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
|
||||
logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);
|
||||
devDebugLogger(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);
|
||||
|
||||
if (lockAcquired) {
|
||||
try {
|
||||
const allNotifications = {};
|
||||
const uniqueUsers = [...new Set(recipients.map((r) => r.user))];
|
||||
logger.logger.debug(`Unique users for jobId ${jobId}: ${uniqueUsers}`);
|
||||
devDebugLogger(`Unique users for jobId ${jobId}: ${uniqueUsers}`);
|
||||
|
||||
for (const user of uniqueUsers) {
|
||||
const userKey = `${redisKeyPrefix}:${user}`;
|
||||
const notifications = await pubClient.get(userKey);
|
||||
logger.logger.debug(`Retrieved notifications for ${user}: ${notifications}`);
|
||||
devDebugLogger(`Retrieved notifications for ${user}: ${notifications}`);
|
||||
|
||||
if (notifications) {
|
||||
const parsedNotifications = JSON.parse(notifications);
|
||||
@@ -141,13 +142,13 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||
allNotifications[user][bodyShopId] = parsedNotifications;
|
||||
}
|
||||
await pubClient.del(userKey);
|
||||
logger.logger.debug(`Deleted Redis key ${userKey}`);
|
||||
devDebugLogger(`Deleted Redis key ${userKey}`);
|
||||
} else {
|
||||
logger.logger.debug(`No notifications found for ${user} under ${userKey}`);
|
||||
devDebugLogger(`No notifications found for ${user} under ${userKey}`);
|
||||
}
|
||||
}
|
||||
|
||||
logger.logger.debug(`Consolidated notifications: ${JSON.stringify(allNotifications)}`);
|
||||
devDebugLogger(`Consolidated notifications: ${JSON.stringify(allNotifications)}`);
|
||||
|
||||
// Insert notifications into the database and collect IDs
|
||||
const notificationInserts = [];
|
||||
@@ -174,7 +175,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||
const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, {
|
||||
objects: notificationInserts
|
||||
});
|
||||
logger.logger.debug(
|
||||
devDebugLogger(
|
||||
`Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}`
|
||||
);
|
||||
|
||||
@@ -208,11 +209,11 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||
associationId
|
||||
});
|
||||
});
|
||||
logger.logger.debug(
|
||||
devDebugLogger(
|
||||
`Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}`
|
||||
);
|
||||
} else {
|
||||
logger.logger.debug(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
|
||||
devDebugLogger(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -228,7 +229,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||
await pubClient.del(lockKey);
|
||||
}
|
||||
} else {
|
||||
logger.logger.debug(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
|
||||
devDebugLogger(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
|
||||
}
|
||||
},
|
||||
{
|
||||
@@ -239,8 +240,9 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||
}
|
||||
);
|
||||
|
||||
addWorker.on("completed", (job) => logger.logger.debug(`Add job ${job.id} completed`));
|
||||
consolidateWorker.on("completed", (job) => logger.logger.debug(`Consolidate job ${job.id} completed`));
|
||||
addWorker.on("completed", (job) => devDebugLogger(`Add job ${job.id} completed`));
|
||||
consolidateWorker.on("completed", (job) => devDebugLogger(`Consolidate job ${job.id} completed`));
|
||||
|
||||
addWorker.on("failed", (job, err) =>
|
||||
logger.log(`app-queue-notification-error`, "ERROR", "notifications", "api", {
|
||||
message: err?.message,
|
||||
@@ -256,9 +258,9 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||
|
||||
// Register cleanup task instead of direct process listeners
|
||||
const shutdown = async () => {
|
||||
logger.logger.debug("Closing app queue workers...");
|
||||
devDebugLogger("Closing app queue workers...");
|
||||
await Promise.all([addWorker.close(), consolidateWorker.close()]);
|
||||
logger.logger.debug("App queue workers closed");
|
||||
devDebugLogger("App queue workers closed");
|
||||
};
|
||||
|
||||
registerCleanupTask(shutdown);
|
||||
@@ -288,7 +290,7 @@ const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => {
|
||||
{ jobId, bodyShopId, key, variables, recipients, body, jobRoNumber },
|
||||
{ jobId: `${jobId}:${Date.now()}` }
|
||||
);
|
||||
logger.logger.debug(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
|
||||
devDebugLogger(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ const generateEmailTemplate = require("../../email/generateTemplate");
|
||||
const { InstanceEndpoints } = require("../../utils/instanceMgr");
|
||||
const { registerCleanupTask } = require("../../utils/cleanupManager");
|
||||
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
|
||||
const devDebugLogger = require("../../utils/devDebugLogger");
|
||||
|
||||
const EMAIL_CONSOLIDATION_DELAY_IN_MINS = (() => {
|
||||
const envValue = process.env?.EMAIL_CONSOLIDATION_DELAY_IN_MINS;
|
||||
@@ -38,7 +39,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
||||
const prefix = getBullMQPrefix();
|
||||
const devKey = process.env?.NODE_ENV === "production" ? "prod" : "dev";
|
||||
|
||||
logger.logger.debug(`Initializing Email Notification Queues with prefix: ${prefix}`);
|
||||
devDebugLogger(`Initializing Email Notification Queues with prefix: ${prefix}`);
|
||||
|
||||
// Queue for adding email notifications
|
||||
emailAddQueue = new Queue("emailAdd", {
|
||||
@@ -59,7 +60,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
||||
"emailAdd",
|
||||
async (job) => {
|
||||
const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data;
|
||||
logger.logger.debug(`Adding email notifications for jobId ${jobId}`);
|
||||
devDebugLogger(`Adding email notifications for jobId ${jobId}`);
|
||||
|
||||
const redisKeyPrefix = `email:${devKey}:notifications:${jobId}`;
|
||||
|
||||
@@ -73,7 +74,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
||||
await pubClient.hsetnx(detailsKey, "lastName", lastName || "");
|
||||
await pubClient.expire(detailsKey, NOTIFICATION_EXPIRATION / 1000);
|
||||
await pubClient.sadd(`email:${devKey}:recipients:${jobId}`, user);
|
||||
logger.logger.debug(`Stored message for ${user} under ${userKey}: ${body}`);
|
||||
devDebugLogger(`Stored message for ${user} under ${userKey}: ${body}`);
|
||||
}
|
||||
|
||||
const consolidateKey = `email:${devKey}:consolidate:${jobId}`;
|
||||
@@ -89,10 +90,10 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
||||
backoff: LOCK_EXPIRATION
|
||||
}
|
||||
);
|
||||
logger.logger.debug(`Scheduled email consolidation for jobId ${jobId}`);
|
||||
devDebugLogger(`Scheduled email consolidation for jobId ${jobId}`);
|
||||
await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000);
|
||||
} else {
|
||||
logger.logger.debug(`Email consolidation already scheduled for jobId ${jobId}`);
|
||||
devDebugLogger(`Email consolidation already scheduled for jobId ${jobId}`);
|
||||
}
|
||||
},
|
||||
{
|
||||
@@ -107,7 +108,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
||||
"emailConsolidate",
|
||||
async (job) => {
|
||||
const { jobId, jobRoNumber, bodyShopName } = job.data;
|
||||
logger.logger.debug(`Consolidating emails for jobId ${jobId}`);
|
||||
devDebugLogger(`Consolidating emails for jobId ${jobId}`);
|
||||
|
||||
const lockKey = `lock:${devKey}:emailConsolidate:${jobId}`;
|
||||
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
|
||||
@@ -141,7 +142,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
||||
type: "html",
|
||||
html: emailBody
|
||||
});
|
||||
logger.logger.debug(
|
||||
devDebugLogger(
|
||||
`Sent consolidated email to ${recipient} for jobId ${jobId} with ${messages.length} updates`
|
||||
);
|
||||
await pubClient.del(userKey);
|
||||
@@ -160,7 +161,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
||||
await pubClient.del(lockKey);
|
||||
}
|
||||
} else {
|
||||
logger.logger.debug(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`);
|
||||
devDebugLogger(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`);
|
||||
}
|
||||
},
|
||||
{
|
||||
@@ -172,8 +173,9 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
||||
);
|
||||
|
||||
// Event handlers for workers
|
||||
emailAddWorker.on("completed", (job) => logger.logger.debug(`Email add job ${job.id} completed`));
|
||||
emailConsolidateWorker.on("completed", (job) => logger.logger.debug(`Email consolidate job ${job.id} completed`));
|
||||
emailAddWorker.on("completed", (job) => devDebugLogger(`Email add job ${job.id} completed`));
|
||||
emailConsolidateWorker.on("completed", (job) => devDebugLogger(`Email consolidate job ${job.id} completed`));
|
||||
|
||||
emailAddWorker.on("failed", (job, err) =>
|
||||
logger.log(`add-email-queue-failed`, "ERROR", "notifications", "api", {
|
||||
message: err?.message,
|
||||
@@ -189,9 +191,9 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
||||
|
||||
// Register cleanup task instead of direct process listeners
|
||||
const shutdown = async () => {
|
||||
logger.logger.debug("Closing email queue workers...");
|
||||
devDebugLogger("Closing email queue workers...");
|
||||
await Promise.all([emailAddWorker.close(), emailConsolidateWorker.close()]);
|
||||
logger.logger.debug("Email queue workers closed");
|
||||
devDebugLogger("Email queue workers closed");
|
||||
};
|
||||
registerCleanupTask(shutdown);
|
||||
}
|
||||
@@ -239,7 +241,7 @@ const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => {
|
||||
{ jobId, jobRoNumber, bodyShopName, body, recipients },
|
||||
{ jobId: `${jobId}:${Date.now()}` }
|
||||
);
|
||||
logger.logger.debug(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
|
||||
devDebugLogger(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
10
server/utils/devDebugLogger.js
Normal file
10
server/utils/devDebugLogger.js
Normal file
@@ -0,0 +1,10 @@
|
||||
const logger = require("./logger");
|
||||
|
||||
const devDebugLogger = (message, meta) => {
|
||||
if (process.env?.NODE_ENV === "production") {
|
||||
return;
|
||||
}
|
||||
logger.logger.debug(message, meta);
|
||||
};
|
||||
|
||||
module.exports = devDebugLogger;
|
||||
@@ -1,4 +1,5 @@
|
||||
const { GET_BODYSHOP_BY_ID } = require("../graphql-client/queries");
|
||||
const devDebugLogger = require("./devDebugLogger");
|
||||
const client = require("../graphql-client/graphql-client").client;
|
||||
|
||||
const BODYSHOP_CACHE_TTL = 3600; // 1 hour
|
||||
@@ -87,7 +88,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
|
||||
const addUserSocketMapping = async (email, socketId, bodyshopId) => {
|
||||
const socketMappingKey = getUserSocketMappingKey(email);
|
||||
try {
|
||||
logger.log(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`, "debug", "redis");
|
||||
devDebugLogger(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`);
|
||||
// Save the mapping: socketId -> bodyshopId
|
||||
await pubClient.hset(socketMappingKey, socketId, bodyshopId);
|
||||
// Set TTL (24 hours) for the mapping hash
|
||||
@@ -109,7 +110,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
|
||||
const exists = await pubClient.exists(socketMappingKey);
|
||||
if (exists) {
|
||||
await pubClient.expire(socketMappingKey, 86400);
|
||||
logger.log(`Refreshed TTL for ${email} socket mapping`, "debug", "redis");
|
||||
devDebugLogger(`Refreshed TTL for ${email} socket mapping`);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.log(`Error refreshing TTL for ${email}: ${error}`, "ERROR", "redis");
|
||||
@@ -126,20 +127,16 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
|
||||
const socketMappingKey = getUserSocketMappingKey(email);
|
||||
|
||||
try {
|
||||
logger.log(`Removing socket ${socketId} mapping for user ${email}`, "DEBUG", "redis");
|
||||
devDebugLogger(`Removing socket ${socketId} mapping for user ${email}`);
|
||||
// Look up the bodyshopId associated with this socket
|
||||
const bodyshopId = await pubClient.hget(socketMappingKey, socketId);
|
||||
if (!bodyshopId) {
|
||||
logger.log(`Socket ${socketId} not found for user ${email}`, "DEBUG", "redis");
|
||||
devDebugLogger(`Socket ${socketId} not found for user ${email}`);
|
||||
return;
|
||||
}
|
||||
// Remove the socket mapping
|
||||
await pubClient.hdel(socketMappingKey, socketId);
|
||||
logger.log(
|
||||
`Removed socket ${socketId} (associated with bodyshop ${bodyshopId}) for user ${email}`,
|
||||
"DEBUG",
|
||||
"redis"
|
||||
);
|
||||
devDebugLogger(`Removed socket ${socketId} (associated with bodyshop ${bodyshopId}) for user ${email}`);
|
||||
|
||||
// Refresh TTL if any socket mappings remain
|
||||
const remainingSockets = await pubClient.hlen(socketMappingKey);
|
||||
@@ -227,7 +224,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
|
||||
await pubClient.set(key, jsonData);
|
||||
await pubClient.expire(key, BODYSHOP_CACHE_TTL);
|
||||
|
||||
logger.log("bodyshop-cache-miss", "DEBUG", "redis", null, {
|
||||
devDebugLogger("bodyshop-cache-miss", {
|
||||
bodyshopId,
|
||||
action: "Fetched from DB and cached"
|
||||
});
|
||||
@@ -254,7 +251,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
|
||||
if (!values) {
|
||||
// Invalidate cache by deleting the key
|
||||
await pubClient.del(key);
|
||||
logger.log("bodyshop-cache-invalidate", "DEBUG", "api", "redis", {
|
||||
devDebugLogger("bodyshop-cache-invalidate", {
|
||||
bodyshopId,
|
||||
action: "Cache invalidated"
|
||||
});
|
||||
@@ -263,7 +260,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
|
||||
const jsonData = JSON.stringify(values);
|
||||
await pubClient.set(key, jsonData);
|
||||
await pubClient.expire(key, BODYSHOP_CACHE_TTL);
|
||||
logger.log("bodyshop-cache-update", "DEBUG", "api", "redis", {
|
||||
devDebugLogger("bodyshop-cache-update", {
|
||||
bodyshopId,
|
||||
action: "Cache updated",
|
||||
values
|
||||
|
||||
Reference in New Issue
Block a user