From 4486858a86559b04e2989150efcdcdde2d55aa6b Mon Sep 17 00:00:00 2001 From: Dave Richer Date: Wed, 11 Sep 2024 20:27:40 -0400 Subject: [PATCH] feature/IO-2742-redis - Checkpoint, All optimizations to prevent multiple requests to redis have been applied. Things like using lists for the Log Events, to getting and setting multiple values at the same time when given the chance. Signed-off-by: Dave Richer --- server.js | 99 +++++++++++++++++++-- server/accounting/pbs/pbs-ap-allocations.js | 19 ++-- server/accounting/pbs/pbs-job-export.js | 17 ++-- server/cdk/cdk-job-export.js | 62 +++++++------ server/web-sockets/web-socket.js | 91 +++++++++---------- 5 files changed, 186 insertions(+), 102 deletions(-) diff --git a/server.js b/server.js index f0897c3f5..d0efb579a 100644 --- a/server.js +++ b/server.js @@ -157,19 +157,102 @@ const main = async () => { await pubClient.del(`socket:${socketId}`); }; - // TODO: Remove, just a demo of hGet and hSet - // async function demoSessionData() { + // Store multiple session data in Redis + exports.setMultipleSessionData = async (socketId, keyValues) => { + // keyValues is expected to be an object { key1: value1, key2: value2, ... } + const entries = Object.entries(keyValues).map(([key, value]) => [key, JSON.stringify(value)]); + await pubClient.hSet(`socket:${socketId}`, ...entries.flat()); + }; + + // Retrieve multiple session data from Redis + exports.getMultipleSessionData = async (socketId, keys) => { + const data = await pubClient.hmGet(`socket:${socketId}`, keys); + // Redis returns an object with null values for missing keys, so we parse the non-null ones + return Object.fromEntries(keys.map((key, index) => [key, data[index] ? JSON.parse(data[index]) : null])); + }; + + exports.setMultipleFromArraySessionData = async (socketId, keyValueArray) => { + // Use Redis multi/pipeline to batch the commands + const multi = pubClient.multi(); + + keyValueArray.forEach(([key, value]) => { + multi.hSet(`socket:${socketId}`, key, JSON.stringify(value)); + }); + + await multi.exec(); // Execute all queued commands + }; + + // Helper function to add an item to the end of the Redis list + exports.addItemToEndOfList = async (socketId, key, newItem) => { + try { + await pubClient.rPush(`socket:${socketId}:${key}`, JSON.stringify(newItem)); + } catch (error) { + console.error(`Error adding item to the end of the list for socket ${socketId}:`, error); + } + }; + + // Helper function to add an item to the beginning of the Redis list + exports.addItemToBeginningOfList = async (socketId, key, newItem) => { + try { + await pubClient.lPush(`socket:${socketId}:${key}`, JSON.stringify(newItem)); + } catch (error) { + console.error(`Error adding item to the beginning of the list for socket ${socketId}:`, error); + } + }; + + // // Demo to show how all the helper functions work + // const demoSessionData = async () => { + // const socketId = "testSocketId"; + // // // Store session data using setSessionData - // await exports.setSessionData("testSocketId", "field1", "Hello, Redis Hash!"); + // await exports.setSessionData(socketId, "field1", "Hello, Redis!"); // // // Retrieve session data using getSessionData - // const value = await exports.getSessionData("testSocketId", "field1"); + // const field1Value = await exports.getSessionData(socketId, "field1"); + // console.log("Retrieved single field value:", field1Value); // - // // Output the retrieved value - // console.log(value); + // // Store multiple session data using setMultipleSessionData + // await exports.setMultipleSessionData(socketId, { field2: "Second Value", field3: "Third Value" }); + // + // // Retrieve multiple session data using getMultipleSessionData + // const multipleFields = await exports.getMultipleSessionData(socketId, ["field2", "field3"]); + // console.log("Retrieved multiple field values:", multipleFields); + // + // // Store multiple session data using setMultipleFromArraySessionData + // await exports.setMultipleFromArraySessionData(socketId, [ + // ["field4", "Fourth Value"], + // ["field5", "Fifth Value"] + // ]); + // + // // Retrieve and log all fields + // const allFields = await exports.getMultipleSessionData(socketId, [ + // "field1", + // "field2", + // "field3", + // "field4", + // "field5" + // ]); + // console.log("Retrieved all field values:", allFields); + // + // // Add item to the end of a Redis list + // await exports.addItemToEndOfList(socketId, "logEvents", { event: "Log Event 1", timestamp: new Date() }); + // await exports.addItemToEndOfList(socketId, "logEvents", { event: "Log Event 2", timestamp: new Date() }); + // + // // Add item to the beginning of a Redis list + // await exports.addItemToBeginningOfList(socketId, "logEvents", { event: "First Log Event", timestamp: new Date() }); + // + // // Retrieve the entire list (using lRange) + // const logEvents = await pubClient.lRange(`socket:${socketId}:logEvents`, 0, -1); + // console.log("Log Events List:", logEvents.map(JSON.parse)); + // + // // Clear session data + // await exports.clearSessionData(socketId); + // console.log("Session data cleared."); + // }; + // + // if (process.env.NODE_ENV === "development") { + // await demoSessionData(); // } - // demoSessionData().catch(console.error); - // TODO: End of demo require("./server/web-sockets/web-socket"); diff --git a/server/accounting/pbs/pbs-ap-allocations.js b/server/accounting/pbs/pbs-ap-allocations.js index e2212cae5..9f3dde43e 100644 --- a/server/accounting/pbs/pbs-ap-allocations.js +++ b/server/accounting/pbs/pbs-ap-allocations.js @@ -12,7 +12,7 @@ const AxiosLib = require("axios").default; const axios = AxiosLib.create(); const { PBS_ENDPOINTS, PBS_CREDENTIALS } = require("./pbs-constants"); const { CheckForErrors } = require("./pbs-job-export"); -const { getSessionData, setSessionData } = require("../../../server"); +const { getSessionData, getMultipleSessionData, setMultipleSessionData } = require("../../../server"); const uuid = require("uuid").v4; axios.interceptors.request.use( @@ -66,8 +66,10 @@ async function PbsCalculateAllocationsAp(socket, billids) { const { bills, bodyshops } = await QueryBillData(socket, billids); const bodyshop = bodyshops[0]; - await setSessionData(socket.id, "bills", bills); - await setSessionData(socket.id, "bodyshop", bodyshop); + await setMultipleSessionData(socket.id, { + bills, + bodyshop + }); const txEnvelope = await getSessionData(socket.id, "txEnvelope"); @@ -221,12 +223,12 @@ function getCostAccount(billline, respcenters) { exports.PbsExportAp = async function (socket, { billids, txEnvelope }) { await CdkBase.createLogEvent(socket, "DEBUG", `Exporting selected AP.`); - //apAllocations has the same shape as the lines key for the accounting posting to PBS. const apAllocations = await PbsCalculateAllocationsAp(socket, billids); - await setSessionData(socket.id, "apAllocations", apAllocations); - // Store txEnvelope in session - await setSessionData(socket.id, "txEnvelope", txEnvelope); + await setMultipleSessionData(socket.id, { + apAllocations, + txEnvelope + }); for (const allocation of apAllocations) { const { billid, ...restAllocation } = allocation; @@ -257,8 +259,7 @@ exports.PbsExportAp = async function (socket, { billids, txEnvelope }) { }; async function MarkApExported(socket, billids) { - const bills = await getSessionData(socket.id, "bills"); - const bodyshop = await getSessionData(socket.id, "bodyshop"); + const { bills, bodyshop } = await getMultipleSessionData(socket.id, ["bills", "bodyshop"]); await CdkBase.createLogEvent(socket, "DEBUG", `Marking bills as exported for id ${billids}`); const client = new GraphQLClient(process.env.GRAPHQL_ENDPOINT, {}); diff --git a/server/accounting/pbs/pbs-job-export.js b/server/accounting/pbs/pbs-job-export.js index 5ec38982f..01f3060d6 100644 --- a/server/accounting/pbs/pbs-job-export.js +++ b/server/accounting/pbs/pbs-job-export.js @@ -12,7 +12,7 @@ const CalculateAllocations = require("../../cdk/cdk-calculate-allocations").defa const CdkBase = require("../../web-sockets/web-socket"); const moment = require("moment-timezone"); const Dinero = require("dinero.js"); -const { setSessionData, getSessionData } = require("../../../server"); +const { setSessionData, getSessionData, getMultipleSessionData, setMultipleSessionData } = require("../../../server"); const InstanceManager = require("../../utils/instanceMgr").default; const axios = AxiosLib.create(); @@ -60,9 +60,10 @@ axios.interceptors.response.use( exports.default = async function (socket, { txEnvelope, jobid }) { try { - await setSessionData(socket.id, "logEvents", []); - await setSessionData(socket.id, "recordid", jobid); - await setSessionData(socket.id, "txEnvelope", txEnvelope); + await setMultipleSessionData(socket.id, { + recordid: jobid, + txEnvelope + }); await CdkBase.createLogEvent(socket, "DEBUG", `Received Job export request for id ${jobid}`); @@ -536,8 +537,7 @@ async function UpsertVehicleData(socket, ownerRef) { async function InsertAccountPostingData(socket) { try { - const JobData = await getSessionData(socket.id, "JobData"); - const txEnvelope = await getSessionData(socket.id, "txEnvelope"); + const { JobData, txEnvelope } = await getMultipleSessionData(socket.id, ["JobData", "txEnvelope"]); const allocations = await CalculateAllocations(socket, JobData.id); @@ -650,8 +650,9 @@ async function InsertAccountPostingData(socket) { async function MarkJobExported(socket, jobid) { await CdkBase.createLogEvent(socket, "DEBUG", `Marking job as exported for id ${jobid}`); const client = new GraphQLClient(process.env.GRAPHQL_ENDPOINT, {}); - const JobData = await getSessionData(socket.id, "JobData"); - const transWips = await getSessionData(socket.id, "transWips"); + + const { JobData, transWips } = await getMultipleSessionData(socket.id, ["JobData", "transWips"]); + return await client .setHeaders({ Authorization: `Bearer ${socket.handshake.auth.token}` }) .request(queries.MARK_JOB_EXPORTED, { diff --git a/server/cdk/cdk-job-export.js b/server/cdk/cdk-job-export.js index 592debf4c..b9d58cfda 100644 --- a/server/cdk/cdk-job-export.js +++ b/server/cdk/cdk-job-export.js @@ -12,14 +12,15 @@ const CalcualteAllocations = require("./cdk-calculate-allocations").default; const InstanceMgr = require("../utils/instanceMgr").default; const moment = require("moment-timezone"); -const { setSessionData, getSessionData } = require("../../server"); +const { setSessionData, getSessionData, getMultipleSessionData, setMultipleSessionData } = require("../../server"); const replaceSpecialRegex = /[^a-zA-Z0-9 .,\n #]+/g; exports.default = async function (socket, { txEnvelope, jobid }) { - await setSessionData(socket.id, "logEvents", []); - await setSessionData(socket.id, "recordid", jobid); - await setSessionData(socket.id, "txEnvelope", txEnvelope); + await setMultipleSessionData(socket.id, { + recordid: jobid, + txEnvelope + }); try { await CdkBase.createLogEvent(socket, "DEBUG", `Received Job export request for id ${jobid}`); @@ -473,10 +474,12 @@ async function InsertDmsCustomer(socket, newCustomerNumber) { async function InsertDmsVehicle(socket) { try { - const JobData = await getSessionData(socket.id, "JobData"); - const txEnvelope = await getSessionData(socket.id, "txEnvelope"); - const DMSVid = await getSessionData(socket.id, "DMSVid"); - const DMSCust = await getSessionData(socket.id, "DMSCust"); + const { JobData, txEnvelope, DMSVid, DMSCust } = await getMultipleSessionData(socket.id, [ + "JobData", + "txEnvelope", + "DMSVid", + "DMSCust" + ]); const soapClientVehicleInsertUpdate = await soap.createClientAsync(CdkWsdl.VehicleInsertUpdate); @@ -554,11 +557,13 @@ async function InsertDmsVehicle(socket) { async function UpdateDmsVehicle(socket) { try { const soapClientVehicleInsertUpdate = await soap.createClientAsync(CdkWsdl.VehicleInsertUpdate); - const JobData = await getSessionData(socket.id, "JobData"); - const DMSVeh = await getSessionData(socket.id, "DMSVeh"); - const DMSCust = await getSessionData(socket.id, "DMSCust"); - const selectedCustomerId = await getSessionData(socket.id, "selectedCustomerId"); - const txEnvelope = await getSessionData(socket.id, "txEnvelope"); + const { JobData, DMSVeh, DMSCust, selectedCustomerId, txEnvelope } = await getMultipleSessionData(socket.id, [ + "JobData", + "DMSVeh", + "DMSCust", + "selectedCustomerId", + "txEnvelope" + ]); let ids = []; @@ -660,9 +665,11 @@ async function UpdateDmsVehicle(socket) { async function InsertServiceVehicleHistory(socket) { try { - const JobData = await getSessionData(socket.id, "JobData"); - const DMSVid = await getSessionData(socket.id, "DMSVid"); - const txEnvelope = await getSessionData(socket.id, "txEnvelope"); + const { JobData, DMSVid, txEnvelope } = await getMultipleSessionData(socket.id, [ + "JobData", + "DMSVid", + "txEnvelope" + ]); const soapClientServiceHistoryInsert = await soap.createClientAsync(CdkWsdl.ServiceHistoryInsert); @@ -713,8 +720,7 @@ async function InsertServiceVehicleHistory(socket) { async function InsertDmsStartWip(socket) { try { - const JobData = await getSessionData(socket.id, "JobData"); - const txEnvelope = await getSessionData(socket.id, "txEnvelope"); + const { JobData, txEnvelope } = await getMultipleSessionData(socket.id, ["JobData", "txEnvelope"]); const soapClientAccountingGLInsertUpdate = await soap.createClientAsync(CdkWsdl.AccountingGLInsertUpdate); @@ -799,9 +805,11 @@ async function InsertDmsBatchWip(socket) { } async function GenerateTransWips(socket) { - const JobData = await getSessionData(socket.id, "JobData"); - const DMSTransHeader = await getSessionData(socket.id, "DMSTransHeader"); - const txEnvelope = await getSessionData(socket.id, "txEnvelope"); + const { JobData, DMSTransHeader, txEnvelope } = await getMultipleSessionData(socket.id, [ + "JobData", + "DMSTransHeader", + "txEnvelope" + ]); const allocations = await CalcualteAllocations(socket, JobData.id); const wips = []; @@ -921,8 +929,7 @@ async function GenerateTransWips(socket) { async function PostDmsBatchWip(socket) { try { - const JobData = await getSessionData(socket.id, "JobData"); - const DMSTransHeader = await getSessionData(socket.id, "DMSTransHeader"); + const { JobData, DMSTransHeader } = await getMultipleSessionData(socket.id, ["JobData", "DMSTransHeader"]); const soapClientAccountingGLInsertUpdate = await soap.createClientAsync(CdkWsdl.AccountingGLInsertUpdate); @@ -959,8 +966,7 @@ async function PostDmsBatchWip(socket) { async function QueryDmsErrWip(socket) { try { - const JobData = await getSessionData(socket.id, "JobData"); - const DMSTransHeader = await getSessionData(socket.id, "DMSTransHeader"); + const { JobData, DMSTransHeader } = await getMultipleSessionData(socket.id, ["JobData", "DMSTransHeader"]); const soapClientAccountingGLInsertUpdate = await soap.createClientAsync(CdkWsdl.AccountingGLInsertUpdate); @@ -993,8 +999,7 @@ async function QueryDmsErrWip(socket) { async function DeleteDmsWip(socket) { try { - const JobData = await getSessionData(socket.id, "JobData"); - const DMSTransHeader = await getSessionData(socket.id, "DMSTransHeader"); + const { JobData, DMSTransHeader } = await getMultipleSessionData(socket.id, ["JobData", "DMSTransHeader"]); const soapClientAccountingGLInsertUpdate = await soap.createClientAsync(CdkWsdl.AccountingGLInsertUpdate); @@ -1032,8 +1037,7 @@ async function DeleteDmsWip(socket) { async function MarkJobExported(socket, jobid) { await CdkBase.createLogEvent(socket, "DEBUG", `Marking job as exported for id ${jobid}`); - const JobData = await getSessionData(socket.id, "JobData"); - const transWips = (await getSessionData(socket.id, "transWips")) || []; + const { JobData, transWips = [] } = await getMultipleSessionData(socket.id, ["JobData", "transWips"]); const client = new GraphQLClient(process.env.GRAPHQL_ENDPOINT, {}); diff --git a/server/web-sockets/web-socket.js b/server/web-sockets/web-socket.js index c2dd7c073..708398829 100644 --- a/server/web-sockets/web-socket.js +++ b/server/web-sockets/web-socket.js @@ -3,7 +3,8 @@ require("dotenv").config({ path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`) }); -const { io, setSessionData, clearSessionData, getSessionData } = require("../../server"); +const { io, setSessionData, clearSessionData, getMultipleSessionData, addItemToEndOfList } = require("../../server"); + const { admin } = require("../firebase/firebase-handler"); const logger = require("../utils/logger"); const { default: CdkJobExport, CdkSelectedCustomer } = require("../cdk/cdk-job-export"); @@ -117,7 +118,10 @@ function registerPbsApEvents(socket) { socket.on("pbs-export-ap", async ({ billids, txEnvelope }) => { await setSessionData(socket.id, "pbs_txEnvelope", txEnvelope); - PbsExportAp(socket, { billids, txEnvelope }).catch((err) => console.error(`Error in pbs-export-ap: ${err}`)); + PbsExportAp(socket, { + billids, + txEnvelope + }).catch((err) => console.error(`Error in pbs-export-ap: ${err}`)); }); } @@ -149,76 +153,67 @@ function registerDmsClearSessionEvent(socket) { // Logging helper functions async function createLogEvent(socket, level, message) { - const logLevel = await getSessionData(socket.id, "log_level"); - const recordid = await getSessionData(socket.id, "recordid"); - const logEvents = await getSessionData(socket.id, "logEvents"); + const { logLevel, recordid } = await getMultipleSessionData(socket.id, ["log_level", "recordid"]); if (LogLevelHierarchy(logLevel) >= LogLevelHierarchy(level)) { - console.log(`[WS LOG EVENT] ${level} - ${new Date()} - ${socket.user.email} - ${socket.id} - ${message}`); - socket.emit("log-event", { timestamp: new Date(), level, message }); + const logMessage = { timestamp: new Date(), level, message }; + + // Log the message to the console and emit it via the socket + console.log(`[WS LOG EVENT] ${level} - ${logMessage.timestamp} - ${socket.user.email} - ${socket.id} - ${message}`); + socket.emit("log-event", logMessage); + + // Log the event via the logger logger.log("ws-log-event", level, socket.user.email, recordid, { wsmessage: message }); - if (logEvents && Array.isArray(logEvents)) { - logEvents.push({ timestamp: new Date(), level, message }); - setSessionData(socket.id, "logEvents", logEvents).catch((err) => - console.error(`Error setting logEvents in Redis: ${err}`) - ); - } + // Add the log message to the Redis list using the helper function + await addItemToEndOfList(socket.id, "logEvents", logMessage); } } async function createJsonEvent(socket, level, message, json) { - const logEvents = await getSessionData(socket.id, "logEvents"); - const logLevel = await getSessionData(socket.id, "log_level"); - const recordid = await getSessionData(socket.id, "recordid"); + const { logLevel, recordid } = await getMultipleSessionData(socket.id, ["log_level", "recordid"]); if (LogLevelHierarchy(logLevel) >= LogLevelHierarchy(level)) { - socket.emit("log-event", { timestamp: new Date(), level, message }); - } + const logMessage = { timestamp: new Date(), level, message }; - logger.log("ws-log-event-json", level, socket.user.email, recordid, { - wsmessage: message, - json - }); + // Emit the log message via the socket + socket.emit("log-event", logMessage); - if (logEvents && Array.isArray(logEvents)) { - logEvents.push({ timestamp: new Date(), level, message }); - setSessionData(socket.id, "logEvents", logEvents).catch((err) => - console.error(`Error setting logEvents in Redis: ${err}`) - ); + // Log the JSON event via the logger + logger.log("ws-log-event-json", level, socket.user.email, recordid, { + wsmessage: message, + json + }); + + // Use the helper function to append the log event to the Redis list + await addItemToEndOfList(socket.id, "logEvents", logMessage); } } async function createXmlEvent(socket, xml, message, isError = false) { - const logLevel = await getSessionData(socket.id, "log_level"); - const recordid = await getSessionData(socket.id, "recordid"); - const logEvents = await getSessionData(socket.id, "logEvents"); + const { logLevel, recordid } = await getMultipleSessionData(socket.id, ["log_level", "recordid"]); if (LogLevelHierarchy(logLevel) >= LogLevelHierarchy("TRACE")) { - socket.emit("log-event", { + const logMessage = { timestamp: new Date(), level: isError ? "ERROR" : "TRACE", message: `${message}: ${xml}` - }); - } + }; - logger.log( - isError ? "ws-log-event-xml-error" : "ws-log-event-xml", - isError ? "ERROR" : "TRACE", - socket.user.email, - recordid, - { wsmessage: message, xml } - ); + // Emit the log message via the socket + socket.emit("log-event", logMessage); - if (logEvents && Array.isArray(logEvents)) { - logEvents.push({ - timestamp: new Date(), - level: isError ? "ERROR" : "TRACE", - message, - xml - }); + // Log the XML event via the logger + logger.log( + isError ? "ws-log-event-xml-error" : "ws-log-event-xml", + isError ? "ERROR" : "TRACE", + socket.user.email, + recordid, + { wsmessage: message, xml } + ); - await setSessionData(socket.id, "logEvents", logEvents); + // Use the helper function to append the log event to the Redis list + await addItemToEndOfList(socket.id, "logEvents", { ...logMessage, xml }); } }