feature/IO-3096-GlobalNotifications - Logging
This commit is contained in:
@@ -45,7 +45,7 @@ const buildNotificationContent = (notifications) => {
|
|||||||
*/
|
*/
|
||||||
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||||
if (!addQueue || !consolidateQueue) {
|
if (!addQueue || !consolidateQueue) {
|
||||||
logger.logger.info("Initializing Notifications Queues");
|
logger.logger.debug("Initializing Notifications Queues");
|
||||||
|
|
||||||
addQueue = new Queue("notificationsAdd", {
|
addQueue = new Queue("notificationsAdd", {
|
||||||
connection: pubClient,
|
connection: pubClient,
|
||||||
@@ -63,7 +63,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
|||||||
"notificationsAdd",
|
"notificationsAdd",
|
||||||
async (job) => {
|
async (job) => {
|
||||||
const { jobId, key, variables, recipients, body, jobRoNumber } = job.data;
|
const { jobId, key, variables, recipients, body, jobRoNumber } = job.data;
|
||||||
logger.logger.info(`Adding notifications for jobId ${jobId}`);
|
logger.logger.debug(`Adding notifications for jobId ${jobId}`);
|
||||||
|
|
||||||
const redisKeyPrefix = `app:notifications:${jobId}`;
|
const redisKeyPrefix = `app:notifications:${jobId}`;
|
||||||
const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() };
|
const notification = { key, variables, body, jobRoNumber, timestamp: Date.now() };
|
||||||
@@ -93,7 +93,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
|||||||
backoff: LOCK_EXPIRATION
|
backoff: LOCK_EXPIRATION
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
logger.logger.info(`Scheduled consolidation for jobId ${jobId}`);
|
logger.logger.debug(`Scheduled consolidation for jobId ${jobId}`);
|
||||||
await pubClient.expire(consolidateKey, CONSOLIDATION_FLAG_EXPIRATION / 1000);
|
await pubClient.expire(consolidateKey, CONSOLIDATION_FLAG_EXPIRATION / 1000);
|
||||||
} else {
|
} else {
|
||||||
logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`);
|
logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`);
|
||||||
@@ -110,7 +110,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
|||||||
"notificationsConsolidate",
|
"notificationsConsolidate",
|
||||||
async (job) => {
|
async (job) => {
|
||||||
const { jobId, recipients } = job.data;
|
const { jobId, recipients } = job.data;
|
||||||
logger.logger.info(`Consolidating notifications for jobId ${jobId}`);
|
logger.logger.debug(`Consolidating notifications for jobId ${jobId}`);
|
||||||
|
|
||||||
const redisKeyPrefix = `app:notifications:${jobId}`;
|
const redisKeyPrefix = `app:notifications:${jobId}`;
|
||||||
const lockKey = `lock:consolidate:${jobId}`;
|
const lockKey = `lock:consolidate:${jobId}`;
|
||||||
@@ -169,7 +169,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
|||||||
const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, {
|
const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, {
|
||||||
objects: notificationInserts
|
objects: notificationInserts
|
||||||
});
|
});
|
||||||
logger.logger.info(
|
logger.logger.debug(
|
||||||
`Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}`
|
`Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}`
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -203,7 +203,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
|||||||
associationId
|
associationId
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
logger.logger.info(
|
logger.logger.debug(
|
||||||
`Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}`
|
`Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}`
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
@@ -214,13 +214,16 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
|||||||
|
|
||||||
await pubClient.del(`app:consolidate:${jobId}`);
|
await pubClient.del(`app:consolidate:${jobId}`);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.logger.error(`Consolidation error for jobId ${jobId}: ${err.message}`, { error: err });
|
logger.log(`Consolidation error for jobId ${jobId}`, "ERROR", "notifications", "api", {
|
||||||
|
message: err?.message,
|
||||||
|
stack: err?.stack
|
||||||
|
});
|
||||||
throw err;
|
throw err;
|
||||||
} finally {
|
} finally {
|
||||||
await pubClient.del(lockKey);
|
await pubClient.del(lockKey);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.logger.info(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
|
logger.logger.debug(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -231,20 +234,26 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
addWorker.on("completed", (job) => logger.logger.info(`Add job ${job.id} completed`));
|
addWorker.on("completed", (job) => logger.logger.debug(`Add job ${job.id} completed`));
|
||||||
consolidateWorker.on("completed", (job) => logger.logger.info(`Consolidate job ${job.id} completed`));
|
consolidateWorker.on("completed", (job) => logger.logger.debug(`Consolidate job ${job.id} completed`));
|
||||||
addWorker.on("failed", (job, err) =>
|
addWorker.on("failed", (job, err) =>
|
||||||
logger.logger.error(`Add job ${job.id} failed: ${err.message}`, { error: err })
|
logger.log(`Add job ${job.id} failed:`, "ERROR", "notifications", "api", {
|
||||||
|
message: err?.message,
|
||||||
|
stack: err?.stack
|
||||||
|
})
|
||||||
);
|
);
|
||||||
consolidateWorker.on("failed", (job, err) =>
|
consolidateWorker.on("failed", (job, err) =>
|
||||||
logger.logger.error(`Consolidate job ${job.id} failed: ${err.message}`, { error: err })
|
logger.log(`Consolidate job ${job.id} failed:`, "ERROR", "notifications", "api", {
|
||||||
|
message: err?.message,
|
||||||
|
stack: err?.stack
|
||||||
|
})
|
||||||
);
|
);
|
||||||
|
|
||||||
// Register cleanup task instead of direct process listeners
|
// Register cleanup task instead of direct process listeners
|
||||||
const shutdown = async () => {
|
const shutdown = async () => {
|
||||||
logger.logger.info("Closing app queue workers...");
|
logger.logger.debug("Closing app queue workers...");
|
||||||
await Promise.all([addWorker.close(), consolidateWorker.close()]);
|
await Promise.all([addWorker.close(), consolidateWorker.close()]);
|
||||||
logger.logger.info("App queue workers closed");
|
logger.logger.debug("App queue workers closed");
|
||||||
};
|
};
|
||||||
|
|
||||||
registerCleanupTask(shutdown);
|
registerCleanupTask(shutdown);
|
||||||
@@ -274,7 +283,7 @@ const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => {
|
|||||||
{ jobId, bodyShopId, key, variables, recipients, body, jobRoNumber },
|
{ jobId, bodyShopId, key, variables, recipients, body, jobRoNumber },
|
||||||
{ jobId: `${jobId}:${Date.now()}` }
|
{ jobId: `${jobId}:${Date.now()}` }
|
||||||
);
|
);
|
||||||
logger.logger.info(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
|
logger.logger.debug(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ let emailConsolidateWorker;
|
|||||||
*/
|
*/
|
||||||
const loadEmailQueue = async ({ pubClient, logger }) => {
|
const loadEmailQueue = async ({ pubClient, logger }) => {
|
||||||
if (!emailAddQueue || !emailConsolidateQueue) {
|
if (!emailAddQueue || !emailConsolidateQueue) {
|
||||||
logger.logger.info("Initializing Email Notification Queues");
|
logger.logger.debug("Initializing Email Notification Queues");
|
||||||
|
|
||||||
// Queue for adding email notifications
|
// Queue for adding email notifications
|
||||||
emailAddQueue = new Queue("emailAdd", {
|
emailAddQueue = new Queue("emailAdd", {
|
||||||
@@ -55,7 +55,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
|||||||
"emailAdd",
|
"emailAdd",
|
||||||
async (job) => {
|
async (job) => {
|
||||||
const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data;
|
const { jobId, jobRoNumber, bodyShopName, body, recipients } = job.data;
|
||||||
logger.logger.info(`Adding email notifications for jobId ${jobId}`);
|
logger.logger.debug(`Adding email notifications for jobId ${jobId}`);
|
||||||
|
|
||||||
const redisKeyPrefix = `email:notifications:${jobId}`;
|
const redisKeyPrefix = `email:notifications:${jobId}`;
|
||||||
for (const recipient of recipients) {
|
for (const recipient of recipients) {
|
||||||
@@ -84,7 +84,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
|||||||
backoff: LOCK_EXPIRATION
|
backoff: LOCK_EXPIRATION
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
logger.logger.info(`Scheduled email consolidation for jobId ${jobId}`);
|
logger.logger.debug(`Scheduled email consolidation for jobId ${jobId}`);
|
||||||
await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000);
|
await pubClient.expire(consolidateKey, CONSOLIDATION_KEY_EXPIRATION / 1000);
|
||||||
} else {
|
} else {
|
||||||
logger.logger.debug(`Email consolidation already scheduled for jobId ${jobId}`);
|
logger.logger.debug(`Email consolidation already scheduled for jobId ${jobId}`);
|
||||||
@@ -102,7 +102,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
|||||||
"emailConsolidate",
|
"emailConsolidate",
|
||||||
async (job) => {
|
async (job) => {
|
||||||
const { jobId, jobRoNumber, bodyShopName } = job.data;
|
const { jobId, jobRoNumber, bodyShopName } = job.data;
|
||||||
logger.logger.info(`Consolidating emails for jobId ${jobId}`);
|
logger.logger.debug(`Consolidating emails for jobId ${jobId}`);
|
||||||
|
|
||||||
const lockKey = `lock:emailConsolidate:${jobId}`;
|
const lockKey = `lock:emailConsolidate:${jobId}`;
|
||||||
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
|
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000);
|
||||||
@@ -136,7 +136,7 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
|||||||
type: "html",
|
type: "html",
|
||||||
html: emailBody
|
html: emailBody
|
||||||
});
|
});
|
||||||
logger.logger.info(
|
logger.logger.debug(
|
||||||
`Sent consolidated email to ${recipient} for jobId ${jobId} with ${messages.length} updates`
|
`Sent consolidated email to ${recipient} for jobId ${jobId} with ${messages.length} updates`
|
||||||
);
|
);
|
||||||
await pubClient.del(userKey);
|
await pubClient.del(userKey);
|
||||||
@@ -146,13 +146,16 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
|||||||
await pubClient.del(recipientsSet);
|
await pubClient.del(recipientsSet);
|
||||||
await pubClient.del(`email:consolidate:${jobId}`);
|
await pubClient.del(`email:consolidate:${jobId}`);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.logger.error(`Email consolidation error for jobId ${jobId}: ${err.message}`, { error: err });
|
logger.log(`Email Consolidation error for jobId ${jobId}`, "ERROR", "notifications", "api", {
|
||||||
|
message: err?.message,
|
||||||
|
stack: err?.stack
|
||||||
|
});
|
||||||
throw err;
|
throw err;
|
||||||
} finally {
|
} finally {
|
||||||
await pubClient.del(lockKey);
|
await pubClient.del(lockKey);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.logger.info(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`);
|
logger.logger.debug(`Skipped email consolidation for jobId ${jobId} - lock held by another worker`);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -164,20 +167,26 @@ const loadEmailQueue = async ({ pubClient, logger }) => {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// Event handlers for workers
|
// Event handlers for workers
|
||||||
emailAddWorker.on("completed", (job) => logger.logger.info(`Email add job ${job.id} completed`));
|
emailAddWorker.on("completed", (job) => logger.logger.debug(`Email add job ${job.id} completed`));
|
||||||
emailConsolidateWorker.on("completed", (job) => logger.logger.info(`Email consolidate job ${job.id} completed`));
|
emailConsolidateWorker.on("completed", (job) => logger.logger.debug(`Email consolidate job ${job.id} completed`));
|
||||||
emailAddWorker.on("failed", (job, err) =>
|
emailAddWorker.on("failed", (job, err) =>
|
||||||
logger.logger.error(`Email add job ${job.id} failed: ${err.message}`, { error: err })
|
logger.log(`Email add job ${job.id} failed`, "ERROR", "notifications", "api", {
|
||||||
|
message: err?.message,
|
||||||
|
stack: err?.stack
|
||||||
|
})
|
||||||
);
|
);
|
||||||
emailConsolidateWorker.on("failed", (job, err) =>
|
emailConsolidateWorker.on("failed", (job, err) =>
|
||||||
logger.logger.error(`Email consolidate job ${job.id} failed: ${err.message}`, { error: err })
|
logger.log(`Email consolidate job ${job.id} failed:`, "ERROR", "notifications", "api", {
|
||||||
|
message: err?.message,
|
||||||
|
stack: err?.stack
|
||||||
|
})
|
||||||
);
|
);
|
||||||
|
|
||||||
// Register cleanup task instead of direct process listeners
|
// Register cleanup task instead of direct process listeners
|
||||||
const shutdown = async () => {
|
const shutdown = async () => {
|
||||||
logger.logger.info("Closing email queue workers...");
|
logger.logger.debug("Closing email queue workers...");
|
||||||
await Promise.all([emailAddWorker.close(), emailConsolidateWorker.close()]);
|
await Promise.all([emailAddWorker.close(), emailConsolidateWorker.close()]);
|
||||||
logger.logger.info("Email queue workers closed");
|
logger.logger.debug("Email queue workers closed");
|
||||||
};
|
};
|
||||||
registerCleanupTask(shutdown);
|
registerCleanupTask(shutdown);
|
||||||
}
|
}
|
||||||
@@ -225,7 +234,7 @@ const dispatchEmailsToQueue = async ({ emailsToDispatch, logger }) => {
|
|||||||
{ jobId, jobRoNumber, bodyShopName, body, recipients },
|
{ jobId, jobRoNumber, bodyShopName, body, recipients },
|
||||||
{ jobId: `${jobId}:${Date.now()}` }
|
{ jobId: `${jobId}:${Date.now()}` }
|
||||||
);
|
);
|
||||||
logger.logger.info(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
|
logger.logger.debug(`Added email notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user