diff --git a/server.js b/server.js index f0897c3f5..d0efb579a 100644 --- a/server.js +++ b/server.js @@ -157,19 +157,102 @@ const main = async () => { await pubClient.del(`socket:${socketId}`); }; - // TODO: Remove, just a demo of hGet and hSet - // async function demoSessionData() { + // Store multiple session data in Redis + exports.setMultipleSessionData = async (socketId, keyValues) => { + // keyValues is expected to be an object { key1: value1, key2: value2, ... } + const entries = Object.entries(keyValues).map(([key, value]) => [key, JSON.stringify(value)]); + await pubClient.hSet(`socket:${socketId}`, ...entries.flat()); + }; + + // Retrieve multiple session data from Redis + exports.getMultipleSessionData = async (socketId, keys) => { + const data = await pubClient.hmGet(`socket:${socketId}`, keys); + // Redis returns an object with null values for missing keys, so we parse the non-null ones + return Object.fromEntries(keys.map((key, index) => [key, data[index] ? JSON.parse(data[index]) : null])); + }; + + exports.setMultipleFromArraySessionData = async (socketId, keyValueArray) => { + // Use Redis multi/pipeline to batch the commands + const multi = pubClient.multi(); + + keyValueArray.forEach(([key, value]) => { + multi.hSet(`socket:${socketId}`, key, JSON.stringify(value)); + }); + + await multi.exec(); // Execute all queued commands + }; + + // Helper function to add an item to the end of the Redis list + exports.addItemToEndOfList = async (socketId, key, newItem) => { + try { + await pubClient.rPush(`socket:${socketId}:${key}`, JSON.stringify(newItem)); + } catch (error) { + console.error(`Error adding item to the end of the list for socket ${socketId}:`, error); + } + }; + + // Helper function to add an item to the beginning of the Redis list + exports.addItemToBeginningOfList = async (socketId, key, newItem) => { + try { + await pubClient.lPush(`socket:${socketId}:${key}`, JSON.stringify(newItem)); + } catch (error) { + console.error(`Error adding item to the beginning of the list for socket ${socketId}:`, error); + } + }; + + // // Demo to show how all the helper functions work + // const demoSessionData = async () => { + // const socketId = "testSocketId"; + // // // Store session data using setSessionData - // await exports.setSessionData("testSocketId", "field1", "Hello, Redis Hash!"); + // await exports.setSessionData(socketId, "field1", "Hello, Redis!"); // // // Retrieve session data using getSessionData - // const value = await exports.getSessionData("testSocketId", "field1"); + // const field1Value = await exports.getSessionData(socketId, "field1"); + // console.log("Retrieved single field value:", field1Value); // - // // Output the retrieved value - // console.log(value); + // // Store multiple session data using setMultipleSessionData + // await exports.setMultipleSessionData(socketId, { field2: "Second Value", field3: "Third Value" }); + // + // // Retrieve multiple session data using getMultipleSessionData + // const multipleFields = await exports.getMultipleSessionData(socketId, ["field2", "field3"]); + // console.log("Retrieved multiple field values:", multipleFields); + // + // // Store multiple session data using setMultipleFromArraySessionData + // await exports.setMultipleFromArraySessionData(socketId, [ + // ["field4", "Fourth Value"], + // ["field5", "Fifth Value"] + // ]); + // + // // Retrieve and log all fields + // const allFields = await exports.getMultipleSessionData(socketId, [ + // "field1", + // "field2", + // "field3", + // "field4", + // "field5" + // ]); + // console.log("Retrieved all field values:", allFields); + // + // // Add item to the end of a Redis list + // await exports.addItemToEndOfList(socketId, "logEvents", { event: "Log Event 1", timestamp: new Date() }); + // await exports.addItemToEndOfList(socketId, "logEvents", { event: "Log Event 2", timestamp: new Date() }); + // + // // Add item to the beginning of a Redis list + // await exports.addItemToBeginningOfList(socketId, "logEvents", { event: "First Log Event", timestamp: new Date() }); + // + // // Retrieve the entire list (using lRange) + // const logEvents = await pubClient.lRange(`socket:${socketId}:logEvents`, 0, -1); + // console.log("Log Events List:", logEvents.map(JSON.parse)); + // + // // Clear session data + // await exports.clearSessionData(socketId); + // console.log("Session data cleared."); + // }; + // + // if (process.env.NODE_ENV === "development") { + // await demoSessionData(); // } - // demoSessionData().catch(console.error); - // TODO: End of demo require("./server/web-sockets/web-socket"); diff --git a/server/accounting/pbs/pbs-ap-allocations.js b/server/accounting/pbs/pbs-ap-allocations.js index e2212cae5..9f3dde43e 100644 --- a/server/accounting/pbs/pbs-ap-allocations.js +++ b/server/accounting/pbs/pbs-ap-allocations.js @@ -12,7 +12,7 @@ const AxiosLib = require("axios").default; const axios = AxiosLib.create(); const { PBS_ENDPOINTS, PBS_CREDENTIALS } = require("./pbs-constants"); const { CheckForErrors } = require("./pbs-job-export"); -const { getSessionData, setSessionData } = require("../../../server"); +const { getSessionData, getMultipleSessionData, setMultipleSessionData } = require("../../../server"); const uuid = require("uuid").v4; axios.interceptors.request.use( @@ -66,8 +66,10 @@ async function PbsCalculateAllocationsAp(socket, billids) { const { bills, bodyshops } = await QueryBillData(socket, billids); const bodyshop = bodyshops[0]; - await setSessionData(socket.id, "bills", bills); - await setSessionData(socket.id, "bodyshop", bodyshop); + await setMultipleSessionData(socket.id, { + bills, + bodyshop + }); const txEnvelope = await getSessionData(socket.id, "txEnvelope"); @@ -221,12 +223,12 @@ function getCostAccount(billline, respcenters) { exports.PbsExportAp = async function (socket, { billids, txEnvelope }) { await CdkBase.createLogEvent(socket, "DEBUG", `Exporting selected AP.`); - //apAllocations has the same shape as the lines key for the accounting posting to PBS. const apAllocations = await PbsCalculateAllocationsAp(socket, billids); - await setSessionData(socket.id, "apAllocations", apAllocations); - // Store txEnvelope in session - await setSessionData(socket.id, "txEnvelope", txEnvelope); + await setMultipleSessionData(socket.id, { + apAllocations, + txEnvelope + }); for (const allocation of apAllocations) { const { billid, ...restAllocation } = allocation; @@ -257,8 +259,7 @@ exports.PbsExportAp = async function (socket, { billids, txEnvelope }) { }; async function MarkApExported(socket, billids) { - const bills = await getSessionData(socket.id, "bills"); - const bodyshop = await getSessionData(socket.id, "bodyshop"); + const { bills, bodyshop } = await getMultipleSessionData(socket.id, ["bills", "bodyshop"]); await CdkBase.createLogEvent(socket, "DEBUG", `Marking bills as exported for id ${billids}`); const client = new GraphQLClient(process.env.GRAPHQL_ENDPOINT, {}); diff --git a/server/accounting/pbs/pbs-job-export.js b/server/accounting/pbs/pbs-job-export.js index 5ec38982f..01f3060d6 100644 --- a/server/accounting/pbs/pbs-job-export.js +++ b/server/accounting/pbs/pbs-job-export.js @@ -12,7 +12,7 @@ const CalculateAllocations = require("../../cdk/cdk-calculate-allocations").defa const CdkBase = require("../../web-sockets/web-socket"); const moment = require("moment-timezone"); const Dinero = require("dinero.js"); -const { setSessionData, getSessionData } = require("../../../server"); +const { setSessionData, getSessionData, getMultipleSessionData, setMultipleSessionData } = require("../../../server"); const InstanceManager = require("../../utils/instanceMgr").default; const axios = AxiosLib.create(); @@ -60,9 +60,10 @@ axios.interceptors.response.use( exports.default = async function (socket, { txEnvelope, jobid }) { try { - await setSessionData(socket.id, "logEvents", []); - await setSessionData(socket.id, "recordid", jobid); - await setSessionData(socket.id, "txEnvelope", txEnvelope); + await setMultipleSessionData(socket.id, { + recordid: jobid, + txEnvelope + }); await CdkBase.createLogEvent(socket, "DEBUG", `Received Job export request for id ${jobid}`); @@ -536,8 +537,7 @@ async function UpsertVehicleData(socket, ownerRef) { async function InsertAccountPostingData(socket) { try { - const JobData = await getSessionData(socket.id, "JobData"); - const txEnvelope = await getSessionData(socket.id, "txEnvelope"); + const { JobData, txEnvelope } = await getMultipleSessionData(socket.id, ["JobData", "txEnvelope"]); const allocations = await CalculateAllocations(socket, JobData.id); @@ -650,8 +650,9 @@ async function InsertAccountPostingData(socket) { async function MarkJobExported(socket, jobid) { await CdkBase.createLogEvent(socket, "DEBUG", `Marking job as exported for id ${jobid}`); const client = new GraphQLClient(process.env.GRAPHQL_ENDPOINT, {}); - const JobData = await getSessionData(socket.id, "JobData"); - const transWips = await getSessionData(socket.id, "transWips"); + + const { JobData, transWips } = await getMultipleSessionData(socket.id, ["JobData", "transWips"]); + return await client .setHeaders({ Authorization: `Bearer ${socket.handshake.auth.token}` }) .request(queries.MARK_JOB_EXPORTED, { diff --git a/server/cdk/cdk-job-export.js b/server/cdk/cdk-job-export.js index 592debf4c..b9d58cfda 100644 --- a/server/cdk/cdk-job-export.js +++ b/server/cdk/cdk-job-export.js @@ -12,14 +12,15 @@ const CalcualteAllocations = require("./cdk-calculate-allocations").default; const InstanceMgr = require("../utils/instanceMgr").default; const moment = require("moment-timezone"); -const { setSessionData, getSessionData } = require("../../server"); +const { setSessionData, getSessionData, getMultipleSessionData, setMultipleSessionData } = require("../../server"); const replaceSpecialRegex = /[^a-zA-Z0-9 .,\n #]+/g; exports.default = async function (socket, { txEnvelope, jobid }) { - await setSessionData(socket.id, "logEvents", []); - await setSessionData(socket.id, "recordid", jobid); - await setSessionData(socket.id, "txEnvelope", txEnvelope); + await setMultipleSessionData(socket.id, { + recordid: jobid, + txEnvelope + }); try { await CdkBase.createLogEvent(socket, "DEBUG", `Received Job export request for id ${jobid}`); @@ -473,10 +474,12 @@ async function InsertDmsCustomer(socket, newCustomerNumber) { async function InsertDmsVehicle(socket) { try { - const JobData = await getSessionData(socket.id, "JobData"); - const txEnvelope = await getSessionData(socket.id, "txEnvelope"); - const DMSVid = await getSessionData(socket.id, "DMSVid"); - const DMSCust = await getSessionData(socket.id, "DMSCust"); + const { JobData, txEnvelope, DMSVid, DMSCust } = await getMultipleSessionData(socket.id, [ + "JobData", + "txEnvelope", + "DMSVid", + "DMSCust" + ]); const soapClientVehicleInsertUpdate = await soap.createClientAsync(CdkWsdl.VehicleInsertUpdate); @@ -554,11 +557,13 @@ async function InsertDmsVehicle(socket) { async function UpdateDmsVehicle(socket) { try { const soapClientVehicleInsertUpdate = await soap.createClientAsync(CdkWsdl.VehicleInsertUpdate); - const JobData = await getSessionData(socket.id, "JobData"); - const DMSVeh = await getSessionData(socket.id, "DMSVeh"); - const DMSCust = await getSessionData(socket.id, "DMSCust"); - const selectedCustomerId = await getSessionData(socket.id, "selectedCustomerId"); - const txEnvelope = await getSessionData(socket.id, "txEnvelope"); + const { JobData, DMSVeh, DMSCust, selectedCustomerId, txEnvelope } = await getMultipleSessionData(socket.id, [ + "JobData", + "DMSVeh", + "DMSCust", + "selectedCustomerId", + "txEnvelope" + ]); let ids = []; @@ -660,9 +665,11 @@ async function UpdateDmsVehicle(socket) { async function InsertServiceVehicleHistory(socket) { try { - const JobData = await getSessionData(socket.id, "JobData"); - const DMSVid = await getSessionData(socket.id, "DMSVid"); - const txEnvelope = await getSessionData(socket.id, "txEnvelope"); + const { JobData, DMSVid, txEnvelope } = await getMultipleSessionData(socket.id, [ + "JobData", + "DMSVid", + "txEnvelope" + ]); const soapClientServiceHistoryInsert = await soap.createClientAsync(CdkWsdl.ServiceHistoryInsert); @@ -713,8 +720,7 @@ async function InsertServiceVehicleHistory(socket) { async function InsertDmsStartWip(socket) { try { - const JobData = await getSessionData(socket.id, "JobData"); - const txEnvelope = await getSessionData(socket.id, "txEnvelope"); + const { JobData, txEnvelope } = await getMultipleSessionData(socket.id, ["JobData", "txEnvelope"]); const soapClientAccountingGLInsertUpdate = await soap.createClientAsync(CdkWsdl.AccountingGLInsertUpdate); @@ -799,9 +805,11 @@ async function InsertDmsBatchWip(socket) { } async function GenerateTransWips(socket) { - const JobData = await getSessionData(socket.id, "JobData"); - const DMSTransHeader = await getSessionData(socket.id, "DMSTransHeader"); - const txEnvelope = await getSessionData(socket.id, "txEnvelope"); + const { JobData, DMSTransHeader, txEnvelope } = await getMultipleSessionData(socket.id, [ + "JobData", + "DMSTransHeader", + "txEnvelope" + ]); const allocations = await CalcualteAllocations(socket, JobData.id); const wips = []; @@ -921,8 +929,7 @@ async function GenerateTransWips(socket) { async function PostDmsBatchWip(socket) { try { - const JobData = await getSessionData(socket.id, "JobData"); - const DMSTransHeader = await getSessionData(socket.id, "DMSTransHeader"); + const { JobData, DMSTransHeader } = await getMultipleSessionData(socket.id, ["JobData", "DMSTransHeader"]); const soapClientAccountingGLInsertUpdate = await soap.createClientAsync(CdkWsdl.AccountingGLInsertUpdate); @@ -959,8 +966,7 @@ async function PostDmsBatchWip(socket) { async function QueryDmsErrWip(socket) { try { - const JobData = await getSessionData(socket.id, "JobData"); - const DMSTransHeader = await getSessionData(socket.id, "DMSTransHeader"); + const { JobData, DMSTransHeader } = await getMultipleSessionData(socket.id, ["JobData", "DMSTransHeader"]); const soapClientAccountingGLInsertUpdate = await soap.createClientAsync(CdkWsdl.AccountingGLInsertUpdate); @@ -993,8 +999,7 @@ async function QueryDmsErrWip(socket) { async function DeleteDmsWip(socket) { try { - const JobData = await getSessionData(socket.id, "JobData"); - const DMSTransHeader = await getSessionData(socket.id, "DMSTransHeader"); + const { JobData, DMSTransHeader } = await getMultipleSessionData(socket.id, ["JobData", "DMSTransHeader"]); const soapClientAccountingGLInsertUpdate = await soap.createClientAsync(CdkWsdl.AccountingGLInsertUpdate); @@ -1032,8 +1037,7 @@ async function DeleteDmsWip(socket) { async function MarkJobExported(socket, jobid) { await CdkBase.createLogEvent(socket, "DEBUG", `Marking job as exported for id ${jobid}`); - const JobData = await getSessionData(socket.id, "JobData"); - const transWips = (await getSessionData(socket.id, "transWips")) || []; + const { JobData, transWips = [] } = await getMultipleSessionData(socket.id, ["JobData", "transWips"]); const client = new GraphQLClient(process.env.GRAPHQL_ENDPOINT, {}); diff --git a/server/web-sockets/web-socket.js b/server/web-sockets/web-socket.js index c2dd7c073..708398829 100644 --- a/server/web-sockets/web-socket.js +++ b/server/web-sockets/web-socket.js @@ -3,7 +3,8 @@ require("dotenv").config({ path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`) }); -const { io, setSessionData, clearSessionData, getSessionData } = require("../../server"); +const { io, setSessionData, clearSessionData, getMultipleSessionData, addItemToEndOfList } = require("../../server"); + const { admin } = require("../firebase/firebase-handler"); const logger = require("../utils/logger"); const { default: CdkJobExport, CdkSelectedCustomer } = require("../cdk/cdk-job-export"); @@ -117,7 +118,10 @@ function registerPbsApEvents(socket) { socket.on("pbs-export-ap", async ({ billids, txEnvelope }) => { await setSessionData(socket.id, "pbs_txEnvelope", txEnvelope); - PbsExportAp(socket, { billids, txEnvelope }).catch((err) => console.error(`Error in pbs-export-ap: ${err}`)); + PbsExportAp(socket, { + billids, + txEnvelope + }).catch((err) => console.error(`Error in pbs-export-ap: ${err}`)); }); } @@ -149,76 +153,67 @@ function registerDmsClearSessionEvent(socket) { // Logging helper functions async function createLogEvent(socket, level, message) { - const logLevel = await getSessionData(socket.id, "log_level"); - const recordid = await getSessionData(socket.id, "recordid"); - const logEvents = await getSessionData(socket.id, "logEvents"); + const { logLevel, recordid } = await getMultipleSessionData(socket.id, ["log_level", "recordid"]); if (LogLevelHierarchy(logLevel) >= LogLevelHierarchy(level)) { - console.log(`[WS LOG EVENT] ${level} - ${new Date()} - ${socket.user.email} - ${socket.id} - ${message}`); - socket.emit("log-event", { timestamp: new Date(), level, message }); + const logMessage = { timestamp: new Date(), level, message }; + + // Log the message to the console and emit it via the socket + console.log(`[WS LOG EVENT] ${level} - ${logMessage.timestamp} - ${socket.user.email} - ${socket.id} - ${message}`); + socket.emit("log-event", logMessage); + + // Log the event via the logger logger.log("ws-log-event", level, socket.user.email, recordid, { wsmessage: message }); - if (logEvents && Array.isArray(logEvents)) { - logEvents.push({ timestamp: new Date(), level, message }); - setSessionData(socket.id, "logEvents", logEvents).catch((err) => - console.error(`Error setting logEvents in Redis: ${err}`) - ); - } + // Add the log message to the Redis list using the helper function + await addItemToEndOfList(socket.id, "logEvents", logMessage); } } async function createJsonEvent(socket, level, message, json) { - const logEvents = await getSessionData(socket.id, "logEvents"); - const logLevel = await getSessionData(socket.id, "log_level"); - const recordid = await getSessionData(socket.id, "recordid"); + const { logLevel, recordid } = await getMultipleSessionData(socket.id, ["log_level", "recordid"]); if (LogLevelHierarchy(logLevel) >= LogLevelHierarchy(level)) { - socket.emit("log-event", { timestamp: new Date(), level, message }); - } + const logMessage = { timestamp: new Date(), level, message }; - logger.log("ws-log-event-json", level, socket.user.email, recordid, { - wsmessage: message, - json - }); + // Emit the log message via the socket + socket.emit("log-event", logMessage); - if (logEvents && Array.isArray(logEvents)) { - logEvents.push({ timestamp: new Date(), level, message }); - setSessionData(socket.id, "logEvents", logEvents).catch((err) => - console.error(`Error setting logEvents in Redis: ${err}`) - ); + // Log the JSON event via the logger + logger.log("ws-log-event-json", level, socket.user.email, recordid, { + wsmessage: message, + json + }); + + // Use the helper function to append the log event to the Redis list + await addItemToEndOfList(socket.id, "logEvents", logMessage); } } async function createXmlEvent(socket, xml, message, isError = false) { - const logLevel = await getSessionData(socket.id, "log_level"); - const recordid = await getSessionData(socket.id, "recordid"); - const logEvents = await getSessionData(socket.id, "logEvents"); + const { logLevel, recordid } = await getMultipleSessionData(socket.id, ["log_level", "recordid"]); if (LogLevelHierarchy(logLevel) >= LogLevelHierarchy("TRACE")) { - socket.emit("log-event", { + const logMessage = { timestamp: new Date(), level: isError ? "ERROR" : "TRACE", message: `${message}: ${xml}` - }); - } + }; - logger.log( - isError ? "ws-log-event-xml-error" : "ws-log-event-xml", - isError ? "ERROR" : "TRACE", - socket.user.email, - recordid, - { wsmessage: message, xml } - ); + // Emit the log message via the socket + socket.emit("log-event", logMessage); - if (logEvents && Array.isArray(logEvents)) { - logEvents.push({ - timestamp: new Date(), - level: isError ? "ERROR" : "TRACE", - message, - xml - }); + // Log the XML event via the logger + logger.log( + isError ? "ws-log-event-xml-error" : "ws-log-event-xml", + isError ? "ERROR" : "TRACE", + socket.user.email, + recordid, + { wsmessage: message, xml } + ); - await setSessionData(socket.id, "logEvents", logEvents); + // Use the helper function to append the log event to the Redis list + await addItemToEndOfList(socket.id, "logEvents", { ...logMessage, xml }); } }