feature/IO-3096-GlobalNotifications - Checkpoint, App Queue
This commit is contained in:
@@ -292,8 +292,8 @@ const applySocketIO = async ({ server, app }) => {
|
|||||||
/**
|
/**
|
||||||
* Load Queues for Email and App
|
* Load Queues for Email and App
|
||||||
*/
|
*/
|
||||||
const loadQueues = async ({ pubClient, logger, redisHelpers }) => {
|
const loadQueues = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||||
const queueSettings = { pubClient, logger, redisHelpers };
|
const queueSettings = { pubClient, logger, redisHelpers, ioRedis };
|
||||||
|
|
||||||
// Assuming loadEmailQueue and loadAppQueue return Promises
|
// Assuming loadEmailQueue and loadAppQueue return Promises
|
||||||
const [notificationsEmailsQueue, notificationsAppQueue] = await Promise.all([
|
const [notificationsEmailsQueue, notificationsAppQueue] = await Promise.all([
|
||||||
@@ -329,7 +329,7 @@ const main = async () => {
|
|||||||
require("./server/web-sockets/web-socket");
|
require("./server/web-sockets/web-socket");
|
||||||
|
|
||||||
// Initialize Queues
|
// Initialize Queues
|
||||||
await loadQueues({ pubClient: pubClient, logger, redisHelpers });
|
await loadQueues({ pubClient: pubClient, logger, redisHelpers, ioRedis });
|
||||||
|
|
||||||
applyMiddleware({ app });
|
applyMiddleware({ app });
|
||||||
applyRoutes({ app });
|
applyRoutes({ app });
|
||||||
|
|||||||
@@ -1,15 +1,143 @@
|
|||||||
const { Queue } = require("bullmq");
|
const { Queue, Worker } = require("bullmq");
|
||||||
|
|
||||||
let appQueue;
|
let appQueue;
|
||||||
|
|
||||||
const loadAppQueue = async ({ pubClient, logger, redisHelpers }) => {
|
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||||
if (!appQueue) {
|
if (!appQueue) {
|
||||||
logger.logger.info("Initializing Notifications App Queue");
|
logger.logger.info("Initializing Notifications App Queue");
|
||||||
appQueue = new Queue("notificationsApp", {
|
appQueue = new Queue("notificationsApp", {
|
||||||
connection: pubClient,
|
connection: pubClient,
|
||||||
prefix: "{BULLMQ}"
|
prefix: "{BULLMQ}"
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const worker = new Worker(
|
||||||
|
"notificationsApp",
|
||||||
|
async (job) => {
|
||||||
|
const { jobId, bodyShopId, key, variables, recipients } = job.data;
|
||||||
|
logger.logger.info(`Processing app job ${job.id} for jobId ${jobId}`);
|
||||||
|
|
||||||
|
const redisKey = `app:notifications:${jobId}`;
|
||||||
|
const lastSentKey = `${redisKey}:lastSent`;
|
||||||
|
|
||||||
|
if (job.name === "add-notification") {
|
||||||
|
const notification = { key, variables, timestamp: Date.now() };
|
||||||
|
for (const recipient of recipients) {
|
||||||
|
const { user } = recipient;
|
||||||
|
const userKey = `${redisKey}:${user}`;
|
||||||
|
const existingNotifications = await pubClient.get(userKey);
|
||||||
|
const notifications = existingNotifications ? JSON.parse(existingNotifications) : [];
|
||||||
|
notifications.push(notification);
|
||||||
|
await pubClient.set(userKey, JSON.stringify(notifications), "EX", 40);
|
||||||
|
}
|
||||||
|
} else if (job.name === "send-notifications") {
|
||||||
|
let hasNewNotifications = false;
|
||||||
|
const lastSent = parseInt((await pubClient.get(lastSentKey)) || "0", 10);
|
||||||
|
|
||||||
|
for (const recipient of recipients) {
|
||||||
|
const { user, bodyShopId: recipientBodyShopId } = recipient;
|
||||||
|
const userKey = `${redisKey}:${user}`;
|
||||||
|
const notifications = await pubClient.get(userKey);
|
||||||
|
if (notifications) {
|
||||||
|
const parsedNotifications = JSON.parse(notifications);
|
||||||
|
const newNotifications = parsedNotifications.filter((n) => n.timestamp > lastSent);
|
||||||
|
if (newNotifications.length > 0) {
|
||||||
|
hasNewNotifications = true;
|
||||||
|
const socketIds = await redisHelpers.getUserSocketMapping(user);
|
||||||
|
if (socketIds && socketIds[bodyShopId]?.socketIds) {
|
||||||
|
socketIds[bodyShopId].socketIds.forEach((socketId) => {
|
||||||
|
ioRedis.to(socketId).emit("notification", {
|
||||||
|
jobId,
|
||||||
|
bodyShopId: recipientBodyShopId,
|
||||||
|
notifications: newNotifications
|
||||||
|
});
|
||||||
|
});
|
||||||
|
logger.logger.info(`Sent ${newNotifications.length} new notifications to ${user} for jobId ${jobId}`);
|
||||||
|
} else {
|
||||||
|
logger.logger.warn(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
|
||||||
|
}
|
||||||
|
await pubClient.del(userKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasNewNotifications) {
|
||||||
|
await pubClient.set(lastSentKey, Date.now(), "EX", 300);
|
||||||
|
} else {
|
||||||
|
// Only remove if no active "add-notification" jobs are pending
|
||||||
|
const activeJobs = await appQueue.getActive();
|
||||||
|
const hasPendingAdds = activeJobs.some((j) => j.name === "add-notification" && j.data.jobId === jobId);
|
||||||
|
if (!hasPendingAdds) {
|
||||||
|
const recurringJobKey = `send-notifications:${jobId}`;
|
||||||
|
const removed = await appQueue.removeRepeatable("send-notifications", {
|
||||||
|
every: 30 * 1000,
|
||||||
|
jobId: recurringJobKey
|
||||||
|
});
|
||||||
|
if (removed) {
|
||||||
|
logger.logger.info(
|
||||||
|
`Successfully removed recurring send-notifications job for jobId ${jobId} due to no new notifications`
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
logger.logger.warn(
|
||||||
|
`Failed to remove recurring send-notifications job for jobId ${jobId} - may already be removed`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.logger.info(
|
||||||
|
`Skipping removal of send-notifications for jobId ${jobId} - pending add-notification jobs exist`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
connection: pubClient,
|
||||||
|
prefix: "{BULLMQ}",
|
||||||
|
concurrency: 5
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
worker.on("completed", async (job) => {
|
||||||
|
if (job.name === "add-notification") {
|
||||||
|
const { jobId } = job.data;
|
||||||
|
const recurringJobKey = `send-notifications:${jobId}`;
|
||||||
|
const existingJobs = await appQueue.getRepeatableJobs();
|
||||||
|
if (!existingJobs.some((j) => j.key === recurringJobKey)) {
|
||||||
|
await appQueue.add(
|
||||||
|
"send-notifications",
|
||||||
|
{ jobId, bodyShopId: job.data.bodyShopId, recipients: job.data.recipients },
|
||||||
|
{
|
||||||
|
repeat: {
|
||||||
|
every: 30 * 1000, // Every 30 seconds
|
||||||
|
limit: 10 // 5 minutes
|
||||||
|
},
|
||||||
|
jobId: recurringJobKey
|
||||||
|
}
|
||||||
|
);
|
||||||
|
logger.logger.info(`Scheduled 30s notification send for jobId ${jobId}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logger.logger.info(`Job ${job.id} completed`);
|
||||||
|
});
|
||||||
|
|
||||||
|
worker.on("failed", (job, err) => {
|
||||||
|
logger.logger.error(`Job ${job.id} failed: ${err.message}`, { error: err });
|
||||||
|
});
|
||||||
|
|
||||||
|
worker.on("error", (err) => {
|
||||||
|
logger.logger.error("Worker error:", { error: err });
|
||||||
|
});
|
||||||
|
|
||||||
|
const shutdown = async () => {
|
||||||
|
if (worker) {
|
||||||
|
logger.logger.info("Closing app queue worker...");
|
||||||
|
await worker.close();
|
||||||
|
logger.logger.info("App queue worker closed");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
process.on("SIGTERM", shutdown);
|
||||||
|
process.on("SIGINT", shutdown);
|
||||||
}
|
}
|
||||||
|
|
||||||
return appQueue;
|
return appQueue;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -20,4 +148,18 @@ const getQueue = () => {
|
|||||||
return appQueue;
|
return appQueue;
|
||||||
};
|
};
|
||||||
|
|
||||||
module.exports = { loadAppQueue, getQueue };
|
const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => {
|
||||||
|
const appQueue = getQueue();
|
||||||
|
|
||||||
|
for (const app of appsToDispatch) {
|
||||||
|
const { jobId, bodyShopId, key, variables, recipients } = app;
|
||||||
|
await appQueue.add(
|
||||||
|
"add-notification",
|
||||||
|
{ jobId, bodyShopId, key, variables, recipients },
|
||||||
|
{ jobId: `${jobId}:${Date.now()}` }
|
||||||
|
);
|
||||||
|
logger.logger.info(`Added app notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue };
|
||||||
|
|||||||
@@ -343,7 +343,7 @@ const tasksUpdatedCreatedBuilder = (data) => {
|
|||||||
app: {
|
app: {
|
||||||
jobId: data.jobId,
|
jobId: data.jobId,
|
||||||
bodyShopId: data.bodyShopId,
|
bodyShopId: data.bodyShopId,
|
||||||
key: "notifications.job.tasksUpdated",
|
key: data.isNew ? "notifications.job.taskCreated" : "notifications.job.taskUpdated",
|
||||||
variables: {
|
variables: {
|
||||||
type: data.isNew ? "created" : "updated",
|
type: data.isNew ? "created" : "updated",
|
||||||
roNumber: data.jobRoNumber
|
roNumber: data.jobRoNumber
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ const { isEmpty, isFunction } = require("lodash");
|
|||||||
const { getMatchingScenarios } = require("./scenarioMapperr");
|
const { getMatchingScenarios } = require("./scenarioMapperr");
|
||||||
const consoleDir = require("../utils/consoleDir");
|
const consoleDir = require("../utils/consoleDir");
|
||||||
const { dispatchEmailsToQueue } = require("./queues/emailQueue");
|
const { dispatchEmailsToQueue } = require("./queues/emailQueue");
|
||||||
|
const { dispatchAppsToQueue } = require("./queues/appQueue");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parses an event and determines matching scenarios for notifications.
|
* Parses an event and determines matching scenarios for notifications.
|
||||||
@@ -171,18 +172,30 @@ const scenarioParser = async (req, jobIdField) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Step 9: Dispatch Email Notifications to the Email Notification Queue
|
// Step 9: Dispatch Email Notifications to the Email Notification Queue
|
||||||
dispatchEmailsToQueue({
|
const emailsToDispatch = scenariosToDispatch.map((scenario) => scenario?.email);
|
||||||
emailsToDispatch: scenariosToDispatch.map((scenario) => scenario?.email),
|
if (!isEmpty(emailsToDispatch)) {
|
||||||
logger
|
dispatchEmailsToQueue({
|
||||||
}).catch((e) =>
|
emailsToDispatch,
|
||||||
logger.log("Something went wrong dispatching emails to the Email Notification Queue", "error", "queue", null, {
|
logger
|
||||||
message: e?.message
|
}).catch((e) =>
|
||||||
})
|
logger.log("Something went wrong dispatching emails to the Email Notification Queue", "error", "queue", null, {
|
||||||
);
|
message: e?.message
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// Step 10: Dispatch App Notifications to the App Notification Queue
|
// Step 10: Dispatch App Notifications to the App Notification Queue
|
||||||
const appsToDispatch = scenariosToDispatch.map((scenario) => scenario?.app);
|
const appsToDispatch = scenariosToDispatch.map((scenario) => scenario?.app);
|
||||||
consoleDir({ appsToDispatch });
|
if (!isEmpty(appsToDispatch)) {
|
||||||
|
dispatchAppsToQueue({
|
||||||
|
appsToDispatch,
|
||||||
|
logger
|
||||||
|
}).catch((e) =>
|
||||||
|
logger.log("Something went wrong dispatching apps to the App Notification Queue", "error", "queue", null, {
|
||||||
|
message: e?.message
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
module.exports = scenarioParser;
|
module.exports = scenarioParser;
|
||||||
|
|||||||
Reference in New Issue
Block a user