docker-redis - final cleanup
Signed-off-by: Dave Richer <dave@imexsystems.ca>
This commit is contained in:
@@ -41,4 +41,4 @@ COPY . .
|
|||||||
EXPOSE 4000
|
EXPOSE 4000
|
||||||
|
|
||||||
# Start the application
|
# Start the application
|
||||||
CMD ["nodemon", "--exitcrash" , "--legacy-watch", "server.js"]
|
CMD ["nodemon", "--legacy-watch", "server.js"]
|
||||||
|
|||||||
@@ -1,3 +1,10 @@
|
|||||||
|
#############################
|
||||||
|
# Ports Exposed
|
||||||
|
# 4000 - Imex Node API
|
||||||
|
# 3333 - SocketIO Admin-UI
|
||||||
|
# 3334 - Redis-Insights
|
||||||
|
#############################
|
||||||
|
|
||||||
services:
|
services:
|
||||||
redis-node-1:
|
redis-node-1:
|
||||||
build:
|
build:
|
||||||
|
|||||||
33
server.js
33
server.js
@@ -12,8 +12,8 @@ const { instrument } = require("@socket.io/admin-ui");
|
|||||||
const { isString, isEmpty } = require("lodash");
|
const { isString, isEmpty } = require("lodash");
|
||||||
|
|
||||||
const logger = require("./server/utils/logger");
|
const logger = require("./server/utils/logger");
|
||||||
const applyRedisHelpers = require("./server/utils/redisHelpers");
|
const { applyRedisHelpers } = require("./server/utils/redisHelpers");
|
||||||
const applyIOHelpers = require("./server/utils/ioHelpers");
|
const { applyIOHelpers } = require("./server/utils/ioHelpers");
|
||||||
const { redisSocketEvents } = require("./server/web-sockets/redisSocketEvents");
|
const { redisSocketEvents } = require("./server/web-sockets/redisSocketEvents");
|
||||||
|
|
||||||
// Load environment variables
|
// Load environment variables
|
||||||
@@ -62,7 +62,7 @@ const SOCKETIO_CORS_ORIGIN_DEV = ["http://localhost:3333", "https://localhost:33
|
|||||||
* Middleware for Express app
|
* Middleware for Express app
|
||||||
* @param app
|
* @param app
|
||||||
*/
|
*/
|
||||||
const applyMiddleware = (app) => {
|
const applyMiddleware = ({ app }) => {
|
||||||
app.use(compression());
|
app.use(compression());
|
||||||
app.use(cookieParser());
|
app.use(cookieParser());
|
||||||
app.use(bodyParser.json({ limit: "50mb" }));
|
app.use(bodyParser.json({ limit: "50mb" }));
|
||||||
@@ -80,7 +80,7 @@ const applyMiddleware = (app) => {
|
|||||||
* Route groupings for Express app
|
* Route groupings for Express app
|
||||||
* @param app
|
* @param app
|
||||||
*/
|
*/
|
||||||
const applyRoutes = (app) => {
|
const applyRoutes = ({ app }) => {
|
||||||
app.use("/", require("./server/routes/miscellaneousRoutes"));
|
app.use("/", require("./server/routes/miscellaneousRoutes"));
|
||||||
app.use("/notifications", require("./server/routes/notificationsRoutes"));
|
app.use("/notifications", require("./server/routes/notificationsRoutes"));
|
||||||
app.use("/render", require("./server/routes/renderRoutes"));
|
app.use("/render", require("./server/routes/renderRoutes"));
|
||||||
@@ -171,7 +171,7 @@ const connectToRedisCluster = () => {
|
|||||||
* @param server
|
* @param server
|
||||||
* @param app
|
* @param app
|
||||||
*/
|
*/
|
||||||
const applySocketIO = async (server, app) => {
|
const applySocketIO = async ({ server, app }) => {
|
||||||
const redisCluster = await connectToRedisCluster();
|
const redisCluster = await connectToRedisCluster();
|
||||||
|
|
||||||
// Handle errors
|
// Handle errors
|
||||||
@@ -182,16 +182,17 @@ const applySocketIO = async (server, app) => {
|
|||||||
const pubClient = redisCluster;
|
const pubClient = redisCluster;
|
||||||
const subClient = pubClient.duplicate();
|
const subClient = pubClient.duplicate();
|
||||||
|
|
||||||
// https://github.com/redis/node-redis/blob/master/docs/clustering.md
|
|
||||||
// https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/accessing-elasticache.html
|
|
||||||
|
|
||||||
pubClient.on("error", (err) => logger.log(`Redis pubClient error: ${err}`, "ERROR", "redis"));
|
pubClient.on("error", (err) => logger.log(`Redis pubClient error: ${err}`, "ERROR", "redis"));
|
||||||
subClient.on("error", (err) => logger.log(`Redis subClient error: ${err}`, "ERROR", "redis"));
|
subClient.on("error", (err) => logger.log(`Redis subClient error: ${err}`, "ERROR", "redis"));
|
||||||
|
|
||||||
process.on("SIGINT", async () => {
|
process.on("SIGINT", async () => {
|
||||||
logger.log("Closing Redis connections...", "INFO", "redis", "api");
|
logger.log("Closing Redis connections...", "INFO", "redis", "api");
|
||||||
await Promise.all([pubClient.disconnect(), subClient.disconnect()]);
|
try {
|
||||||
process.exit(0);
|
await Promise.all([pubClient.disconnect(), subClient.disconnect()]);
|
||||||
|
logger.log("Redis connections closed. Process will exit.", "INFO", "redis", "api");
|
||||||
|
} catch (error) {
|
||||||
|
logger.log(`Error closing Redis connections: ${error.message}`, "ERROR", "redis", "api");
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
const ioRedis = new Server(server, {
|
const ioRedis = new Server(server, {
|
||||||
@@ -259,16 +260,16 @@ const main = async () => {
|
|||||||
|
|
||||||
const server = http.createServer(app);
|
const server = http.createServer(app);
|
||||||
|
|
||||||
const { pubClient, ioRedis } = await applySocketIO(server, app);
|
const { pubClient, ioRedis } = await applySocketIO({ server, app });
|
||||||
const api = applyRedisHelpers(pubClient, app, logger);
|
const redisHelpers = applyRedisHelpers({ pubClient, app, logger });
|
||||||
const ioHelpers = applyIOHelpers(app, api, ioRedis, logger);
|
const ioHelpers = applyIOHelpers({ app, redisHelpers, ioRedis, logger });
|
||||||
|
|
||||||
// Legacy Socket Events
|
// Legacy Socket Events
|
||||||
require("./server/web-sockets/web-socket");
|
require("./server/web-sockets/web-socket");
|
||||||
|
|
||||||
applyMiddleware(app);
|
applyMiddleware({ app });
|
||||||
applyRoutes(app);
|
applyRoutes({ app });
|
||||||
redisSocketEvents(ioRedis, api, ioHelpers);
|
redisSocketEvents({ io: ioRedis, redisHelpers, ioHelpers, logger });
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await server.listen(port);
|
await server.listen(port);
|
||||||
|
|||||||
@@ -45,22 +45,20 @@ if (process.env.NODE_ENV !== "development") {
|
|||||||
// Handling SIGINT (e.g., Ctrl+C)
|
// Handling SIGINT (e.g., Ctrl+C)
|
||||||
process.on("SIGINT", async () => {
|
process.on("SIGINT", async () => {
|
||||||
await tasksEmailQueueCleanup();
|
await tasksEmailQueueCleanup();
|
||||||
process.exit(0);
|
|
||||||
});
|
});
|
||||||
// Handling SIGTERM (e.g., sent by system shutdown)
|
// Handling SIGTERM (e.g., sent by system shutdown)
|
||||||
process.on("SIGTERM", async () => {
|
process.on("SIGTERM", async () => {
|
||||||
await tasksEmailQueueCleanup();
|
await tasksEmailQueueCleanup();
|
||||||
process.exit(0);
|
|
||||||
});
|
});
|
||||||
// Handling uncaught exceptions
|
// Handling uncaught exceptions
|
||||||
process.on("uncaughtException", async (err) => {
|
process.on("uncaughtException", async (err) => {
|
||||||
await tasksEmailQueueCleanup();
|
await tasksEmailQueueCleanup();
|
||||||
process.exit(1); // Exit with an 'error' code
|
throw err;
|
||||||
});
|
});
|
||||||
// Handling unhandled promise rejections
|
// Handling unhandled promise rejections
|
||||||
process.on("unhandledRejection", async (reason, promise) => {
|
process.on("unhandledRejection", async (reason, promise) => {
|
||||||
await tasksEmailQueueCleanup();
|
await tasksEmailQueueCleanup();
|
||||||
process.exit(1); // Exit with an 'error' code
|
throw reason;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -247,7 +245,7 @@ const tasksRemindEmail = async (req, res) => {
|
|||||||
const fromEmails = InstanceManager({
|
const fromEmails = InstanceManager({
|
||||||
imex: "ImEX Online <noreply@imex.online>",
|
imex: "ImEX Online <noreply@imex.online>",
|
||||||
rome:
|
rome:
|
||||||
tasksRequest?.tasks[0].bodyshop.convenient_company === "promanager"
|
tasksRequest?.tasks[0].bodyshop.convenient_company === "promanager"
|
||||||
? "ProManager <noreply@promanager.web-est.com>"
|
? "ProManager <noreply@promanager.web-est.com>"
|
||||||
: "Rome Online <noreply@romeonline.io>"
|
: "Rome Online <noreply@romeonline.io>"
|
||||||
});
|
});
|
||||||
@@ -283,7 +281,7 @@ const tasksRemindEmail = async (req, res) => {
|
|||||||
const endPoints = InstanceManager({
|
const endPoints = InstanceManager({
|
||||||
imex: process.env?.NODE_ENV === "test" ? "https://test.imex.online" : "https://imex.online",
|
imex: process.env?.NODE_ENV === "test" ? "https://test.imex.online" : "https://imex.online",
|
||||||
rome:
|
rome:
|
||||||
tasksRequest?.tasks[0].bodyshop.convenient_company === "promanager"
|
tasksRequest?.tasks[0].bodyshop.convenient_company === "promanager"
|
||||||
? process.env?.NODE_ENV === "test"
|
? process.env?.NODE_ENV === "test"
|
||||||
? "https//test.promanager.web-est.com"
|
? "https//test.promanager.web-est.com"
|
||||||
: "https://promanager.web-est.com"
|
: "https://promanager.web-est.com"
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
const applyIOHelpers = (app, api, io, logger) => {
|
const applyIOHelpers = ({ app, api, io, logger }) => {
|
||||||
const getBodyshopRoom = (bodyshopID) => `bodyshop-broadcast-room:${bodyshopID}`;
|
const getBodyshopRoom = (bodyshopID) => `bodyshop-broadcast-room:${bodyshopID}`;
|
||||||
|
|
||||||
const ioHelpersAPI = {
|
const ioHelpersAPI = {
|
||||||
@@ -14,4 +14,4 @@ const applyIOHelpers = (app, api, io, logger) => {
|
|||||||
return ioHelpersAPI;
|
return ioHelpersAPI;
|
||||||
};
|
};
|
||||||
|
|
||||||
module.exports = applyIOHelpers;
|
module.exports = { applyIOHelpers };
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
* @param app
|
* @param app
|
||||||
* @param logger
|
* @param logger
|
||||||
*/
|
*/
|
||||||
const applyRedisHelpers = (pubClient, app, logger) => {
|
const applyRedisHelpers = ({ pubClient, app, logger }) => {
|
||||||
// Store session data in Redis
|
// Store session data in Redis
|
||||||
const setSessionData = async (socketId, key, value) => {
|
const setSessionData = async (socketId, key, value) => {
|
||||||
try {
|
try {
|
||||||
@@ -225,5 +225,5 @@ const applyRedisHelpers = (pubClient, app, logger) => {
|
|||||||
|
|
||||||
return api;
|
return api;
|
||||||
};
|
};
|
||||||
module.exports = applyRedisHelpers;
|
|
||||||
// "th1s1sr3d1s" (BCrypt)
|
module.exports = { applyRedisHelpers };
|
||||||
|
|||||||
@@ -1,102 +1,18 @@
|
|||||||
const path = require("path");
|
|
||||||
require("dotenv").config({
|
|
||||||
path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`)
|
|
||||||
});
|
|
||||||
const logger = require("../utils/logger");
|
|
||||||
const { admin } = require("../firebase/firebase-handler");
|
const { admin } = require("../firebase/firebase-handler");
|
||||||
|
|
||||||
// Logging helper functions
|
const redisSocketEvents = ({
|
||||||
function createLogEvent(socket, level, message) {
|
io,
|
||||||
console.log(`[IOREDIS LOG EVENT] - ${socket.user.email} - ${socket.id} - ${message}`);
|
redisHelpers: { setSessionData, clearSessionData },
|
||||||
logger.log("ioredis-log-event", level, socket.user.email, null, { wsmessage: message });
|
ioHelpers: { getBodyshopRoom },
|
||||||
}
|
logger
|
||||||
|
}) => {
|
||||||
const registerUpdateEvents = (socket) => {
|
// Logging helper functions
|
||||||
socket.on("update-token", async (newToken) => {
|
const createLogEvent = (socket, level, message) => {
|
||||||
try {
|
console.log(`[IOREDIS LOG EVENT] - ${socket?.user?.email} - ${socket.id} - ${message}`);
|
||||||
socket.user = await admin.auth().verifyIdToken(newToken);
|
logger.log("ioredis-log-event", level, socket?.user?.email, null, { wsmessage: message });
|
||||||
createLogEvent(socket, "INFO", "Token updated successfully");
|
};
|
||||||
socket.emit("token-updated", { success: true });
|
|
||||||
} catch (error) {
|
|
||||||
createLogEvent(socket, "ERROR", `Token update failed: ${error.message}`);
|
|
||||||
socket.emit("token-updated", { success: false, error: error.message });
|
|
||||||
// Optionally disconnect the socket if token verification fails
|
|
||||||
socket.disconnect();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
const redisSocketEvents = (io, { addUserToRoom, getUsersInRoom, removeUserFromRoom }, { getBodyshopRoom }) => {
|
|
||||||
// Room management and broadcasting events
|
|
||||||
function registerRoomAndBroadcastEvents(socket) {
|
|
||||||
socket.on("join-bodyshop-room", async (bodyshopUUID) => {
|
|
||||||
try {
|
|
||||||
const room = getBodyshopRoom(bodyshopUUID);
|
|
||||||
socket.join(room);
|
|
||||||
await addUserToRoom(room, { uid: socket.user.uid, email: socket.user.email, socket: socket.id });
|
|
||||||
createLogEvent(socket, "DEBUG", `Client joined bodyshop room: ${room}`);
|
|
||||||
|
|
||||||
// Notify all users in the room about the updated user list
|
|
||||||
const usersInRoom = await getUsersInRoom(room);
|
|
||||||
io.to(room).emit("room-users-updated", usersInRoom);
|
|
||||||
} catch (error) {
|
|
||||||
createLogEvent(socket, "ERROR", `Error joining room: ${error}`);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
socket.on("leave-bodyshop-room", async (bodyshopUUID) => {
|
|
||||||
try {
|
|
||||||
const room = getBodyshopRoom(bodyshopUUID);
|
|
||||||
socket.leave(room);
|
|
||||||
createLogEvent(socket, "DEBUG", `Client left bodyshop room: ${room}`);
|
|
||||||
} catch (error) {
|
|
||||||
createLogEvent(socket, "ERROR", `Error joining room: ${error}`);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
socket.on("get-room-users", async (bodyshopUUID, callback) => {
|
|
||||||
try {
|
|
||||||
const usersInRoom = await getUsersInRoom(getBodyshopRoom(bodyshopUUID));
|
|
||||||
callback(usersInRoom);
|
|
||||||
} catch (error) {
|
|
||||||
createLogEvent(socket, "ERROR", `Error getting room: ${error}`);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
socket.on("broadcast-to-bodyshop", async (bodyshopUUID, message) => {
|
|
||||||
try {
|
|
||||||
const room = getBodyshopRoom(bodyshopUUID);
|
|
||||||
io.to(room).emit("bodyshop-message", message);
|
|
||||||
createLogEvent(socket, "INFO", `Broadcast message to bodyshop ${room}`);
|
|
||||||
} catch (error) {
|
|
||||||
createLogEvent(socket, "ERROR", `Error getting room: ${error}`);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
socket.on("disconnect", async () => {
|
|
||||||
try {
|
|
||||||
createLogEvent(socket, "DEBUG", `User disconnected.`);
|
|
||||||
|
|
||||||
// Get all rooms the socket is part of
|
|
||||||
const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id);
|
|
||||||
for (const room of rooms) {
|
|
||||||
await removeUserFromRoom(room, { uid: socket.user.uid, email: socket.user.email, socket: socket.id });
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
createLogEvent(socket, "ERROR", `Error getting room: ${error}`);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register all socket events for a given socket connection
|
|
||||||
function registerSocketEvents(socket) {
|
|
||||||
createLogEvent(socket, "DEBUG", `Registering RedisIO Socket Events.`);
|
|
||||||
|
|
||||||
// Register room and broadcasting events
|
|
||||||
registerRoomAndBroadcastEvents(socket);
|
|
||||||
registerUpdateEvents(socket);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Socket Auth Middleware
|
||||||
const authMiddleware = (socket, next) => {
|
const authMiddleware = (socket, next) => {
|
||||||
try {
|
try {
|
||||||
if (socket.handshake.auth.token) {
|
if (socket.handshake.auth.token) {
|
||||||
@@ -105,6 +21,9 @@ const redisSocketEvents = (io, { addUserToRoom, getUsersInRoom, removeUserFromRo
|
|||||||
.verifyIdToken(socket.handshake.auth.token)
|
.verifyIdToken(socket.handshake.auth.token)
|
||||||
.then((user) => {
|
.then((user) => {
|
||||||
socket.user = user;
|
socket.user = user;
|
||||||
|
return setSessionData(socket.id, "user", user);
|
||||||
|
})
|
||||||
|
.then(() => {
|
||||||
next();
|
next();
|
||||||
})
|
})
|
||||||
.catch((error) => {
|
.catch((error) => {
|
||||||
@@ -122,7 +41,99 @@ const redisSocketEvents = (io, { addUserToRoom, getUsersInRoom, removeUserFromRo
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Socket.IO Middleware and Connection
|
// Register Socket Events
|
||||||
|
const registerSocketEvents = (socket) => {
|
||||||
|
createLogEvent(socket, "DEBUG", `Registering RedisIO Socket Events.`);
|
||||||
|
|
||||||
|
// Token Update Events
|
||||||
|
const registerUpdateEvents = (socket) => {
|
||||||
|
const updateToken = async (newToken) => {
|
||||||
|
try {
|
||||||
|
// noinspection UnnecessaryLocalVariableJS
|
||||||
|
const user = await admin.auth().verifyIdToken(newToken, true);
|
||||||
|
socket.user = user;
|
||||||
|
|
||||||
|
// If We ever want to persist user Data across workers
|
||||||
|
// await setSessionData(socket.id, "user", user);
|
||||||
|
|
||||||
|
createLogEvent(socket, "INFO", "Token updated successfully");
|
||||||
|
|
||||||
|
socket.emit("token-updated", { success: true });
|
||||||
|
} catch (error) {
|
||||||
|
if (error.code === "auth/id-token-expired") {
|
||||||
|
createLogEvent(socket, "WARNING", "Stale token received, waiting for new token");
|
||||||
|
socket.emit("token-updated", {
|
||||||
|
success: false,
|
||||||
|
error: "Stale token."
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
createLogEvent(socket, "ERROR", `Token update failed: ${error.message}`);
|
||||||
|
socket.emit("token-updated", { success: false, error: error.message });
|
||||||
|
// For any other errors, optionally disconnect the socket
|
||||||
|
socket.disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
socket.on("update-token", updateToken);
|
||||||
|
};
|
||||||
|
// Room Broadcast Events
|
||||||
|
const registerRoomAndBroadcastEvents = (socket) => {
|
||||||
|
const joinBodyshopRoom = (bodyshopUUID) => {
|
||||||
|
try {
|
||||||
|
const room = getBodyshopRoom(bodyshopUUID);
|
||||||
|
socket.join(room);
|
||||||
|
createLogEvent(socket, "DEBUG", `Client joined bodyshop room: ${room}`);
|
||||||
|
} catch (error) {
|
||||||
|
createLogEvent(socket, "ERROR", `Error joining room: ${error}`);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const leaveBodyshopRoom = (bodyshopUUID) => {
|
||||||
|
try {
|
||||||
|
const room = getBodyshopRoom(bodyshopUUID);
|
||||||
|
socket.leave(room);
|
||||||
|
createLogEvent(socket, "DEBUG", `Client left bodyshop room: ${room}`);
|
||||||
|
} catch (error) {
|
||||||
|
createLogEvent(socket, "ERROR", `Error joining room: ${error}`);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const broadcastToBodyshopRoom = (bodyshopUUID, message) => {
|
||||||
|
try {
|
||||||
|
const room = getBodyshopRoom(bodyshopUUID);
|
||||||
|
io.to(room).emit("bodyshop-message", message);
|
||||||
|
createLogEvent(socket, "DEBUG", `Broadcast message to bodyshop ${room}`);
|
||||||
|
} catch (error) {
|
||||||
|
createLogEvent(socket, "ERROR", `Error getting room: ${error}`);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
socket.on("join-bodyshop-room", joinBodyshopRoom);
|
||||||
|
socket.on("leave-bodyshop-room", leaveBodyshopRoom);
|
||||||
|
socket.on("broadcast-to-bodyshop", broadcastToBodyshopRoom);
|
||||||
|
};
|
||||||
|
// Disconnect Events
|
||||||
|
const registerDisconnectEvents = (socket) => {
|
||||||
|
const disconnect = () => {
|
||||||
|
createLogEvent(socket, "DEBUG", `User disconnected.`);
|
||||||
|
const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id);
|
||||||
|
for (const room of rooms) {
|
||||||
|
socket.leave(room);
|
||||||
|
}
|
||||||
|
// If we ever want to persist the user across workers
|
||||||
|
// clearSessionData(socket.id);
|
||||||
|
};
|
||||||
|
|
||||||
|
socket.on("disconnect", disconnect);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Call Handlers
|
||||||
|
registerRoomAndBroadcastEvents(socket);
|
||||||
|
registerUpdateEvents(socket);
|
||||||
|
registerDisconnectEvents(socket);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Associate Middleware and Handlers
|
||||||
io.use(authMiddleware);
|
io.use(authMiddleware);
|
||||||
io.on("connection", registerSocketEvents);
|
io.on("connection", registerSocketEvents);
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user