From 5114138c672d774fcd67ba4cdd96bdfaa12cfa0f Mon Sep 17 00:00:00 2001 From: Dave Richer Date: Thu, 3 Oct 2024 16:30:15 -0400 Subject: [PATCH] docker-redis - final cleanup Signed-off-by: Dave Richer --- Dockerfile | 2 +- docker-compose.yml | 7 + server.js | 33 ++-- server/email/tasksEmails.js | 10 +- server/utils/ioHelpers.js | 4 +- server/utils/redisHelpers.js | 6 +- server/web-sockets/redisSocketEvents.js | 205 +++++++++++++----------- 7 files changed, 142 insertions(+), 125 deletions(-) diff --git a/Dockerfile b/Dockerfile index 60f1193ee..dab748c1b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -41,4 +41,4 @@ COPY . . EXPOSE 4000 # Start the application -CMD ["nodemon", "--exitcrash" , "--legacy-watch", "server.js"] +CMD ["nodemon", "--legacy-watch", "server.js"] diff --git a/docker-compose.yml b/docker-compose.yml index 0c0494b0c..054de22ab 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,3 +1,10 @@ +############################# +# Ports Exposed +# 4000 - Imex Node API +# 3333 - SocketIO Admin-UI +# 3334 - Redis-Insights +############################# + services: redis-node-1: build: diff --git a/server.js b/server.js index 10182fe18..b57cc667d 100644 --- a/server.js +++ b/server.js @@ -12,8 +12,8 @@ const { instrument } = require("@socket.io/admin-ui"); const { isString, isEmpty } = require("lodash"); const logger = require("./server/utils/logger"); -const applyRedisHelpers = require("./server/utils/redisHelpers"); -const applyIOHelpers = require("./server/utils/ioHelpers"); +const { applyRedisHelpers } = require("./server/utils/redisHelpers"); +const { applyIOHelpers } = require("./server/utils/ioHelpers"); const { redisSocketEvents } = require("./server/web-sockets/redisSocketEvents"); // Load environment variables @@ -62,7 +62,7 @@ const SOCKETIO_CORS_ORIGIN_DEV = ["http://localhost:3333", "https://localhost:33 * Middleware for Express app * @param app */ -const applyMiddleware = (app) => { +const applyMiddleware = ({ app }) => { app.use(compression()); app.use(cookieParser()); app.use(bodyParser.json({ limit: "50mb" })); @@ -80,7 +80,7 @@ const applyMiddleware = (app) => { * Route groupings for Express app * @param app */ -const applyRoutes = (app) => { +const applyRoutes = ({ app }) => { app.use("/", require("./server/routes/miscellaneousRoutes")); app.use("/notifications", require("./server/routes/notificationsRoutes")); app.use("/render", require("./server/routes/renderRoutes")); @@ -171,7 +171,7 @@ const connectToRedisCluster = () => { * @param server * @param app */ -const applySocketIO = async (server, app) => { +const applySocketIO = async ({ server, app }) => { const redisCluster = await connectToRedisCluster(); // Handle errors @@ -182,16 +182,17 @@ const applySocketIO = async (server, app) => { const pubClient = redisCluster; const subClient = pubClient.duplicate(); - // https://github.com/redis/node-redis/blob/master/docs/clustering.md - // https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/accessing-elasticache.html - pubClient.on("error", (err) => logger.log(`Redis pubClient error: ${err}`, "ERROR", "redis")); subClient.on("error", (err) => logger.log(`Redis subClient error: ${err}`, "ERROR", "redis")); process.on("SIGINT", async () => { logger.log("Closing Redis connections...", "INFO", "redis", "api"); - await Promise.all([pubClient.disconnect(), subClient.disconnect()]); - process.exit(0); + try { + await Promise.all([pubClient.disconnect(), subClient.disconnect()]); + logger.log("Redis connections closed. Process will exit.", "INFO", "redis", "api"); + } catch (error) { + logger.log(`Error closing Redis connections: ${error.message}`, "ERROR", "redis", "api"); + } }); const ioRedis = new Server(server, { @@ -259,16 +260,16 @@ const main = async () => { const server = http.createServer(app); - const { pubClient, ioRedis } = await applySocketIO(server, app); - const api = applyRedisHelpers(pubClient, app, logger); - const ioHelpers = applyIOHelpers(app, api, ioRedis, logger); + const { pubClient, ioRedis } = await applySocketIO({ server, app }); + const redisHelpers = applyRedisHelpers({ pubClient, app, logger }); + const ioHelpers = applyIOHelpers({ app, redisHelpers, ioRedis, logger }); // Legacy Socket Events require("./server/web-sockets/web-socket"); - applyMiddleware(app); - applyRoutes(app); - redisSocketEvents(ioRedis, api, ioHelpers); + applyMiddleware({ app }); + applyRoutes({ app }); + redisSocketEvents({ io: ioRedis, redisHelpers, ioHelpers, logger }); try { await server.listen(port); diff --git a/server/email/tasksEmails.js b/server/email/tasksEmails.js index 1f508be31..7fe0f7808 100644 --- a/server/email/tasksEmails.js +++ b/server/email/tasksEmails.js @@ -45,22 +45,20 @@ if (process.env.NODE_ENV !== "development") { // Handling SIGINT (e.g., Ctrl+C) process.on("SIGINT", async () => { await tasksEmailQueueCleanup(); - process.exit(0); }); // Handling SIGTERM (e.g., sent by system shutdown) process.on("SIGTERM", async () => { await tasksEmailQueueCleanup(); - process.exit(0); }); // Handling uncaught exceptions process.on("uncaughtException", async (err) => { await tasksEmailQueueCleanup(); - process.exit(1); // Exit with an 'error' code + throw err; }); // Handling unhandled promise rejections process.on("unhandledRejection", async (reason, promise) => { await tasksEmailQueueCleanup(); - process.exit(1); // Exit with an 'error' code + throw reason; }); } @@ -247,7 +245,7 @@ const tasksRemindEmail = async (req, res) => { const fromEmails = InstanceManager({ imex: "ImEX Online ", rome: - tasksRequest?.tasks[0].bodyshop.convenient_company === "promanager" + tasksRequest?.tasks[0].bodyshop.convenient_company === "promanager" ? "ProManager " : "Rome Online " }); @@ -283,7 +281,7 @@ const tasksRemindEmail = async (req, res) => { const endPoints = InstanceManager({ imex: process.env?.NODE_ENV === "test" ? "https://test.imex.online" : "https://imex.online", rome: - tasksRequest?.tasks[0].bodyshop.convenient_company === "promanager" + tasksRequest?.tasks[0].bodyshop.convenient_company === "promanager" ? process.env?.NODE_ENV === "test" ? "https//test.promanager.web-est.com" : "https://promanager.web-est.com" diff --git a/server/utils/ioHelpers.js b/server/utils/ioHelpers.js index 2a68925f9..3b3b15adb 100644 --- a/server/utils/ioHelpers.js +++ b/server/utils/ioHelpers.js @@ -1,4 +1,4 @@ -const applyIOHelpers = (app, api, io, logger) => { +const applyIOHelpers = ({ app, api, io, logger }) => { const getBodyshopRoom = (bodyshopID) => `bodyshop-broadcast-room:${bodyshopID}`; const ioHelpersAPI = { @@ -14,4 +14,4 @@ const applyIOHelpers = (app, api, io, logger) => { return ioHelpersAPI; }; -module.exports = applyIOHelpers; +module.exports = { applyIOHelpers }; diff --git a/server/utils/redisHelpers.js b/server/utils/redisHelpers.js index 3ab824be4..54d68773d 100644 --- a/server/utils/redisHelpers.js +++ b/server/utils/redisHelpers.js @@ -4,7 +4,7 @@ * @param app * @param logger */ -const applyRedisHelpers = (pubClient, app, logger) => { +const applyRedisHelpers = ({ pubClient, app, logger }) => { // Store session data in Redis const setSessionData = async (socketId, key, value) => { try { @@ -225,5 +225,5 @@ const applyRedisHelpers = (pubClient, app, logger) => { return api; }; -module.exports = applyRedisHelpers; -// "th1s1sr3d1s" (BCrypt) + +module.exports = { applyRedisHelpers }; diff --git a/server/web-sockets/redisSocketEvents.js b/server/web-sockets/redisSocketEvents.js index 5cda88569..381436c1a 100644 --- a/server/web-sockets/redisSocketEvents.js +++ b/server/web-sockets/redisSocketEvents.js @@ -1,102 +1,18 @@ -const path = require("path"); -require("dotenv").config({ - path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`) -}); -const logger = require("../utils/logger"); const { admin } = require("../firebase/firebase-handler"); -// Logging helper functions -function createLogEvent(socket, level, message) { - console.log(`[IOREDIS LOG EVENT] - ${socket.user.email} - ${socket.id} - ${message}`); - logger.log("ioredis-log-event", level, socket.user.email, null, { wsmessage: message }); -} - -const registerUpdateEvents = (socket) => { - socket.on("update-token", async (newToken) => { - try { - socket.user = await admin.auth().verifyIdToken(newToken); - createLogEvent(socket, "INFO", "Token updated successfully"); - socket.emit("token-updated", { success: true }); - } catch (error) { - createLogEvent(socket, "ERROR", `Token update failed: ${error.message}`); - socket.emit("token-updated", { success: false, error: error.message }); - // Optionally disconnect the socket if token verification fails - socket.disconnect(); - } - }); -}; - -const redisSocketEvents = (io, { addUserToRoom, getUsersInRoom, removeUserFromRoom }, { getBodyshopRoom }) => { - // Room management and broadcasting events - function registerRoomAndBroadcastEvents(socket) { - socket.on("join-bodyshop-room", async (bodyshopUUID) => { - try { - const room = getBodyshopRoom(bodyshopUUID); - socket.join(room); - await addUserToRoom(room, { uid: socket.user.uid, email: socket.user.email, socket: socket.id }); - createLogEvent(socket, "DEBUG", `Client joined bodyshop room: ${room}`); - - // Notify all users in the room about the updated user list - const usersInRoom = await getUsersInRoom(room); - io.to(room).emit("room-users-updated", usersInRoom); - } catch (error) { - createLogEvent(socket, "ERROR", `Error joining room: ${error}`); - } - }); - - socket.on("leave-bodyshop-room", async (bodyshopUUID) => { - try { - const room = getBodyshopRoom(bodyshopUUID); - socket.leave(room); - createLogEvent(socket, "DEBUG", `Client left bodyshop room: ${room}`); - } catch (error) { - createLogEvent(socket, "ERROR", `Error joining room: ${error}`); - } - }); - - socket.on("get-room-users", async (bodyshopUUID, callback) => { - try { - const usersInRoom = await getUsersInRoom(getBodyshopRoom(bodyshopUUID)); - callback(usersInRoom); - } catch (error) { - createLogEvent(socket, "ERROR", `Error getting room: ${error}`); - } - }); - - socket.on("broadcast-to-bodyshop", async (bodyshopUUID, message) => { - try { - const room = getBodyshopRoom(bodyshopUUID); - io.to(room).emit("bodyshop-message", message); - createLogEvent(socket, "INFO", `Broadcast message to bodyshop ${room}`); - } catch (error) { - createLogEvent(socket, "ERROR", `Error getting room: ${error}`); - } - }); - - socket.on("disconnect", async () => { - try { - createLogEvent(socket, "DEBUG", `User disconnected.`); - - // Get all rooms the socket is part of - const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id); - for (const room of rooms) { - await removeUserFromRoom(room, { uid: socket.user.uid, email: socket.user.email, socket: socket.id }); - } - } catch (error) { - createLogEvent(socket, "ERROR", `Error getting room: ${error}`); - } - }); - } - - // Register all socket events for a given socket connection - function registerSocketEvents(socket) { - createLogEvent(socket, "DEBUG", `Registering RedisIO Socket Events.`); - - // Register room and broadcasting events - registerRoomAndBroadcastEvents(socket); - registerUpdateEvents(socket); - } +const redisSocketEvents = ({ + io, + redisHelpers: { setSessionData, clearSessionData }, + ioHelpers: { getBodyshopRoom }, + logger +}) => { + // Logging helper functions + const createLogEvent = (socket, level, message) => { + console.log(`[IOREDIS LOG EVENT] - ${socket?.user?.email} - ${socket.id} - ${message}`); + logger.log("ioredis-log-event", level, socket?.user?.email, null, { wsmessage: message }); + }; + // Socket Auth Middleware const authMiddleware = (socket, next) => { try { if (socket.handshake.auth.token) { @@ -105,6 +21,9 @@ const redisSocketEvents = (io, { addUserToRoom, getUsersInRoom, removeUserFromRo .verifyIdToken(socket.handshake.auth.token) .then((user) => { socket.user = user; + return setSessionData(socket.id, "user", user); + }) + .then(() => { next(); }) .catch((error) => { @@ -122,7 +41,99 @@ const redisSocketEvents = (io, { addUserToRoom, getUsersInRoom, removeUserFromRo } }; - // Socket.IO Middleware and Connection + // Register Socket Events + const registerSocketEvents = (socket) => { + createLogEvent(socket, "DEBUG", `Registering RedisIO Socket Events.`); + + // Token Update Events + const registerUpdateEvents = (socket) => { + const updateToken = async (newToken) => { + try { + // noinspection UnnecessaryLocalVariableJS + const user = await admin.auth().verifyIdToken(newToken, true); + socket.user = user; + + // If We ever want to persist user Data across workers + // await setSessionData(socket.id, "user", user); + + createLogEvent(socket, "INFO", "Token updated successfully"); + + socket.emit("token-updated", { success: true }); + } catch (error) { + if (error.code === "auth/id-token-expired") { + createLogEvent(socket, "WARNING", "Stale token received, waiting for new token"); + socket.emit("token-updated", { + success: false, + error: "Stale token." + }); + } else { + createLogEvent(socket, "ERROR", `Token update failed: ${error.message}`); + socket.emit("token-updated", { success: false, error: error.message }); + // For any other errors, optionally disconnect the socket + socket.disconnect(); + } + } + }; + socket.on("update-token", updateToken); + }; + // Room Broadcast Events + const registerRoomAndBroadcastEvents = (socket) => { + const joinBodyshopRoom = (bodyshopUUID) => { + try { + const room = getBodyshopRoom(bodyshopUUID); + socket.join(room); + createLogEvent(socket, "DEBUG", `Client joined bodyshop room: ${room}`); + } catch (error) { + createLogEvent(socket, "ERROR", `Error joining room: ${error}`); + } + }; + + const leaveBodyshopRoom = (bodyshopUUID) => { + try { + const room = getBodyshopRoom(bodyshopUUID); + socket.leave(room); + createLogEvent(socket, "DEBUG", `Client left bodyshop room: ${room}`); + } catch (error) { + createLogEvent(socket, "ERROR", `Error joining room: ${error}`); + } + }; + + const broadcastToBodyshopRoom = (bodyshopUUID, message) => { + try { + const room = getBodyshopRoom(bodyshopUUID); + io.to(room).emit("bodyshop-message", message); + createLogEvent(socket, "DEBUG", `Broadcast message to bodyshop ${room}`); + } catch (error) { + createLogEvent(socket, "ERROR", `Error getting room: ${error}`); + } + }; + + socket.on("join-bodyshop-room", joinBodyshopRoom); + socket.on("leave-bodyshop-room", leaveBodyshopRoom); + socket.on("broadcast-to-bodyshop", broadcastToBodyshopRoom); + }; + // Disconnect Events + const registerDisconnectEvents = (socket) => { + const disconnect = () => { + createLogEvent(socket, "DEBUG", `User disconnected.`); + const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id); + for (const room of rooms) { + socket.leave(room); + } + // If we ever want to persist the user across workers + // clearSessionData(socket.id); + }; + + socket.on("disconnect", disconnect); + }; + + // Call Handlers + registerRoomAndBroadcastEvents(socket); + registerUpdateEvents(socket); + registerDisconnectEvents(socket); + }; + + // Associate Middleware and Handlers io.use(authMiddleware); io.on("connection", registerSocketEvents); };