/** * Apply Redis helper functions * @param pubClient * @param app * @param logger */ const applyRedisHelpers = ({ pubClient, app, logger }) => { // Store session data in Redis const setSessionData = async (socketId, key, value) => { try { await pubClient.hset(`socket:${socketId}`, key, JSON.stringify(value)); // Use Redis pubClient } catch (error) { logger.log(`Error Setting Session Data for socket ${socketId}: ${error}`, "ERROR", "redis"); } }; // Retrieve session data from Redis const getSessionData = async (socketId, key) => { try { const data = await pubClient.hget(`socket:${socketId}`, key); return data ? JSON.parse(data) : null; } catch (error) { logger.log(`Error Getting Session Data for socket ${socketId}: ${error}`, "ERROR", "redis"); } }; // Clear session data from Redis const clearSessionData = async (socketId) => { try { await pubClient.del(`socket:${socketId}`); } catch (error) { logger.log(`Error Clearing Session Data for socket ${socketId}: ${error}`, "ERROR", "redis"); } }; // Store multiple session data in Redis const setMultipleSessionData = async (socketId, keyValues) => { try { // keyValues is expected to be an object { key1: value1, key2: value2, ... } const entries = Object.entries(keyValues).map(([key, value]) => [key, JSON.stringify(value)]); await pubClient.hset(`socket:${socketId}`, ...entries.flat()); } catch (error) { logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis"); } }; // Retrieve multiple session data from Redis const getMultipleSessionData = async (socketId, keys) => { try { const data = await pubClient.hmget(`socket:${socketId}`, keys); // Redis returns an object with null values for missing keys, so we parse the non-null ones return Object.fromEntries(keys.map((key, index) => [key, data[index] ? JSON.parse(data[index]) : null])); } catch (error) { logger.log(`Error Getting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis"); } }; const setMultipleFromArraySessionData = async (socketId, keyValueArray) => { try { // Use Redis multi/pipeline to batch the commands const multi = pubClient.multi(); keyValueArray.forEach(([key, value]) => { multi.hset(`socket:${socketId}`, key, JSON.stringify(value)); }); await multi.exec(); // Execute all queued commands } catch (error) { logger.log(`Error Setting Multiple Session Data for socket ${socketId}: ${error}`, "ERROR", "redis"); } }; // Helper function to add an item to the end of the Redis list const addItemToEndOfList = async (socketId, key, newItem) => { try { await pubClient.rpush(`socket:${socketId}:${key}`, JSON.stringify(newItem)); } catch (error) { let userEmail = "unknown"; let socketMappings = {}; try { const userData = await getSessionData(socketId, "user"); if (userData && userData.email) { userEmail = userData.email; socketMappings = await getUserSocketMapping(userEmail); } } catch (sessionError) { logger.log(`Failed to fetch session data for socket ${socketId}: ${sessionError}`, "ERROR", "redis"); } const mappingString = JSON.stringify(socketMappings, null, 2); const errorMessage = `Error adding item to the end of the list for socket ${socketId}: ${error}. User: ${userEmail}, Socket Mappings: ${mappingString}`; logger.log(errorMessage, "ERROR", "redis"); } }; // Helper function to add an item to the beginning of the Redis list const addItemToBeginningOfList = async (socketId, key, newItem) => { try { await pubClient.lpush(`socket:${socketId}:${key}`, JSON.stringify(newItem)); } catch (error) { logger.log(`Error adding item to the beginning of the list for socket ${socketId}: ${error}`, "ERROR", "redis"); } }; // Helper function to clear a list in Redis const clearList = async (socketId, key) => { try { await pubClient.del(`socket:${socketId}:${key}`); } catch (error) { logger.log(`Error clearing list for socket ${socketId}: ${error}`, "ERROR", "redis"); } }; // Add methods to manage room users const addUserToRoom = async (room, user) => { try { await pubClient.sadd(room, JSON.stringify(user)); } catch (error) { logger.log(`Error adding user to room ${room}: ${error}`, "ERROR", "redis"); } }; const removeUserFromRoom = async (room, user) => { try { await pubClient.srem(room, JSON.stringify(user)); } catch (error) { logger.log(`Error removing user to room ${room}: ${error}`, "ERROR", "redis"); } }; const getUsersInRoom = async (room) => { try { const users = await pubClient.smembers(room); return users.map((user) => JSON.parse(user)); } catch (error) { logger.log(`Error getting users in room ${room}: ${error}`, "ERROR", "redis"); } }; const addUserSocketMapping = async (email, socketId, bodyshopId) => { const userKey = `user:${email}`; const socketMappingKey = `${userKey}:socketMapping`; try { logger.log(`Adding socket ${socketId} to user ${email} for bodyshop ${bodyshopId}`, "debug", "redis"); // Save the mapping: socketId -> bodyshopId await pubClient.hset(socketMappingKey, socketId, bodyshopId); // Set TTL (24 hours) for the mapping hash await pubClient.expire(socketMappingKey, 86400); } catch (error) { logger.log(`Error adding socket mapping for ${email} (bodyshop ${bodyshopId}): ${error}`, "ERROR", "redis"); } }; const refreshUserSocketTTL = async (email) => { const userKey = `user:${email}`; const socketMappingKey = `${userKey}:socketMapping`; try { const exists = await pubClient.exists(socketMappingKey); if (exists) { await pubClient.expire(socketMappingKey, 86400); logger.log(`Refreshed TTL for ${email} socket mapping`, "debug", "redis"); } } catch (error) { logger.log(`Error refreshing TTL for ${email}: ${error}`, "ERROR", "redis"); } }; const removeUserSocketMapping = async (email, socketId) => { const userKey = `user:${email}`; const socketMappingKey = `${userKey}:socketMapping`; try { logger.log(`Removing socket ${socketId} mapping for user ${email}`, "DEBUG", "redis"); // Look up the bodyshopId associated with this socket const bodyshopId = await pubClient.hget(socketMappingKey, socketId); if (!bodyshopId) { logger.log(`Socket ${socketId} not found for user ${email}`, "DEBUG", "redis"); return; } // Remove the socket mapping await pubClient.hdel(socketMappingKey, socketId); logger.log( `Removed socket ${socketId} (associated with bodyshop ${bodyshopId}) for user ${email}`, "DEBUG", "redis" ); // Refresh TTL if any socket mappings remain const remainingSockets = await pubClient.hlen(socketMappingKey); if (remainingSockets > 0) { await pubClient.expire(socketMappingKey, 86400); } } catch (error) { logger.log(`Error removing socket mapping for ${email}: ${error}`, "ERROR", "redis"); } }; const getUserSocketMapping = async (email) => { const userKey = `user:${email}`; const socketMappingKey = `${userKey}:socketMapping`; try { // Retrieve all socket mappings for the user const mapping = await pubClient.hgetall(socketMappingKey); const ttl = await pubClient.ttl(socketMappingKey); // Group socket IDs by bodyshopId const result = {}; for (const [socketId, bodyshopId] of Object.entries(mapping)) { if (!result[bodyshopId]) { result[bodyshopId] = { socketIds: [], ttl }; } result[bodyshopId].socketIds.push(socketId); } return result; } catch (error) { console.error(`Error retrieving socket mappings for ${email}:`, error); throw error; } }; const api = { setSessionData, getSessionData, clearSessionData, setMultipleSessionData, getMultipleSessionData, setMultipleFromArraySessionData, addItemToEndOfList, addItemToBeginningOfList, clearList, addUserToRoom, removeUserFromRoom, getUsersInRoom, addUserSocketMapping, removeUserSocketMapping, getUserSocketMapping, refreshUserSocketTTL }; Object.assign(module.exports, api); app.use((req, res, next) => { req.sessionUtils = api; next(); }); return api; }; module.exports = { applyRedisHelpers };