feature/IO-3096-GlobalNotifications - Checkpoint, App Queue

This commit is contained in:
Dave Richer
2025-02-18 17:37:24 -05:00
parent 00005c881e
commit 2a81517104
4 changed files with 190 additions and 189 deletions

View File

@@ -195,8 +195,16 @@ const connectToRedisCluster = async () => {
return new Promise((resolve, reject) => {
redisCluster.on("ready", () => {
logger.log(`Redis cluster connection established.`, "INFO", "redis", "api");
if (process.env.NODE_ENV === "development" && process.env?.CLEAR_REDIS_ON_START === "true") {
logger.log("[Development] Flushing Redis Cluster on Service start...", "INFO", "redis", "api");
const master = redisCluster.nodes("master");
Promise.all(master.map((node) => node.flushall())).then(() => {
resolve(redisCluster);
});
} else {
resolve(redisCluster);
}
});
redisCluster.on("error", (err) => {
logger.log(`Redis cluster connection failed: ${err.message}`, "ERROR", "redis", "api");

View File

@@ -1,103 +1,57 @@
const { Queue, Worker } = require("bullmq");
let appQueue;
let addQueue;
let consolidateQueue;
const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
if (!appQueue) {
logger.logger.info("Initializing Notifications App Queue");
appQueue = new Queue("notificationsApp", {
if (!addQueue || !consolidateQueue) {
logger.logger.info("Initializing Notifications Queues");
addQueue = new Queue("notificationsAdd", {
connection: pubClient,
prefix: "{BULLMQ}"
prefix: "{BULLMQ}",
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
const worker = new Worker(
"notificationsApp",
consolidateQueue = new Queue("notificationsConsolidate", {
connection: pubClient,
prefix: "{BULLMQ}",
defaultJobOptions: { removeOnComplete: true, removeOnFail: true }
});
const addWorker = new Worker(
"notificationsAdd",
async (job) => {
const { jobId, bodyShopId, key, variables, recipients } = job.data;
logger.logger.info(`Processing app job ${job.id} for jobId ${jobId}`);
const { jobId, key, variables, recipients } = job.data;
logger.logger.info(`Adding notifications for jobId ${jobId}`);
const redisKey = `app:notifications:${jobId}`;
const lastSentKey = `${redisKey}:lastSent`;
const lockKey = `lock:send-notifications:${jobId}`;
const recurringFlagKey = `app:recurring:${jobId}`;
if (job.name === "add-notification") {
const redisKeyPrefix = `app:notifications:${jobId}`;
const notification = { key, variables, timestamp: Date.now() };
for (const recipient of recipients) {
const { user } = recipient;
const userKey = `${redisKey}:${user}`;
const userKey = `${redisKeyPrefix}:${user}`;
const existingNotifications = await pubClient.get(userKey);
const notifications = existingNotifications ? JSON.parse(existingNotifications) : [];
notifications.push(notification);
await pubClient.set(userKey, JSON.stringify(notifications), "EX", 40);
}
} else if (job.name === "send-notifications") {
let hasNewNotifications = false;
const lastSent = parseInt((await pubClient.get(lastSentKey)) || "0", 10);
for (const recipient of recipients) {
const { user, bodyShopId: recipientBodyShopId } = recipient;
const userKey = `${redisKey}:${user}`;
const notifications = await pubClient.get(userKey);
if (notifications) {
const parsedNotifications = JSON.parse(notifications);
const newNotifications = parsedNotifications.filter((n) => n.timestamp > lastSent);
if (newNotifications.length > 0) {
hasNewNotifications = true;
const socketIds = await redisHelpers.getUserSocketMapping(user);
if (socketIds && socketIds[bodyShopId]?.socketIds) {
socketIds[bodyShopId].socketIds.forEach((socketId) => {
ioRedis.to(socketId).emit("notification", {
jobId,
bodyShopId: recipientBodyShopId,
notifications: newNotifications
});
});
logger.logger.info(`Sent ${newNotifications.length} new notifications to ${user} for jobId ${jobId}`);
} else {
logger.logger.warn(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
}
await pubClient.del(userKey);
}
}
logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`);
}
if (hasNewNotifications) {
await pubClient.set(lastSentKey, Date.now(), "EX", 300);
} else {
const activeJobs = await appQueue.getActive();
const hasPendingAdds = activeJobs.some((j) => j.name === "add-notification" && j.data.jobId === jobId);
if (!hasPendingAdds) {
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", 10);
if (lockAcquired) {
const recurringJobKey = `send-notifications:${jobId}`;
const repeatableJobs = await appQueue.getRepeatableJobs();
const jobExists = repeatableJobs.some((j) => j.key === recurringJobKey);
if (jobExists) {
await appQueue.removeRepeatableByKey(recurringJobKey);
// Drain all remaining send-notifications jobs for this jobId
await appQueue.drain(false); // false to not force removal of active jobs
logger.logger.info(
`Successfully removed recurring send-notifications job and drained queue for jobId ${jobId} with key ${recurringJobKey}`
const consolidateKey = `app:consolidate:${jobId}`;
const flagSet = await pubClient.setnx(consolidateKey, "pending");
logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`);
if (flagSet) {
await consolidateQueue.add(
"consolidate-notifications",
{ jobId, recipients },
{ jobId: `consolidate:${jobId}`, delay: 5000 }
);
logger.logger.info(`Scheduled consolidation for jobId ${jobId}`);
await pubClient.expire(consolidateKey, 300);
} else {
logger.logger.info(
`No recurring send-notifications job found for jobId ${jobId} with key ${recurringJobKey} - processing leftover scheduled instance`
);
}
await pubClient.del(lockKey);
await pubClient.del(recurringFlagKey);
} else {
logger.logger.info(
`Skipped removal of send-notifications for jobId ${jobId} - lock held by another worker`
);
}
} else {
logger.logger.info(
`Skipping removal of send-notifications for jobId ${jobId} - pending add-notification jobs exist`
);
}
}
logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`);
}
},
{
@@ -107,61 +61,112 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
}
);
worker.on("completed", async (job) => {
if (job.name === "add-notification") {
const { jobId } = job.data;
const recurringJobKey = `send-notifications:${jobId}`;
const recurringFlagKey = `app:recurring:${jobId}`;
const flagSet = await pubClient.setnx(recurringFlagKey, "active");
if (flagSet) {
const existingJobs = await appQueue.getRepeatableJobs();
if (!existingJobs.some((j) => j.key === recurringJobKey)) {
await appQueue.add(
"send-notifications",
{ jobId, bodyShopId: job.data.bodyShopId, recipients: job.data.recipients },
{
repeat: {
every: 30 * 1000,
limit: 10
const consolidateWorker = new Worker(
"notificationsConsolidate",
async (job) => {
const { jobId, recipients } = job.data;
logger.logger.info(`Consolidating notifications for jobId ${jobId}`);
const redisKeyPrefix = `app:notifications:${jobId}`;
const lockKey = `lock:consolidate:${jobId}`;
const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", 10);
logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`);
if (lockAcquired) {
try {
const allNotifications = {};
const uniqueUsers = [...new Set(recipients.map((r) => r.user))];
logger.logger.debug(`Unique users for jobId ${jobId}: ${uniqueUsers}`);
for (const user of uniqueUsers) {
const userKey = `${redisKeyPrefix}:${user}`;
const notifications = await pubClient.get(userKey);
logger.logger.debug(`Retrieved notifications for ${user}: ${notifications}`);
if (notifications) {
const parsedNotifications = JSON.parse(notifications);
const userRecipients = recipients.filter((r) => r.user === user);
for (const { bodyShopId } of userRecipients) {
allNotifications[user] = allNotifications[user] || {};
allNotifications[user][bodyShopId] = parsedNotifications;
}
await pubClient.del(userKey);
logger.logger.debug(`Deleted Redis key ${userKey}`);
} else {
logger.logger.warn(`No notifications found for ${user} under ${userKey}`);
}
}
logger.logger.debug(`Consolidated notifications: ${JSON.stringify(allNotifications)}`);
for (const [user, bodyShopData] of Object.entries(allNotifications)) {
const userMapping = await redisHelpers.getUserSocketMapping(user);
logger.logger.debug(`User socket mapping for ${user}: ${JSON.stringify(userMapping)}`);
for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) {
if (userMapping && userMapping[bodyShopId]?.socketIds) {
userMapping[bodyShopId].socketIds.forEach((socketId) => {
logger.logger.debug(
`Emitting to socket ${socketId}: ${JSON.stringify({ jobId, bodyShopId, notifications })}`
);
ioRedis.to(socketId).emit("notification", {
jobId,
bodyShopId,
notifications
});
});
logger.logger.info(
`Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId}`
);
} else {
logger.logger.warn(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`);
}
}
}
await pubClient.del(`app:consolidate:${jobId}`);
} catch (err) {
logger.logger.error(`Consolidation error for jobId ${jobId}: ${err.message}`, { error: err });
throw err; // Re-throw to trigger failed event
} finally {
await pubClient.del(lockKey);
}
} else {
logger.logger.info(`Skipped consolidation for jobId ${jobId} - lock held by another worker`);
}
},
jobId: recurringJobKey
{
connection: pubClient,
prefix: "{BULLMQ}",
concurrency: 1,
limiter: { max: 1, duration: 5000 }
}
);
logger.logger.info(`Scheduled 30s notification send for jobId ${jobId} with key ${recurringJobKey}`);
await pubClient.expire(recurringFlagKey, 300);
}
}
}
logger.logger.info(`Job ${job.id} completed`);
});
worker.on("failed", (job, err) => {
logger.logger.error(`Job ${job.id} failed: ${err.message}`, { error: err });
});
worker.on("error", (err) => {
logger.logger.error("Worker error:", { error: err });
});
addWorker.on("completed", (job) => logger.logger.info(`Add job ${job.id} completed`));
consolidateWorker.on("completed", (job) => logger.logger.info(`Consolidate job ${job.id} completed`));
addWorker.on("failed", (job, err) =>
logger.logger.error(`Add job ${job.id} failed: ${err.message}`, { error: err })
);
consolidateWorker.on("failed", (job, err) =>
logger.logger.error(`Consolidate job ${job.id} failed: ${err.message}`, { error: err })
);
const shutdown = async () => {
if (worker) {
logger.logger.info("Closing app queue worker...");
await worker.close();
logger.logger.info("App queue worker closed");
}
logger.logger.info("Closing app queue workers...");
await Promise.all([addWorker.close(), consolidateWorker.close()]);
logger.logger.info("App queue workers closed");
};
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
}
return appQueue;
return addQueue; // Return the add queue for dispatching
};
const getQueue = () => {
if (!appQueue) {
throw new Error("App queue not initialized. Ensure loadAppQueue is called during bootstrap.");
}
return appQueue;
if (!addQueue) throw new Error("Add queue not initialized. Ensure loadAppQueue is called during bootstrap.");
return addQueue;
};
const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => {
@@ -174,7 +179,7 @@ const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => {
{ jobId, bodyShopId, key, variables, recipients },
{ jobId: `${jobId}:${Date.now()}` }
);
logger.logger.info(`Added app notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
logger.logger.info(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`);
}
};

View File

@@ -136,75 +136,75 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
const addUserSocketMapping = async (email, socketId, bodyshopId) => {
const userKey = `user:${email}`;
const bodyshopKey = `${userKey}:bodyshops:${bodyshopId}`;
const socketMappingKey = `${userKey}:socketMapping`;
try {
logger.log(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`, "debug", "redis");
// Mark the bodyshop as associated with the user in the hash
await pubClient.hset(userKey, `bodyshops:${bodyshopId}`, "1");
// Add the socket ID to the bodyshop-specific set
await pubClient.sadd(bodyshopKey, socketId);
// Set TTL to 24 hours for both keys
await pubClient.expire(userKey, 86400);
await pubClient.expire(bodyshopKey, 86400);
// Save the mapping: socketId -> bodyshopId
await pubClient.hset(socketMappingKey, socketId, bodyshopId);
// Set TTL (24 hours) for the mapping hash
await pubClient.expire(socketMappingKey, 86400);
} catch (error) {
logger.log(`Error adding socket mapping for ${email} (bodyshop ${bodyshopId}): ${error}`, "ERROR", "redis");
}
};
const refreshUserSocketTTL = async (email, bodyshopId) => {
const refreshUserSocketTTL = async (email) => {
const userKey = `user:${email}`;
const bodyshopKey = `${userKey}:bodyshops:${bodyshopId}`;
const socketMappingKey = `${userKey}:socketMapping`;
try {
const userExists = await pubClient.exists(userKey);
if (userExists) {
await pubClient.expire(userKey, 86400);
}
const bodyshopExists = await pubClient.exists(bodyshopKey);
if (bodyshopExists) {
await pubClient.expire(bodyshopKey, 86400);
logger.log(`Refreshed TTL for ${email} bodyshop ${bodyshopId} socket mapping`, "debug", "redis");
const exists = await pubClient.exists(socketMappingKey);
if (exists) {
await pubClient.expire(socketMappingKey, 86400);
logger.log(`Refreshed TTL for ${email} socket mapping`, "debug", "redis");
}
} catch (error) {
logger.log(`Error refreshing TTL for ${email} (bodyshop ${bodyshopId}): ${error}`, "ERROR", "redis");
logger.log(`Error refreshing TTL for ${email}: ${error}`, "ERROR", "redis");
}
};
const removeUserSocketMapping = async (email, socketId, bodyshopId) => {
const removeUserSocketMapping = async (email, socketId) => {
const userKey = `user:${email}`;
const bodyshopKey = `${userKey}:bodyshops:${bodyshopId}`;
const socketMappingKey = `${userKey}:socketMapping`;
try {
logger.log(`Removing socket ${socketId} from user ${email} for bodyshop ${bodyshopId}`, "DEBUG", "redis");
await pubClient.srem(bodyshopKey, socketId);
// Refresh TTL if there are still sockets, or let it expire
const remainingSockets = await pubClient.scard(bodyshopKey);
if (remainingSockets > 0) {
await pubClient.expire(bodyshopKey, 86400);
} else {
// Optionally remove the bodyshop field from the hash if no sockets remain
await pubClient.hdel(userKey, `bodyshops:${bodyshopId}`);
logger.log(`Removing socket ${socketId} mapping for user ${email}`, "DEBUG", "redis");
// Look up the bodyshopId associated with this socket
const bodyshopId = await pubClient.hget(socketMappingKey, socketId);
if (!bodyshopId) {
logger.log(`Socket ${socketId} not found for user ${email}`, "DEBUG", "redis");
return;
}
// Refresh user key TTL if there are still bodyshops
const remainingBodyshops = await pubClient.hlen(userKey);
if (remainingBodyshops > 0) {
await pubClient.expire(userKey, 86400);
// Remove the socket mapping
await pubClient.hdel(socketMappingKey, socketId);
logger.log(
`Removed socket ${socketId} (associated with bodyshop ${bodyshopId}) for user ${email}`,
"DEBUG",
"redis"
);
// Refresh TTL if any socket mappings remain
const remainingSockets = await pubClient.hlen(socketMappingKey);
if (remainingSockets > 0) {
await pubClient.expire(socketMappingKey, 86400);
}
} catch (error) {
logger.log(`Error removing socket mapping for ${email} (bodyshop ${bodyshopId}): ${error}`, "ERROR", "redis");
logger.log(`Error removing socket mapping for ${email}: ${error}`, "ERROR", "redis");
}
};
const getUserSocketMapping = async (email) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
try {
// Get all bodyshop fields from the hash
const bodyshops = await pubClient.hkeys(userKey);
// Retrieve all socket mappings for the user
const mapping = await pubClient.hgetall(socketMappingKey);
const ttl = await pubClient.ttl(socketMappingKey);
// Group socket IDs by bodyshopId
const result = {};
for (const bodyshopField of bodyshops) {
const bodyshopId = bodyshopField.split("bodyshops:")[1];
const bodyshopKey = `${userKey}:bodyshops:${bodyshopId}`;
const socketIds = await pubClient.smembers(bodyshopKey);
const ttl = await pubClient.ttl(bodyshopKey);
result[bodyshopId] = { socketIds, ttl };
for (const [socketId, bodyshopId] of Object.entries(mapping)) {
if (!result[bodyshopId]) {
result[bodyshopId] = { socketIds: [], ttl };
}
result[bodyshopId].socketIds.push(socketId);
}
return result;
} catch (error) {

View File

@@ -2,14 +2,7 @@ const { admin } = require("../firebase/firebase-handler");
const redisSocketEvents = ({
io,
redisHelpers: {
addUserSocketMapping,
removeUserSocketMapping,
setSessionData,
getSessionData,
clearSessionData,
refreshUserSocketTTL
},
redisHelpers: { addUserSocketMapping, removeUserSocketMapping, refreshUserSocketTTL },
ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom },
logger
}) => {
@@ -30,7 +23,6 @@ const redisSocketEvents = ({
try {
const user = await admin.auth().verifyIdToken(token);
socket.user = user;
await setSessionData(socket.id, "user", { ...user, bodyshopId });
await addUserSocketMapping(user.email, socket.id, bodyshopId);
next();
} catch (error) {
@@ -60,7 +52,6 @@ const redisSocketEvents = ({
return;
}
socket.user = user;
await setSessionData(socket.id, "user", { ...user, bodyshopId });
await refreshUserSocketTTL(user.email, bodyshopId);
createLogEvent(
socket,
@@ -124,18 +115,15 @@ const redisSocketEvents = ({
const registerDisconnectEvents = (socket) => {
const disconnect = async () => {
if (socket.user?.email) {
const userData = await getSessionData(socket.id, "user");
const bodyshopId = userData?.bodyshopId;
if (bodyshopId) {
await removeUserSocketMapping(socket.user.email, socket.id, bodyshopId);
}
await clearSessionData(socket.id);
await removeUserSocketMapping(socket.user.email, socket.id);
}
// Leave all rooms except the default room (socket.id)
const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id);
for (const room of rooms) {
socket.leave(room);
}
};
socket.on("disconnect", disconnect);
};