Files
bodyshop/server/notifications/queues/appQueue.js

182 lines
7.2 KiB
JavaScript

const { Queue, Worker } = require("bullmq");
let appQueue;
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
if (!appQueue) {
logger.logger.info("Initializing Notifications App Queue");
appQueue = new Queue("notificationsApp", {
connection: pubClient,
prefix: "{BULLMQ}"
});
const worker = new Worker(
"notificationsApp",
async (job) => {
const { jobId, bodyShopId, key, variables, recipients } = job.data;
logger.logger.info(`Processing app job ${job.id} for jobId ${jobId}`);
const redisKey = `app:notifications:${jobId}`;
const lastSentKey = `${redisKey}:lastSent`;
const lockKey = `lock:send-notifications:${jobId}`;
const recurringFlagKey = `app:recurring:${jobId}`;
if (job.name === "add-notification") {
const notification = { key, variables, timestamp: Date.now() };
for (const recipient of recipients) {
const { user } = recipient;
const userKey = `${redisKey}:${user}`;
const existingNotifications = await pubClient.get(userKey);
const notifications = existingNotifications ? JSON.parse(existingNotifications) : [];
notifications.push(notification);
await pubClient.set(userKey, JSON.stringify(notifications), "EX", 40);
}
} else if (job.name === "send-notifications") {
let hasNewNotifications = false;
const lastSent = parseInt((await pubClient.get(lastSentKey)) || "0", 10);
for (const recipient of recipients) {
const { user, bodyShopId: recipientBodyShopId } = recipient;
const userKey = `${redisKey}:${user}`;
const notifications = await pubClient.get(userKey);
if (notifications) {
const parsedNotifications = JSON.parse(notifications);
const newNotifications = parsedNotifications.filter((n) => n.timestamp > lastSent);
if (newNotifications.length > 0) {
hasNewNotifications = true;
const socketIds = await redisHelpers.getUserSocketMapping(user);
if (socketIds && socketIds[bodyShopId]?.socketIds) {
socketIds[bodyShopId].socketIds.forEach((socketId) => {
ioRedis.to(socketId).emit("notification", {
jobId,
bodyShopId: recipientBodyShopId,
notifications: newNotifications
});
});
logger.logger.info(`Sent ${newNotifications.length} new notifications to ${user} for jobId ${jobId}`);
} else {
logger.logger.warn(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
}
await pubClient.del(userKey);
}
}
}
if (hasNewNotifications) {
await pubClient.set(lastSentKey, Date.now(), "EX", 300);
} else {
const activeJobs = await appQueue.getActive();
const hasPendingAdds = activeJobs.some((j) => j.name === "add-notification" && j.data.jobId === jobId);
if (!hasPendingAdds) {
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", 10);
if (lockAcquired) {
const recurringJobKey = `send-notifications:${jobId}`;
const repeatableJobs = await appQueue.getRepeatableJobs();
const jobExists = repeatableJobs.some((j) => j.key === recurringJobKey);
if (jobExists) {
await appQueue.removeRepeatableByKey(recurringJobKey);
// Drain all remaining send-notifications jobs for this jobId
await appQueue.drain(false); // false to not force removal of active jobs
logger.logger.info(
`Successfully removed recurring send-notifications job and drained queue for jobId ${jobId} with key ${recurringJobKey}`
);
} else {
logger.logger.info(
`No recurring send-notifications job found for jobId ${jobId} with key ${recurringJobKey} - processing leftover scheduled instance`
);
}
await pubClient.del(lockKey);
await pubClient.del(recurringFlagKey);
} else {
logger.logger.info(
`Skipped removal of send-notifications for jobId ${jobId} - lock held by another worker`
);
}
} else {
logger.logger.info(
`Skipping removal of send-notifications for jobId ${jobId} - pending add-notification jobs exist`
);
}
}
}
},
{
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 5
}
);
worker.on("completed", async (job) => {
if (job.name === "add-notification") {
const { jobId } = job.data;
const recurringJobKey = `send-notifications:${jobId}`;
const recurringFlagKey = `app:recurring:${jobId}`;
const flagSet = await pubClient.setnx(recurringFlagKey, "active");
if (flagSet) {
const existingJobs = await appQueue.getRepeatableJobs();
if (!existingJobs.some((j) => j.key === recurringJobKey)) {
await appQueue.add(
"send-notifications",
{ jobId, bodyShopId: job.data.bodyShopId, recipients: job.data.recipients },
{
repeat: {
every: 30 * 1000,
limit: 10
},
jobId: recurringJobKey
}
);
logger.logger.info(`Scheduled 30s notification send for jobId ${jobId} with key ${recurringJobKey}`);
await pubClient.expire(recurringFlagKey, 300);
}
}
}
logger.logger.info(`Job ${job.id} completed`);
});
worker.on("failed", (job, err) => {
logger.logger.error(`Job ${job.id} failed: ${err.message}`, { error: err });
});
worker.on("error", (err) => {
logger.logger.error("Worker error:", { error: err });
});
const shutdown = async () => {
if (worker) {
logger.logger.info("Closing app queue worker...");
await worker.close();
logger.logger.info("App queue worker closed");
}
};
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
}
return appQueue;
};
const getQueue = () => {
if (!appQueue) {
throw new Error("App queue not initialized. Ensure loadAppQueue is called during bootstrap.");
}
return appQueue;
};
const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => {
const appQueue = getQueue();
for (const app of appsToDispatch) {
const { jobId, bodyShopId, key, variables, recipients } = app;
await appQueue.add(
"add-notification",
{ jobId, bodyShopId, key, variables, recipients },
{ jobId: `${jobId}:${Date.now()}` }
);
logger.logger.info(`Added app notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
}
};
module.exports = { loadAppQueue, getQueue, dispatchAppsToQueue };