Files
bodyshop/server/web-sockets/redisSocketEvents.js

436 lines
15 KiB
JavaScript

const { admin } = require("../firebase/firebase-handler");
const FortellisLogger = require("../fortellis/fortellis-logger");
const { FortellisJobExport, FortellisSelectedCustomer } = require("../fortellis/fortellis");
const CdkCalculateAllocations = require("../cdk/cdk-calculate-allocations").default;
const registerRREvents = require("../rr/rr-register-socket-events");
const SOCKET_SESSION_TTL_SECONDS = 60 * 60 * 24;
const redisSocketEvents = ({ io, redisHelpers, ioHelpers, logger }) => {
// Destructure helpers locally, but keep full objects available for downstream modules
const {
setSessionData,
getSessionData,
clearSessionData,
addUserSocketMapping,
removeUserSocketMapping,
refreshUserSocketTTL,
getUserSocketMappingByBodyshop,
setSessionTransactionData,
getSessionTransactionData,
clearSessionTransactionData
} = redisHelpers;
const { getBodyshopRoom, getBodyshopConversationRoom } = ioHelpers;
// Logging helper functions
const createLogEvent = (socket, level, message) => {
logger.log("ioredis-log-event", level, socket?.user?.email, null, { wsmessage: message });
};
// Socket Auth Middleware
const authMiddleware = async (socket, next) => {
const { token, bodyshopId } = socket.handshake.auth;
if (!token) {
return next(new Error("Authentication error - no authorization token."));
}
if (!bodyshopId) {
return next(new Error("Authentication error - no bodyshopId provided."));
}
try {
const user = await admin.auth().verifyIdToken(token);
socket.user = user;
socket.bodyshopId = bodyshopId;
socket.data = socket.data || {};
socket.data.authToken = token;
// Used to update legacy sockets
if (socket.handshake?.auth) {
socket.handshake.auth.token = token;
socket.handshake.auth.bodyshopId = bodyshopId;
}
// NEW: seed a base session for this socket so downstream handlers can read it
await setSessionData(
socket.id,
{
bodyshopId,
email: user.email,
uid: user.user_id || user.uid,
seededAt: Date.now()
},
SOCKET_SESSION_TTL_SECONDS
);
await addUserSocketMapping(user.email, socket.id, bodyshopId);
next();
} catch (error) {
next(new Error(`Authentication error: ${error.message}`));
}
};
// Register Socket Events
const registerSocketEvents = (socket) => {
// DMS reset events (clear per-socket DMS transactional cache)
const registerDmsResetEvents = (socket) => {
socket.on("dms-reset-context", async ({ jobId, mode } = {}, ack) => {
try {
// This clears all transactional session data for this socket
// (RR txEnvelope/JobData/SelectedCustomer/PendingRO/etc, Fortellis, etc.)
await clearSessionTransactionData(socket.id);
createLogEvent(
socket,
"debug",
`DMS reset-context: cleared transactional session data` +
(jobId ? ` (jobId=${jobId})` : "") +
(mode ? ` (mode=${mode})` : "")
);
if (typeof ack === "function") {
ack({ ok: true });
}
} catch (error) {
createLogEvent(socket, "error", `DMS reset-context failed: ${error.message}`);
if (typeof ack === "function") {
ack({ ok: false, error: error.message });
}
}
});
};
// Token Update Events
const registerUpdateEvents = (socket) => {
let latestTokenTimestamp = 0;
const updateToken = async ({ token, bodyshopId }) => {
const currentTimestamp = Date.now();
latestTokenTimestamp = currentTimestamp;
if (!token || !bodyshopId) {
socket.emit("token-updated", { success: false, error: "Token or bodyshopId missing" });
return;
}
try {
const user = await admin.auth().verifyIdToken(token);
if (currentTimestamp < latestTokenTimestamp) {
createLogEvent(socket, "warn", "Outdated token validation skipped.");
return;
}
socket.user = user;
socket.bodyshopId = bodyshopId;
// 🔑 keep the live token in a mutable place used by downstream code
socket.data = socket.data || {};
socket.data.authToken = token;
// also keep handshake in sync for any legacy reads
if (socket.handshake?.auth) {
socket.handshake.auth.token = token;
socket.handshake.auth.bodyshopId = bodyshopId;
}
// NEW: refresh (or create) the base session with the latest info
await setSessionData(
socket.id,
{
bodyshopId,
email: user.email,
uid: user.user_id || user.uid,
refreshedAt: Date.now()
},
SOCKET_SESSION_TTL_SECONDS
);
await refreshUserSocketTTL(user.email);
socket.emit("token-updated", { success: true });
} catch (error) {
if (error.code === "auth/id-token-expired") {
createLogEvent(socket, "warn", "Stale token received, waiting for new token");
socket.emit("token-updated", { success: false, error: "Stale token." });
return;
}
createLogEvent(socket, "error", `Token update failed for socket ID: ${socket.id}, Error: ${error.message}`);
socket.emit("token-updated", { success: false, error: error.message });
socket.disconnect();
}
};
socket.on("update-token", updateToken);
};
// Room Broadcast Events
const registerRoomAndBroadcastEvents = (socket) => {
const joinBodyshopRoom = (bodyshopUUID) => {
try {
const room = getBodyshopRoom(bodyshopUUID);
socket.join(room);
} catch (error) {
createLogEvent(socket, "error", `Error joining room: ${error}`);
}
};
const leaveBodyshopRoom = (bodyshopUUID) => {
try {
const room = getBodyshopRoom(bodyshopUUID);
socket.leave(room);
} catch (error) {
createLogEvent(socket, "error", `Error joining room: ${error}`);
}
};
const broadcastToBodyshopRoom = (bodyshopUUID, message) => {
try {
const room = getBodyshopRoom(bodyshopUUID);
io.to(room).emit("bodyshop-message", message);
} catch (error) {
createLogEvent(socket, "error", `Error getting room: ${error}`);
}
};
socket.on("join-bodyshop-room", joinBodyshopRoom);
socket.on("leave-bodyshop-room", leaveBodyshopRoom);
socket.on("broadcast-to-bodyshop", broadcastToBodyshopRoom);
};
// Disconnect Events
const registerDisconnectEvents = (socket) => {
const disconnect = async () => {
if (socket.user?.email) {
await removeUserSocketMapping(socket.user.email, socket.id);
}
try {
await clearSessionData(socket.id);
} catch {
//
}
// Optional: clear transactional session
try {
await clearSessionTransactionData(socket.id);
} catch {
//
}
// Leave all rooms except the default room (socket.id)
const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id);
for (const room of rooms) {
socket.leave(room);
}
};
socket.on("disconnect", disconnect);
};
// Messaging Events
const registerMessagingEvents = (socket) => {
const joinConversationRoom = async ({ bodyshopId, conversationId }) => {
try {
const room = getBodyshopConversationRoom({ bodyshopId, conversationId });
socket.join(room);
} catch (error) {
logger.log("Failed to Join Conversation Room", "error", "io-redis", null, {
bodyshopId,
conversationId,
error: error.message,
stack: error.stack
});
}
};
const leaveConversationRoom = ({ bodyshopId, conversationId }) => {
try {
const room = getBodyshopConversationRoom({ bodyshopId, conversationId });
socket.leave(room);
} catch (error) {
logger.log("Failed to Leave Conversation Room", "error", "io-redis", null, {
bodyshopId,
conversationId,
error: error.message,
stack: error.stack
});
}
};
const conversationModified = ({ bodyshopId, conversationId, ...fields }) => {
try {
// Retrieve the room name for the conversation
const room = getBodyshopRoom(bodyshopId);
// Emit the updated data to all clients in the room
io.to(room).emit("conversation-changed", {
conversationId,
...fields
});
} catch (error) {
logger.log("Failed to handle conversation modification", "error", "io-redis", null, {
bodyshopId,
conversationId,
fields,
error: error.message,
stack: error.stack
});
}
};
socket.on("conversation-modified", conversationModified);
socket.on("join-bodyshop-conversation", joinConversationRoom);
socket.on("leave-bodyshop-conversation", leaveConversationRoom);
};
// Sync Notification Read Events
const registerSyncEvents = (socket) => {
socket.on("sync-notification-read", async ({ email, bodyshopId, notificationId }) => {
try {
const socketMapping = await getUserSocketMappingByBodyshop(email, bodyshopId);
const timestamp = new Date().toISOString();
if (socketMapping?.socketIds) {
socketMapping?.socketIds.forEach((socketId) => {
if (socketId !== socket.id) {
// Avoid sending back to the originating socket
io.to(socketId).emit("sync-notification-read", { notificationId, timestamp });
}
});
}
} catch (error) {
createLogEvent(socket, "error", `Error syncing notification read: ${error.message}`);
}
});
socket.on("sync-all-notifications-read", async ({ email, bodyshopId }) => {
try {
const socketMapping = await getUserSocketMappingByBodyshop(email, bodyshopId);
const timestamp = new Date().toISOString();
if (socketMapping?.socketIds) {
socketMapping?.socketIds.forEach((socketId) => {
if (socketId !== socket.id) {
// Avoid sending back to the originating socket
io.to(socketId).emit("sync-all-notifications-read", { timestamp });
}
});
}
} catch (error) {
createLogEvent(socket, "error", `Error syncing all notifications read: ${error.message}`);
}
});
};
// Fortellis/CDK Handlers
const registerFortellisEvents = (socket) => {
socket.on("fortellis-export-job", async ({ jobid, txEnvelope }) => {
try {
await FortellisJobExport({
socket,
redisHelpers: {
setSessionData,
getSessionData,
addUserSocketMapping,
removeUserSocketMapping,
refreshUserSocketTTL,
getUserSocketMappingByBodyshop,
setSessionTransactionData,
getSessionTransactionData,
clearSessionTransactionData
},
ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom },
jobid,
txEnvelope
});
} catch (error) {
FortellisLogger(socket, "error", `Error during Fortellis export : ${error.message}`);
logger.log("fortellis-job-export-error", "error", socket.user?.email, jobid, {
message: error.message,
stack: error.stack
});
}
});
socket.on("fortellis-selected-customer", async ({ jobid, selectedCustomerId }) => {
try {
await FortellisSelectedCustomer({
socket,
redisHelpers: {
setSessionData,
getSessionData,
addUserSocketMapping,
removeUserSocketMapping,
refreshUserSocketTTL,
getUserSocketMappingByBodyshop,
setSessionTransactionData,
getSessionTransactionData,
clearSessionTransactionData
},
ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom },
jobid,
selectedCustomerId
});
} catch (error) {
FortellisLogger(socket, "error", `Error during Fortellis export : ${error.message}`);
logger.log("fortellis-selectd-customer-error", "error", socket.user?.email, jobid, {
message: error.message,
stack: error.stack
});
}
});
socket.on("fortellis-calculate-allocations", async (jobid, callback) => {
try {
const allocations = await CdkCalculateAllocations(socket, jobid);
callback(allocations);
} catch (error) {
FortellisLogger(socket, "error", `Error during Fortellis export : ${error.message}`);
logger.log("fortellis-selectd-customer-error", "error", socket.user?.email, jobid, {
message: error.message,
stack: error.stack
});
}
});
};
// Task Events
const registerTaskEvents = (socket) => {
socket.on("task-created", (payload) => {
if (!payload) return;
const room = getBodyshopRoom(socket.bodyshopId);
io.to(room).emit("bodyshop-message", { type: "task-created", payload });
});
socket.on("task-updated", (payload) => {
if (!payload) return;
const room = getBodyshopRoom(socket.bodyshopId);
io.to(room).emit("bodyshop-message", { type: "task-updated", payload });
});
socket.on("task-deleted", (payload) => {
if (!payload?.id) return;
const room = getBodyshopRoom(socket.bodyshopId);
io.to(room).emit("bodyshop-message", { type: "task-deleted", payload });
});
};
// Call Handlers
registerDmsResetEvents(socket);
registerRoomAndBroadcastEvents(socket);
registerUpdateEvents(socket);
registerMessagingEvents(socket);
registerDisconnectEvents(socket);
registerSyncEvents(socket);
registerTaskEvents(socket);
registerFortellisEvents(socket);
// Reynolds & Reynolds socket handlers
registerRREvents({
socket,
redisHelpers
});
};
// Associate Middleware and Handlers
io.use(authMiddleware);
io.on("connection", registerSocketEvents);
};
module.exports = {
redisSocketEvents
};