feature/IO-3586-Socket-Reconnect-Issues - Fix

This commit is contained in:
Dave
2026-03-02 10:41:24 -05:00
parent 88e943f43d
commit 1fa6280876
7 changed files with 258 additions and 52 deletions

View File

@@ -86,6 +86,9 @@ async function FetchSubscriptions({ redisHelpers, socket, jobid, SubscriptionObj
logRequest: false
});
const SubscriptionMeta = subscriptions.data.subscriptions.find((s) => s.subscriptionId === SubscriptionID);
if (!SubscriptionMeta) {
throw new Error(`Subscription metadata not found for SubscriptionID: ${SubscriptionID}`);
}
if (setSessionTransactionData) {
await setSessionTransactionData(
socket.id,
@@ -102,11 +105,15 @@ async function FetchSubscriptions({ redisHelpers, socket, jobid, SubscriptionObj
error: error.message,
stack: error.stack
});
throw error;
}
}
async function GetDepartmentId({ apiName, debug = false, SubscriptionMeta, overrideDepartmentId }) {
if (!apiName) throw new Error("apiName not provided. Unable to get department without apiName.");
if (!SubscriptionMeta || !Array.isArray(SubscriptionMeta.apiDmsInfo)) {
throw new Error("Subscription metadata missing apiDmsInfo.");
}
if (debug) {
console.log("API Names & Departments ");
console.log("===========");
@@ -118,9 +125,8 @@ async function GetDepartmentId({ apiName, debug = false, SubscriptionMeta, overr
.find((info) => info.name === apiName)?.departments; //Departments are categorized by API name and have an array of departments.
if (overrideDepartmentId) {
return departmentIds && departmentIds.find(d => d.id === overrideDepartmentId)?.id
return departmentIds && departmentIds.find((d) => d.id === overrideDepartmentId)?.id;
} else {
return departmentIds && departmentIds[0] && departmentIds[0].id; //TODO: This makes the assumption that there is only 1 department.
}
}

View File

@@ -180,22 +180,52 @@ async function FortellisSelectedCustomer({ socket, redisHelpers, selectedCustome
getTransactionType(jobid),
FortellisCacheEnums.txEnvelope
);
const DMSVid = await redisHelpers.getSessionTransactionData(
socket.id,
getTransactionType(JobData.id),
FortellisCacheEnums.DMSVid
);
if (!JobData || !txEnvelope) {
const friendlyMessage =
"Fortellis export context was lost after reconnect. Click Post again to restart the Fortellis flow.";
CreateFortellisLogEvent(socket, "WARN", friendlyMessage, {
jobid,
hasJobData: !!JobData,
hasTxEnvelope: !!txEnvelope
});
socket.emit("export-failed", {
title: "Fortellis",
severity: "warning",
errorCode: "FORTELLIS_CONTEXT_MISSING",
friendlyMessage
});
return;
}
try {
const DMSVid = await redisHelpers.getSessionTransactionData(
socket.id,
getTransactionType(JobData.id),
FortellisCacheEnums.DMSVid
);
if (!DMSVid) {
const friendlyMessage =
"Fortellis vehicle context is missing after reconnect. Click Post again to restart the Fortellis flow.";
CreateFortellisLogEvent(socket, "WARN", friendlyMessage, {
jobid,
hasDMSVid: !!DMSVid
});
socket.emit("export-failed", {
title: "Fortellis",
severity: "warning",
errorCode: "FORTELLIS_CONTEXT_MISSING",
friendlyMessage
});
return;
}
let DMSCust;
if (selectedCustomerId) {
CreateFortellisLogEvent(socket, "DEBUG", `{3.1} Querying the Customer using Customer ID: ${selectedCustomerId}`);
//Get cust list from Redis. Return the item
const DMSCustList = await getSessionTransactionData(
socket.id,
getTransactionType(jobid),
FortellisCacheEnums.DMSCustList
);
const DMSCustList =
(await getSessionTransactionData(socket.id, getTransactionType(jobid), FortellisCacheEnums.DMSCustList)) || [];
const existingCustomerInDMSCustList = DMSCustList.find((c) => c.customerId === selectedCustomerId);
DMSCust = existingCustomerInDMSCustList || {
customerId: selectedCustomerId //This is the fall back in case it is the generic customer.
@@ -306,7 +336,7 @@ async function FortellisSelectedCustomer({ socket, redisHelpers, selectedCustome
//There was something wrong. Throw an error to trigger clean up.
//throw new Error("Error posting DMS Batch Transaction");
}
} catch (error) {
} catch {
//Clean up the transaction and insert a faild error code
// //Get the error code
CreateFortellisLogEvent(socket, "DEBUG", `{6.1} Getting errors for Transaction ID ${DMSTransHeader.transID}`);
@@ -336,6 +366,12 @@ async function FortellisSelectedCustomer({ socket, redisHelpers, selectedCustome
stack: error.stack,
data: error.errorData
});
socket.emit("export-failed", {
title: "Fortellis",
severity: "error",
error: error.message,
friendlyMessage: "Fortellis export failed. Please click Post again to retry."
});
await InsertFailedExportLog({
socket,
JobData,

View File

@@ -68,12 +68,33 @@ const fetchBodyshopFromDB = async (bodyshopId, logger) => {
* @param logger
*/
const applyRedisHelpers = ({ pubClient, app, logger }) => {
const toRedisJson = (value) => JSON.stringify(value === undefined ? null : value);
// Store session data in Redis
const setSessionData = async (socketId, key, value, ttl) => {
try {
await pubClient.hset(`socket:${socketId}`, key, JSON.stringify(value)); // Use Redis pubClient
const sessionKey = `socket:${socketId}`;
// Supports both forms:
// 1) setSessionData(socketId, "field", value, ttl)
// 2) setSessionData(socketId, { fieldA: valueA, fieldB: valueB }, ttl)
if (key && typeof key === "object" && !Array.isArray(key)) {
const entries = Object.entries(key).flatMap(([field, fieldValue]) => [field, toRedisJson(fieldValue)]);
if (entries.length > 0) {
await pubClient.hset(sessionKey, ...entries);
}
const objectTtl = typeof value === "number" ? value : typeof ttl === "number" ? ttl : null;
if (objectTtl) {
await pubClient.expire(sessionKey, objectTtl);
}
return;
}
await pubClient.hset(sessionKey, key, toRedisJson(value)); // Use Redis pubClient
if (ttl && typeof ttl === "number") {
await pubClient.expire(`socket:${socketId}`, ttl);
await pubClient.expire(sessionKey, ttl);
}
} catch (error) {
logger.log(`Error Setting Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
@@ -88,7 +109,26 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
*/
const getSessionData = async (socketId, key) => {
try {
const data = await pubClient.hget(`socket:${socketId}`, key);
const sessionKey = `socket:${socketId}`;
// Supports:
// 1) getSessionData(socketId, "field") -> parsed field value
// 2) getSessionData(socketId) -> parsed object of all fields
if (typeof key === "undefined") {
const raw = await pubClient.hgetall(sessionKey);
if (!raw || Object.keys(raw).length === 0) return null;
return Object.entries(raw).reduce((acc, [field, rawValue]) => {
try {
acc[field] = JSON.parse(rawValue);
} catch {
acc[field] = rawValue;
}
return acc;
}, {});
}
const data = await pubClient.hget(sessionKey, key);
return data ? JSON.parse(data) : null;
} catch (error) {
logger.log(`Error Getting Session Data for socket ${socketId}: ${error}`, "ERROR", "redis");
@@ -106,7 +146,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
*/
const setSessionTransactionData = async (socketId, transactionType, key, value, ttl) => {
try {
await pubClient.hset(getSocketTransactionkey({ socketId, transactionType }), key, JSON.stringify(value)); // Use Redis pubClient
await pubClient.hset(getSocketTransactionkey({ socketId, transactionType }), key, toRedisJson(value)); // Use Redis pubClient
if (ttl && typeof ttl === "number") {
await pubClient.expire(getSocketTransactionkey({ socketId, transactionType }), ttl);
}
@@ -160,7 +200,17 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
*/
const clearSessionTransactionData = async (socketId, transactionType) => {
try {
await pubClient.del(getSocketTransactionkey({ socketId, transactionType }));
if (transactionType) {
await pubClient.del(getSocketTransactionkey({ socketId, transactionType }));
return;
}
// If no transactionType is provided, clear all transaction namespaces for this socket.
const pattern = getSocketTransactionkey({ socketId, transactionType: "*" });
const keys = await pubClient.keys(pattern);
if (Array.isArray(keys) && keys.length > 0) {
await pubClient.del(...keys);
}
} catch (error) {
logger.log(
`Error Clearing Session Transaction Data for socket ${socketId}:${transactionType}: ${error}`,

View File

@@ -4,11 +4,14 @@ const { FortellisJobExport, FortellisSelectedCustomer } = require("../fortellis/
const CdkCalculateAllocations = require("../cdk/cdk-calculate-allocations").default;
const registerRREvents = require("../rr/rr-register-socket-events");
const SOCKET_SESSION_TTL_SECONDS = 60 * 60 * 24;
const redisSocketEvents = ({ io, redisHelpers, ioHelpers, logger }) => {
// Destructure helpers locally, but keep full objects available for downstream modules
const {
setSessionData,
getSessionData,
clearSessionData,
addUserSocketMapping,
removeUserSocketMapping,
refreshUserSocketTTL,
@@ -51,12 +54,16 @@ const redisSocketEvents = ({ io, redisHelpers, ioHelpers, logger }) => {
}
// NEW: seed a base session for this socket so downstream handlers can read it
await setSessionData(socket.id, {
bodyshopId,
email: user.email,
uid: user.user_id || user.uid,
seededAt: Date.now()
});
await setSessionData(
socket.id,
{
bodyshopId,
email: user.email,
uid: user.user_id || user.uid,
seededAt: Date.now()
},
SOCKET_SESSION_TTL_SECONDS
);
await addUserSocketMapping(user.email, socket.id, bodyshopId);
next();
@@ -126,14 +133,18 @@ const redisSocketEvents = ({ io, redisHelpers, ioHelpers, logger }) => {
}
// NEW: refresh (or create) the base session with the latest info
await setSessionData(socket.id, {
bodyshopId,
email: user.email,
uid: user.user_id || user.uid,
refreshedAt: Date.now()
});
await setSessionData(
socket.id,
{
bodyshopId,
email: user.email,
uid: user.user_id || user.uid,
refreshedAt: Date.now()
},
SOCKET_SESSION_TTL_SECONDS
);
await refreshUserSocketTTL(user.email, bodyshopId);
await refreshUserSocketTTL(user.email);
socket.emit("token-updated", { success: true });
} catch (error) {
if (error.code === "auth/id-token-expired") {
@@ -189,6 +200,11 @@ const redisSocketEvents = ({ io, redisHelpers, ioHelpers, logger }) => {
if (socket.user?.email) {
await removeUserSocketMapping(socket.user.email, socket.id);
}
try {
await clearSessionData(socket.id);
} catch {
//
}
// Optional: clear transactional session
try {
await clearSessionTransactionData(socket.id);