feature/IO-3096-GlobalNotifications - Checkpoint, socket to email to bodyshop mapping.
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import React, { createContext } from "react";
|
||||
import useSocket from "./useSocket"; // Import the custom hook
|
||||
import useSocket from "./useSocket";
|
||||
|
||||
// Create the SocketContext
|
||||
const SocketContext = createContext(null);
|
||||
|
||||
@@ -17,7 +17,7 @@ const useSocket = (bodyshop) => {
|
||||
const socketInstance = SocketIO(endpoint, {
|
||||
path: "/wss",
|
||||
withCredentials: true,
|
||||
auth: { token },
|
||||
auth: { token, bodyshopId: bodyshop.id },
|
||||
reconnectionAttempts: Infinity,
|
||||
reconnectionDelay: 2000,
|
||||
reconnectionDelayMax: 10000
|
||||
@@ -95,7 +95,7 @@ const useSocket = (bodyshop) => {
|
||||
|
||||
if (socketRef.current) {
|
||||
// Update token if socket exists
|
||||
socketRef.current.emit("update-token", token);
|
||||
socketRef.current.emit("update-token", { token, bodyshopId: bodyshop.id });
|
||||
} else {
|
||||
// Initialize socket if not already connected
|
||||
initializeSocket(token);
|
||||
|
||||
@@ -73,7 +73,20 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
|
||||
try {
|
||||
await pubClient.rpush(`socket:${socketId}:${key}`, JSON.stringify(newItem));
|
||||
} catch (error) {
|
||||
logger.log(`Error adding item to the end of the list for socket ${socketId}: ${error}`, "ERROR", "redis");
|
||||
let userEmail = "unknown";
|
||||
let socketMappings = {};
|
||||
try {
|
||||
const userData = await getSessionData(socketId, "user");
|
||||
if (userData && userData.email) {
|
||||
userEmail = userData.email;
|
||||
socketMappings = await getUserSocketMapping(userEmail);
|
||||
}
|
||||
} catch (sessionError) {
|
||||
logger.log(`Failed to fetch session data for socket ${socketId}: ${sessionError}`, "ERROR", "redis");
|
||||
}
|
||||
const mappingString = JSON.stringify(socketMappings, null, 2);
|
||||
const errorMessage = `Error adding item to the end of the list for socket ${socketId}: ${error}. User: ${userEmail}, Socket Mappings: ${mappingString}`;
|
||||
logger.log(errorMessage, "ERROR", "redis");
|
||||
}
|
||||
};
|
||||
|
||||
@@ -121,23 +134,81 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
|
||||
}
|
||||
};
|
||||
|
||||
const addUserSocketMapping = async (email, socketId) => {
|
||||
// Using a Redis set allows a user to have multiple active socket ids.
|
||||
console.log(`Adding socket ${socketId} to user ${email}`);
|
||||
return pubClient.sadd(`user:${email}:sockets`, socketId);
|
||||
const addUserSocketMapping = async (email, socketId, bodyshopId) => {
|
||||
const userKey = `user:${email}`;
|
||||
const bodyshopKey = `${userKey}:bodyshops:${bodyshopId}`;
|
||||
try {
|
||||
logger.log(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`, "debug", "redis");
|
||||
// Mark the bodyshop as associated with the user in the hash
|
||||
await pubClient.hset(userKey, `bodyshops:${bodyshopId}`, "1");
|
||||
// Add the socket ID to the bodyshop-specific set
|
||||
await pubClient.sadd(bodyshopKey, socketId);
|
||||
// Set TTL to 24 hours for both keys
|
||||
await pubClient.expire(userKey, 86400);
|
||||
await pubClient.expire(bodyshopKey, 86400);
|
||||
} catch (error) {
|
||||
logger.log(`Error adding socket mapping for ${email} (bodyshop ${bodyshopId}): ${error}`, "ERROR", "redis");
|
||||
}
|
||||
};
|
||||
|
||||
const removeUserSocketMapping = async (email, socketId) => {
|
||||
console.log(`Removing socket ${socketId} from user ${email}`);
|
||||
return pubClient.srem(`user:${email}:sockets`, socketId);
|
||||
const refreshUserSocketTTL = async (email, bodyshopId) => {
|
||||
const userKey = `user:${email}`;
|
||||
const bodyshopKey = `${userKey}:bodyshops:${bodyshopId}`;
|
||||
try {
|
||||
const userExists = await pubClient.exists(userKey);
|
||||
if (userExists) {
|
||||
await pubClient.expire(userKey, 86400);
|
||||
}
|
||||
const bodyshopExists = await pubClient.exists(bodyshopKey);
|
||||
if (bodyshopExists) {
|
||||
await pubClient.expire(bodyshopKey, 86400);
|
||||
logger.log(`Refreshed TTL for ${email} bodyshop ${bodyshopId} socket mapping`, "debug", "redis");
|
||||
}
|
||||
} catch (error) {
|
||||
logger.log(`Error refreshing TTL for ${email} (bodyshop ${bodyshopId}): ${error}`, "ERROR", "redis");
|
||||
}
|
||||
};
|
||||
|
||||
const removeUserSocketMapping = async (email, socketId, bodyshopId) => {
|
||||
const userKey = `user:${email}`;
|
||||
const bodyshopKey = `${userKey}:bodyshops:${bodyshopId}`;
|
||||
try {
|
||||
logger.log(`Removing socket ${socketId} from user ${email} for bodyshop ${bodyshopId}`, "DEBUG", "redis");
|
||||
await pubClient.srem(bodyshopKey, socketId);
|
||||
// Refresh TTL if there are still sockets, or let it expire
|
||||
const remainingSockets = await pubClient.scard(bodyshopKey);
|
||||
if (remainingSockets > 0) {
|
||||
await pubClient.expire(bodyshopKey, 86400);
|
||||
} else {
|
||||
// Optionally remove the bodyshop field from the hash if no sockets remain
|
||||
await pubClient.hdel(userKey, `bodyshops:${bodyshopId}`);
|
||||
}
|
||||
// Refresh user key TTL if there are still bodyshops
|
||||
const remainingBodyshops = await pubClient.hlen(userKey);
|
||||
if (remainingBodyshops > 0) {
|
||||
await pubClient.expire(userKey, 86400);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.log(`Error removing socket mapping for ${email} (bodyshop ${bodyshopId}): ${error}`, "ERROR", "redis");
|
||||
}
|
||||
};
|
||||
|
||||
const getUserSocketMapping = async (email) => {
|
||||
const key = `user:${email}:sockets`;
|
||||
const userKey = `user:${email}`;
|
||||
try {
|
||||
return await pubClient.smembers(key);
|
||||
// Get all bodyshop fields from the hash
|
||||
const bodyshops = await pubClient.hkeys(userKey);
|
||||
const result = {};
|
||||
for (const bodyshopField of bodyshops) {
|
||||
const bodyshopId = bodyshopField.split("bodyshops:")[1];
|
||||
const bodyshopKey = `${userKey}:bodyshops:${bodyshopId}`;
|
||||
const socketIds = await pubClient.smembers(bodyshopKey);
|
||||
const ttl = await pubClient.ttl(bodyshopKey);
|
||||
result[bodyshopId] = { socketIds, ttl };
|
||||
}
|
||||
return result;
|
||||
} catch (error) {
|
||||
console.error(`Error retrieving socket IDs for ${email}:`, error);
|
||||
console.error(`Error retrieving socket mappings for ${email}:`, error);
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
@@ -157,7 +228,8 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
|
||||
getUsersInRoom,
|
||||
addUserSocketMapping,
|
||||
removeUserSocketMapping,
|
||||
getUserSocketMapping
|
||||
getUserSocketMapping,
|
||||
refreshUserSocketTTL
|
||||
};
|
||||
|
||||
Object.assign(module.exports, api);
|
||||
@@ -167,86 +239,6 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
|
||||
next();
|
||||
});
|
||||
|
||||
// Demo to show how all the helper functions work
|
||||
// const demoSessionData = async () => {
|
||||
// const socketId = "testSocketId";
|
||||
//
|
||||
// // 1. Test setSessionData and getSessionData
|
||||
// await setSessionData(socketId, "field1", "Hello, Redis!");
|
||||
// const field1Value = await getSessionData(socketId, "field1");
|
||||
// console.log("Retrieved single field value:", field1Value);
|
||||
//
|
||||
// // 2. Test setMultipleSessionData and getMultipleSessionData
|
||||
// await setMultipleSessionData(socketId, { field2: "Second Value", field3: "Third Value" });
|
||||
// const multipleFields = await getMultipleSessionData(socketId, ["field2", "field3"]);
|
||||
// console.log("Retrieved multiple field values:", multipleFields);
|
||||
//
|
||||
// // 3. Test setMultipleFromArraySessionData
|
||||
// await setMultipleFromArraySessionData(socketId, [
|
||||
// ["field4", "Fourth Value"],
|
||||
// ["field5", "Fifth Value"]
|
||||
// ]);
|
||||
//
|
||||
// // Retrieve and log all fields
|
||||
// const allFields = await getMultipleSessionData(socketId, ["field1", "field2", "field3", "field4", "field5"]);
|
||||
// console.log("Retrieved all field values:", allFields);
|
||||
//
|
||||
// // 4. Test list functions
|
||||
// // Add item to the end of a Redis list
|
||||
// await addItemToEndOfList(socketId, "logEvents", { event: "Log Event 1", timestamp: new Date() });
|
||||
// await addItemToEndOfList(socketId, "logEvents", { event: "Log Event 2", timestamp: new Date() });
|
||||
//
|
||||
// // Add item to the beginning of a Redis list
|
||||
// await addItemToBeginningOfList(socketId, "logEvents", { event: "First Log Event", timestamp: new Date() });
|
||||
//
|
||||
// // Retrieve the entire list
|
||||
// const logEventsData = await pubClient.lrange(`socket:${socketId}:logEvents`, 0, -1);
|
||||
// const logEvents = logEventsData.map((item) => JSON.parse(item));
|
||||
// console.log("Log Events List:", logEvents);
|
||||
//
|
||||
// // 5. Test clearList
|
||||
// await clearList(socketId, "logEvents");
|
||||
// console.log("Log Events List cleared.");
|
||||
//
|
||||
// // Retrieve the list after clearing to confirm it's empty
|
||||
// const logEventsAfterClear = await pubClient.lrange(`socket:${socketId}:logEvents`, 0, -1);
|
||||
// console.log("Log Events List after clearing:", logEventsAfterClear); // Should be an empty array
|
||||
//
|
||||
// // 6. Test clearSessionData
|
||||
// await clearSessionData(socketId);
|
||||
// console.log("Session data cleared.");
|
||||
//
|
||||
// // 7. Test room functions
|
||||
// const roomName = "testRoom";
|
||||
// const user1 = { id: 1, name: "Alice" };
|
||||
// const user2 = { id: 2, name: "Bob" };
|
||||
//
|
||||
// // Add users to room
|
||||
// await addUserToRoom(roomName, user1);
|
||||
// await addUserToRoom(roomName, user2);
|
||||
//
|
||||
// // Get users in room
|
||||
// const usersInRoom = await getUsersInRoom(roomName);
|
||||
// console.log(`Users in room ${roomName}:`, usersInRoom);
|
||||
//
|
||||
// // Remove a user from room
|
||||
// await removeUserFromRoom(roomName, user1);
|
||||
//
|
||||
// // Get users in room after removal
|
||||
// const usersInRoomAfterRemoval = await getUsersInRoom(roomName);
|
||||
// console.log(`Users in room ${roomName} after removal:`, usersInRoomAfterRemoval);
|
||||
//
|
||||
// // Clean up: remove remaining users from room
|
||||
// await removeUserFromRoom(roomName, user2);
|
||||
//
|
||||
// // Verify room is empty
|
||||
// const usersInRoomAfterCleanup = await getUsersInRoom(roomName);
|
||||
// console.log(`Users in room ${roomName} after cleanup:`, usersInRoomAfterCleanup); // Should be empty
|
||||
// };
|
||||
// if (process.env.NODE_ENV === "development") {
|
||||
// demoSessionData();
|
||||
// }
|
||||
|
||||
return api;
|
||||
};
|
||||
|
||||
|
||||
@@ -3,12 +3,13 @@ const { admin } = require("../firebase/firebase-handler");
|
||||
const redisSocketEvents = ({
|
||||
io,
|
||||
redisHelpers: {
|
||||
setSessionData,
|
||||
clearSessionData,
|
||||
addUserSocketMapping,
|
||||
removeUserSocketMapping,
|
||||
getUserSocketMapping
|
||||
}, // Note: Used if we persist user to Redis
|
||||
setSessionData,
|
||||
getSessionData,
|
||||
clearSessionData,
|
||||
refreshUserSocketTTL
|
||||
},
|
||||
ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom },
|
||||
logger
|
||||
}) => {
|
||||
@@ -19,16 +20,18 @@ const redisSocketEvents = ({
|
||||
|
||||
// Socket Auth Middleware
|
||||
const authMiddleware = async (socket, next) => {
|
||||
if (!socket.handshake.auth.token) {
|
||||
const { token, bodyshopId } = socket.handshake.auth;
|
||||
if (!token) {
|
||||
return next(new Error("Authentication error - no authorization token."));
|
||||
}
|
||||
if (!bodyshopId) {
|
||||
return next(new Error("Authentication error - no bodyshopId provided."));
|
||||
}
|
||||
try {
|
||||
const user = await admin.auth().verifyIdToken(socket.handshake.auth.token);
|
||||
const user = await admin.auth().verifyIdToken(token);
|
||||
socket.user = user;
|
||||
// Persist the user data in Redis for this socket
|
||||
await setSessionData(socket.id, "user", user);
|
||||
// Store a mapping from the user's email to the socket id
|
||||
// await addUserSocketMapping(user.email, socket.id);
|
||||
await setSessionData(socket.id, "user", { ...user, bodyshopId });
|
||||
await addUserSocketMapping(user.email, socket.id, bodyshopId);
|
||||
next();
|
||||
} catch (error) {
|
||||
next(new Error(`Authentication error: ${error.message}`));
|
||||
@@ -37,32 +40,33 @@ const redisSocketEvents = ({
|
||||
|
||||
// Register Socket Events
|
||||
const registerSocketEvents = (socket) => {
|
||||
// Uncomment for further testing
|
||||
// createLogEvent(socket, "debug", `Registering RedisIO Socket Events.`);
|
||||
// getUserSocketMapping(socket.user.email).then((socketIds) => {
|
||||
// console.log(socketIds);
|
||||
// });
|
||||
|
||||
// Token Update Events
|
||||
const registerUpdateEvents = (socket) => {
|
||||
let latestTokenTimestamp = 0;
|
||||
|
||||
const updateToken = async (newToken) => {
|
||||
const updateToken = async ({ token, bodyshopId }) => {
|
||||
const currentTimestamp = Date.now();
|
||||
latestTokenTimestamp = currentTimestamp;
|
||||
|
||||
if (!token || !bodyshopId) {
|
||||
socket.emit("token-updated", { success: false, error: "Token or bodyshopId missing" });
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const user = await admin.auth().verifyIdToken(newToken, true);
|
||||
const user = await admin.auth().verifyIdToken(token, true);
|
||||
if (currentTimestamp < latestTokenTimestamp) {
|
||||
createLogEvent(socket, "warn", "Outdated token validation skipped.");
|
||||
return;
|
||||
}
|
||||
socket.user = user;
|
||||
// Update the session data in Redis with the new token info
|
||||
// await setSessionData(socket.id, "user", user);
|
||||
// Update the mapping with the user's email
|
||||
await addUserSocketMapping(user.email, socket.id);
|
||||
createLogEvent(socket, "debug", `Token updated successfully for socket ID: ${socket.id}`);
|
||||
await setSessionData(socket.id, "user", { ...user, bodyshopId });
|
||||
await refreshUserSocketTTL(user.email, bodyshopId);
|
||||
createLogEvent(
|
||||
socket,
|
||||
"debug",
|
||||
`Token updated successfully for socket ID: ${socket.id} (bodyshop: ${bodyshopId})`
|
||||
);
|
||||
socket.emit("token-updated", { success: true });
|
||||
} catch (error) {
|
||||
if (error.code === "auth/id-token-expired") {
|
||||
@@ -119,21 +123,19 @@ const redisSocketEvents = ({
|
||||
// Disconnect Events
|
||||
const registerDisconnectEvents = (socket) => {
|
||||
const disconnect = async () => {
|
||||
// Remove session data from Redis
|
||||
// await clearSessionData(socket.id);
|
||||
|
||||
// Remove the mapping from user email to this socket id, if available
|
||||
// if (socket.user?.email) {
|
||||
// await removeUserSocketMapping(socket.user.email, socket.id);
|
||||
// }
|
||||
|
||||
// Leave all joined rooms
|
||||
if (socket.user?.email) {
|
||||
const userData = await getSessionData(socket.id, "user");
|
||||
const bodyshopId = userData?.bodyshopId;
|
||||
if (bodyshopId) {
|
||||
await removeUserSocketMapping(socket.user.email, socket.id, bodyshopId);
|
||||
}
|
||||
await clearSessionData(socket.id);
|
||||
}
|
||||
const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id);
|
||||
for (const room of rooms) {
|
||||
socket.leave(room);
|
||||
}
|
||||
};
|
||||
|
||||
socket.on("disconnect", disconnect);
|
||||
};
|
||||
|
||||
@@ -152,6 +154,7 @@ const redisSocketEvents = ({
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const leaveConversationRoom = ({ bodyshopId, conversationId }) => {
|
||||
try {
|
||||
const room = getBodyshopConversationRoom({ bodyshopId, conversationId });
|
||||
|
||||
Reference in New Issue
Block a user