// Load environment variables THIS MUST BE AT THE TOP
const path = require("path");
require("dotenv").config({
  path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`)
});

// DATADOG TRACE Implementation (Uncomment to enable tracing, requires dd-trace package)
// if (process.env.NODE_ENV) {
//   require("dd-trace").init({
//     profiling: true,
//     env: process.env.NODE_ENV,
//     service: "bodyshop-api"
//   });
// }

const cors = require("cors");
const http = require("http");
const Redis = require("ioredis");
const express = require("express");
const compression = require("compression");
const cookieParser = require("cookie-parser");
const { Server } = require("socket.io");
const { createAdapter } = require("@socket.io/redis-adapter");
const { instrument } = require("@socket.io/admin-ui");
const { isString, isEmpty } = require("lodash");

const logger = require("./server/utils/logger");
const { applyRedisHelpers } = require("./server/utils/redisHelpers");
const { applyIOHelpers } = require("./server/utils/ioHelpers");
const { redisSocketEvents } = require("./server/web-sockets/redisSocketEvents");
const {
  ElastiCacheClient,
  DescribeCacheClustersCommand,
  DescribeReplicationGroupsCommand
} = require("@aws-sdk/client-elasticache");
const { InstanceRegion } = require("./server/utils/instanceMgr");
const { registerCleanupTask, initializeCleanupManager } = require("./server/utils/cleanupManager");
const { loadEmailQueue } = require("./server/notifications/queues/emailQueue");
const { loadAppQueue } = require("./server/notifications/queues/appQueue");
const { SetLegacyWebsocketHandlers } = require("./server/web-sockets/web-socket");
const { loadFcmQueue } = require("./server/notifications/queues/fcmQueue");
const { loadChatterApiQueue } = require("./server/data/queues/chatterApiQueue");
const { processChatterApiJob } = require("./server/data/chatter-api");

// Redis cluster retry back-off tuning (all values in milliseconds).
const CLUSTER_RETRY_BASE_DELAY = 100;
const CLUSTER_RETRY_MAX_DELAY = 5000;
const CLUSTER_RETRY_JITTER = 100;

/**
 * CORS Origin for Socket.IO
 * @type {string[]}
 */
const SOCKETIO_CORS_ORIGIN = [
  "https://test.imex.online",
  "https://www.test.imex.online",
  "http://localhost:3000",
  "https://localhost:3000",
  "https://imex.online",
  "https://www.imex.online",
  "https://romeonline.io",
  "https://www.romeonline.io",
  "https://test.romeonline.io",
  "https://www.test.romeonline.io",
  "https://beta.romeonline.io",
  "https://www.beta.romeonline.io",
  "https://beta.test.imex.online",
  "https://www.beta.test.imex.online",
  "https://beta.imex.online",
  "https://www.beta.imex.online",
  "https://www.test.promanager.web-est.com",
  "https://test.promanager.web-est.com",
  "https://www.promanager.web-est.com",
  "https://promanager.web-est.com",
  "https://old.imex.online",
  "https://www.old.imex.online",
  "https://wsadmin.imex.online",
  "https://www.wsadmin.imex.online",
  "https://devproe.web-est.com",
  "https://stagingproe.web-est.com",
  "https://pm.staging.web-est.com",
  "https://pm.web-est.com"
];

/**
 * Extra Socket.IO CORS origins, added to the `/wss` server only when
 * NODE_ENV is "development".
 * @type {string[]}
 */
const SOCKETIO_CORS_ORIGIN_DEV = ["http://localhost:3333", "https://localhost:3333"];

/**
 * Middleware for Express app
 * Registers compression, cookie parsing, JSON / urlencoded body parsing
 * (50mb limit), CORS, and a helper that attaches the logger to each request.
 * @param {Object} options
 * @param {import("express").Express} options.app - Express application
 */
const applyMiddleware = ({ app }) => {
  app.use(compression());
  app.use(cookieParser());
  app.use(express.json({ limit: "50mb" }));
  app.use(express.urlencoded({ limit: "50mb", extended: true }));
  app.use(cors({ credentials: true, exposedHeaders: ["set-cookie"] }));

  // Helper middleware
  app.use((req, res, next) => {
    req.logger = logger;
    next();
  });
};

/**
 * Route groupings for Express app
 * @param {Object} options
 * @param {import("express").Express} options.app - Express application
 */
const applyRoutes = ({ app }) => {
  app.use("/", require("./server/routes/miscellaneousRoutes"));
  app.use("/notifications", require("./server/routes/notificationsRoutes"));
  app.use("/render", require("./server/routes/renderRoutes"));
  app.use("/mixdata", require("./server/routes/mixDataRoutes"));
  app.use("/accounting", require("./server/routes/accountingRoutes"));
  app.use("/qbo", require("./server/routes/qboRoutes"));
  app.use("/media", require("./server/routes/mediaRoutes"));
  app.use("/sms", require("./server/routes/smsRoutes"));
  app.use("/job", require("./server/routes/jobRoutes"));
  app.use("/scheduling", require("./server/routes/schedulingRoutes"));
  app.use("/utils", require("./server/routes/utilRoutes"));
  app.use("/data", require("./server/routes/dataRoutes"));
  app.use("/adm", require("./server/routes/adminRoutes"));
  app.use("/tech", require("./server/routes/techRoutes"));
  app.use("/intellipay", require("./server/routes/intellipayRoutes"));
  app.use("/cdk", require("./server/routes/cdkRoutes"));
  app.use("/csi", require("./server/routes/csiRoutes"));
  app.use("/payroll", require("./server/routes/payrollRoutes"));
  app.use("/sso", require("./server/routes/ssoRoutes"));
  app.use("/integrations", require("./server/routes/intergrationRoutes"));
  app.use("/ai", require("./server/routes/aiRoutes"));
  app.use("/chatter", require("./server/routes/chatterRoutes"));

  // Default route for forbidden access
  app.get("/", (req, res) => {
    res.status(200).send("Access Forbidden.");
  });
};

/**
 * Fetch Redis nodes from AWS ElastiCache
 * Looks up the replication group named by REDIS_CLUSTER_ID, then describes
 * each member cache cluster and collects the first node's "host:port".
 * @returns {Promise<string[]>} Node addresses; empty when no member clusters are found
 * @throws Re-throws any error raised by the AWS SDK calls after logging it
 */
const getRedisNodesFromAWS = async () => {
  const client = new ElastiCacheClient({ region: InstanceRegion() });

  try {
    const describeReplicationGroupCommand = new DescribeReplicationGroupsCommand({
      ReplicationGroupId: process.env.REDIS_CLUSTER_ID
    });
    const describeReplicationGroupResponse = await client.send(describeReplicationGroupCommand);

    //TODO: add checking to make sure there's only 1.
    // Optional chaining so an empty response falls into the guard below
    // instead of raising a TypeError.
    const cacheClusterIds = describeReplicationGroupResponse?.ReplicationGroups?.[0]?.MemberClusters;

    // Ensure cacheClusters exists and is an array
    if (!cacheClusterIds || !Array.isArray(cacheClusterIds) || cacheClusterIds.length === 0) {
      logger.log(`No cache clusters found for cluster id ${process.env.REDIS_CLUSTER_ID}`, "ERROR", "redis", "api");
      return [];
    }

    const nodeEndpointAddresses = [];

    // Process each cluster (sequentially, one DescribeCacheClusters call per member)
    for (const cluster of cacheClusterIds) {
      const params = { CacheClusterId: cluster, ShowCacheNodeInfo: true };
      const command = new DescribeCacheClustersCommand(params);
      const response = await client.send(command);

      if (response.CacheClusters && Array.isArray(response.CacheClusters)) {
        // Map nodes to address strings
        //TODO: What happens if we have more shards?
        const endpoint = response.CacheClusters[0]?.CacheNodes?.[0]?.Endpoint;

        // Skip members that report no node endpoint rather than crashing the whole lookup.
        if (!endpoint) {
          logger.log(`No cache node endpoint found for cache cluster ${cluster}`, "WARN", "redis", "api");
          continue;
        }

        const nodeAddress = `${endpoint.Address}:${endpoint.Port}`;

        // Debug log node addresses
        logger.log(`Cluster node addresses: ${nodeAddress}`, "DEBUG", "redis", "api");

        nodeEndpointAddresses.push(nodeAddress);
      }
    }

    return nodeEndpointAddresses;
  } catch (err) {
    logger.log(`Error fetching Redis nodes from AWS:`, "ERROR", "redis", "api", {
      message: err?.message,
      stack: err?.stack
    });
    throw err;
  }
};

/**
 * Connect to Redis Cluster
 * Startup nodes come from AWS ElastiCache when REDIS_CLUSTER_ID is set,
 * otherwise from the JSON-encoded REDIS_URL environment variable. The process
 * exits with code 1 when REDIS_URL is missing or cannot be parsed.
 * @returns {Promise<Redis.Cluster>} Resolves with the cluster client on its first "ready" event;
 *   rejects if an "error" event (or a failed development flush) occurs first
 */
const connectToRedisCluster = async () => {
  const { REDIS_CLUSTER_ID, REDIS_URL } = process.env;
  let redisServers;

  if (isString(REDIS_CLUSTER_ID) && !isEmpty(REDIS_CLUSTER_ID)) {
    // Fetch Redis nodes from AWS if AWS environment variables are present
    redisServers = await getRedisNodesFromAWS();
  } else {
    // Use the Dockerized Redis cluster in development
    if (isEmpty(REDIS_URL) || !isString(REDIS_URL)) {
      logger.log(`No or Malformed REDIS_URL present.`, "ERROR", "redis", "api");
      process.exit(1);
    }

    try {
      redisServers = JSON.parse(REDIS_URL);
    } catch (error) {
      logger.log(`Failed to parse REDIS_URL: ${error.message}. Exiting...`, "ERROR", "redis", "api", {
        message: error?.message,
        stack: error?.stack
      });
      process.exit(1);
    }
  }

  // Linear back-off capped at CLUSTER_RETRY_MAX_DELAY, plus random jitter.
  const clusterRetryStrategy = (times) => {
    const delay =
      Math.min(CLUSTER_RETRY_BASE_DELAY + times * 50, CLUSTER_RETRY_MAX_DELAY) + Math.random() * CLUSTER_RETRY_JITTER;

    // Only log every 5th retry or after 10 attempts to reduce noise during startup
    if (times % 5 === 0 || times > 10) {
      logger.log(
        `Redis cluster not yet ready. Retry attempt ${times}, waiting ${delay.toFixed(2)}ms`,
        "WARN",
        "redis",
        "api"
      );
    }

    return delay;
  };

  const redisCluster = new Redis.Cluster(redisServers, {
    clusterRetryStrategy,
    enableAutoPipelining: true,
    enableReadyCheck: true,
    redisOptions: {
      // connectTimeout: 10000, // Timeout for connecting in ms
      // idleTimeoutMillis: 30000, // Close idle connections after 30s
      // maxRetriesPerRequest: 5 // Retry a maximum of 5 times per request
    }
  });

  return new Promise((resolve, reject) => {
    // Start-up work (flush + resolve) must only run for the first "ready" event;
    // later "ready" events are only logged.
    let isFirstReady = true;

    redisCluster.on("ready", () => {
      logger.log(`Redis cluster connection established.`, "INFO", "redis", "api");

      if (!isFirstReady) {
        return;
      }
      isFirstReady = false;

      if (process.env.NODE_ENV === "development" && process.env.CLEAR_REDIS_ON_START === "true") {
        logger.log("[Development] Flushing Redis Cluster on Service start...", "INFO", "redis", "api");
        const masters = redisCluster.nodes("master");

        Promise.all(masters.map((node) => node.flushall()))
          .then(() => resolve(redisCluster))
          .catch((err) => {
            // Without this handler a failed flush would leave the promise pending forever.
            logger.log(`Redis cluster flush failed:`, "ERROR", "redis", "api", {
              message: err?.message,
              stack: err?.stack
            });
            reject(err);
          });
      } else {
        resolve(redisCluster);
      }
    });

    redisCluster.on("error", (err) => {
      logger.log(`Redis cluster connection failed:`, "ERROR", "redis", "api", {
        message: err?.message,
        stack: err?.stack
      });
      // No-op once the promise has already settled.
      reject(err);
    });

    redisCluster.on("+node", (node) => {
      logger.log(`Redis cluster node added`, "INFO", "redis", "api", {
        host: node.options.host,
        port: node.options.port
      });
    });

    redisCluster.on("-node", (node) => {
      logger.log(`Redis cluster node removed`, "WARN", "redis", "api", {
        host: node.options.host,
        port: node.options.port
      });
    });

    redisCluster.on("node error", (error, node) => {
      logger.log(`Redis node error`, "ERROR", "redis", "api", {
        host: node?.options?.host,
        port: node?.options?.port,
        message: error?.message,
        stack: error?.stack
      });
    });
  });
};

/**
 * Apply Redis and Socket.IO to the server
 * Connects to the Redis cluster, creates the Redis-backed Socket.IO server on
 * `/wss` and the legacy Socket.IO server on `/ws`, registers Redis cleanup,
 * and exposes the resulting handles on every request and on `module.exports`.
 * @param {Object} options
 * @param {import("http").Server} options.server - HTTP server to attach Socket.IO to
 * @param {import("express").Express} options.app - Express application
 * @returns {Promise<{pubClient: Redis.Cluster, io: Server, ioRedis: Server, redisCluster: Redis.Cluster}>}
 */
const applySocketIO = async ({ server, app }) => {
  const redisCluster = await connectToRedisCluster();

  // Handle errors
  redisCluster.on("error", () => {
    logger.log(`Redis ERROR`, "ERROR", "redis", "api");
  });

  // pubClient is the same instance as redisCluster; subClient is a duplicate of it.
  const pubClient = redisCluster;
  const subClient = pubClient.duplicate();

  pubClient.on("error", (err) =>
    logger.log(`Redis pubClient error: ${err}`, "ERROR", "redis", "api", {
      message: err?.message,
      stack: err?.stack
    })
  );

  subClient.on("error", (err) =>
    logger.log(`Redis subClient error: ${err}`, "ERROR", "redis", "api", {
      message: err?.message,
      stack: err?.stack
    })
  );

  // Register Redis cleanup
  registerCleanupTask(async () => {
    logger.log("Closing Redis connections...", "INFO", "redis", "api");
    await Promise.all([pubClient.disconnect(), subClient.disconnect()]);
    logger.log("Redis connections closed.", "INFO", "redis", "api");
  });

  // IO Redis
  const ioRedis = new Server(server, {
    path: "/wss",
    adapter: createAdapter(pubClient, subClient),
    cors: {
      origin:
        process.env.NODE_ENV === "development"
          ? [...SOCKETIO_CORS_ORIGIN, ...SOCKETIO_CORS_ORIGIN_DEV]
          : SOCKETIO_CORS_ORIGIN,
      methods: ["GET", "POST"],
      credentials: true,
      exposedHeaders: ["set-cookie"]
    }
  });

  if (isString(process.env.REDIS_ADMIN_PASS) && !isEmpty(process.env.REDIS_ADMIN_PASS)) {
    logger.log(`Initializing Redis Admin UI....`, "INFO", "redis", "api");
    instrument(ioRedis, {
      auth: {
        type: "basic",
        username: "admin",
        password: process.env.REDIS_ADMIN_PASS
      },
      mode: process.env.REDIS_ADMIN_MODE || "development"
    });
  }

  const io = new Server(server, {
    path: "/ws",
    cors: {
      origin: SOCKETIO_CORS_ORIGIN,
      methods: ["GET", "POST"],
      credentials: true,
      exposedHeaders: ["set-cookie"]
    }
  });

  // Legacy Socket Events
  SetLegacyWebsocketHandlers(io);

  const api = { pubClient, io, ioRedis, redisCluster };

  // Make the Redis / Socket.IO handles available on every request.
  app.use((req, res, next) => {
    Object.assign(req, api);
    next();
  });

  Object.assign(module.exports, api);

  return api;
};

/**
 * Load the chatter API, email, app and FCM queues, attach error listeners to
 * them, then initialize bill OCR and start its SQS polling.
 * @param {Object} options - Queue configuration options
 * @param {Redis.Cluster} options.pubClient - Redis client for publishing
 * @param {Object} options.logger - Logger instance
 * @param {Object} options.redisHelpers - Redis helper functions
 * @param {Server} options.ioRedis - Socket.IO server instance
 * @returns {Promise<void>}
 */
const loadQueues = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
  const queueSettings = { pubClient, logger, redisHelpers, ioRedis };

  // Load chatterApi queue with processJob function and redis helpers
  const chatterApiQueue = await loadChatterApiQueue({
    pubClient,
    logger,
    processJob: processChatterApiJob,
    getChatterToken: redisHelpers.getChatterToken,
    setChatterToken: redisHelpers.setChatterToken
  });

  // Assuming loadEmailQueue and loadAppQueue return Promises
  const [notificationsEmailsQueue, notificationsAppQueue, notificationsFcmQueue] = await Promise.all([
    loadEmailQueue(queueSettings),
    loadAppQueue(queueSettings),
    loadFcmQueue(queueSettings)
  ]);

  // Add error listeners or other setup for queues if needed
  notificationsEmailsQueue.on("error", (error) => {
    logger.log(`Error in notificationsEmailsQueue: ${error}`, "ERROR", "queue", "api", null, {
      error: error?.message
    });
  });

  notificationsAppQueue.on("error", (error) => {
    logger.log(`Error in notificationsAppQueue: ${error}`, "ERROR", "queue", "api", null, {
      error: error?.message
    });
  });

  notificationsFcmQueue.on("error", (error) => {
    logger.log(`Error in notificationsFCMQueue: ${error}`, "ERROR", "queue", "api", null, {
      error: error?.message
    });
  });

  chatterApiQueue.on("error", (error) => {
    logger.log(`Error in chatterApiQueue: ${error}`, "ERROR", "queue", "api", null, {
      error: error?.message
    });
  });

  // Initialize bill-ocr with Redis client
  const { initializeBillOcr, startSQSPolling } = require("./server/ai/bill-ocr/bill-ocr");
  initializeBillOcr(pubClient);

  // Start SQS polling for Textract notifications
  startSQSPolling();
};

/**
 * Main function to start the server
 * Wires up Socket.IO / Redis, helpers, queues, middleware and routes, then
 * binds the HTTP server to PORT (default 4000).
 * @returns {Promise<void>}
 */
const main = async () => {
  const app = express();
  const port = process.env.PORT || 4000;

  const server = http.createServer(app);

  // Initialize cleanup manager with signal handlers
  initializeCleanupManager();

  const { pubClient, ioRedis } = await applySocketIO({ server, app });
  const redisHelpers = applyRedisHelpers({ pubClient, app, logger });
  const ioHelpers = applyIOHelpers({ app, redisHelpers, ioRedis, logger });

  // Initialize Queues
  await loadQueues({ pubClient: pubClient, logger, redisHelpers, ioRedis });

  applyMiddleware({ app });
  applyRoutes({ app });
  redisSocketEvents({ io: ioRedis, redisHelpers, ioHelpers, logger });

  try {
    // server.listen() does not return a promise: bind failures (e.g. EADDRINUSE)
    // are delivered through the "error" event, so adapt it to a promise here.
    await new Promise((resolve, reject) => {
      server.once("error", reject);
      server.listen(port, () => {
        server.off("error", reject);
        resolve();
      });
    });
    logger.log(`Server started on port ${port}`, "INFO", "api");
  } catch (error) {
    logger.log(`Server failed to start on port ${port}`, "ERROR", "api", null, { error: error.message });
  }
};

// Start server
main().catch((error) => {
  logger.log(`Main-API-Error: Something was not caught in the application.`, "error", "api", null, {
    error: error.message,
    errorjson: JSON.stringify(error),
    stack: error.stack
  });
  // Note: If we want the app to crash on all uncaught async operations, we would
  // need to put a `process.exit(1);` here
});