// index.js — API server entrypoint (Express + Socket.IO + Redis cluster).
// Load environment variables THIS MUST BE AT THE TOP
// (dotenv must run before any required module reads process.env at load time)
const path = require("path");

// Loads .env.<NODE_ENV> from the process working directory; falls back to
// .env.development when NODE_ENV is unset.
require("dotenv").config({
  path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`)
});
|
|
|
|
// DATADOG TRACE Implementation (Uncomment to enable tracing, requires dd-trace package)
|
|
// if (process.env.NODE_ENV) {
|
|
// require("dd-trace").init({
|
|
// profiling: true,
|
|
// env: process.env.NODE_ENV,
|
|
// service: "bodyshop-api"
|
|
// });
|
|
// }
|
|
|
|
const cors = require("cors");
|
|
const http = require("http");
|
|
const Redis = require("ioredis");
|
|
const express = require("express");
|
|
const compression = require("compression");
|
|
const cookieParser = require("cookie-parser");
|
|
const { Server } = require("socket.io");
|
|
const { createAdapter } = require("@socket.io/redis-adapter");
|
|
const { instrument } = require("@socket.io/admin-ui");
|
|
const { isString, isEmpty } = require("lodash");
|
|
|
|
const logger = require("./server/utils/logger");
|
|
const { applyRedisHelpers } = require("./server/utils/redisHelpers");
|
|
const { applyIOHelpers } = require("./server/utils/ioHelpers");
|
|
const { redisSocketEvents } = require("./server/web-sockets/redisSocketEvents");
|
|
const {
|
|
ElastiCacheClient,
|
|
DescribeCacheClustersCommand,
|
|
DescribeReplicationGroupsCommand
|
|
} = require("@aws-sdk/client-elasticache");
|
|
const { InstanceRegion } = require("./server/utils/instanceMgr");
|
|
const { registerCleanupTask, initializeCleanupManager } = require("./server/utils/cleanupManager");
|
|
|
|
const { loadEmailQueue } = require("./server/notifications/queues/emailQueue");
|
|
const { loadAppQueue } = require("./server/notifications/queues/appQueue");
|
|
|
|
// Backoff tuning (ms) for the ioredis clusterRetryStrategy used in
// connectToRedisCluster: delay grows linearly from BASE, is capped at MAX,
// and gets up to JITTER ms of random noise so multiple instances do not
// reconnect in lockstep.
const CLUSTER_RETRY_BASE_DELAY = 100;
const CLUSTER_RETRY_MAX_DELAY = 5000;
const CLUSTER_RETRY_JITTER = 100;
|
|
|
|
/**
 * CORS Origin for Socket.IO
 * Allow-list of browser origins permitted to connect to the Socket.IO servers.
 * @type {string[]}
 */
const SOCKETIO_CORS_ORIGIN = [
  "https://test.imex.online",
  "https://www.test.imex.online",
  "http://localhost:3000",
  "https://localhost:3000",
  "https://imex.online",
  "https://www.imex.online",
  "https://romeonline.io",
  "https://www.romeonline.io",
  "https://test.romeonline.io",
  "https://www.test.romeonline.io",
  "https://beta.romeonline.io",
  "https://www.beta.romeonline.io",
  "https://beta.test.imex.online",
  "https://www.beta.test.imex.online",
  "https://beta.imex.online",
  "https://www.beta.imex.online",
  "https://www.test.promanager.web-est.com",
  "https://test.promanager.web-est.com",
  "https://www.promanager.web-est.com",
  "https://promanager.web-est.com",
  "https://old.imex.online",
  "https://www.old.imex.online",
  "https://wsadmin.imex.online",
  "https://www.wsadmin.imex.online",
  "https://devproe.web-est.com",
  "https://stagingproe.web-est.com",
  "https://pm.staging.web-est.com",
  "https://pm.web-est.com"
];

/**
 * Extra origins allowed only when NODE_ENV === "development" (local dev server).
 * @type {string[]}
 */
const SOCKETIO_CORS_ORIGIN_DEV = ["http://localhost:3333", "https://localhost:3333"];
|
|
|
|
/**
 * Registers the shared Express middleware stack on the given app.
 * Order matters: compression, cookies, body parsers, and CORS must all run
 * before any route handler.
 * @param {{ app: object }} param0 - destructured Express application
 */
const applyMiddleware = ({ app }) => {
  const bodyLimit = "50mb";

  // Response compression and cookie parsing first.
  app.use(compression());
  app.use(cookieParser());

  // JSON and form-encoded bodies, both capped at the same payload limit.
  app.use(express.json({ limit: bodyLimit }));
  app.use(express.urlencoded({ limit: bodyLimit, extended: true }));

  // CORS with credentials so browsers can read the session cookie header.
  app.use(cors({ credentials: true, exposedHeaders: ["set-cookie"] }));

  // Attach the shared logger to every request object.
  app.use((req, _res, next) => {
    req.logger = logger;
    next();
  });
};
|
|
|
|
/**
 * Mounts every API route group on the app, then a catch-all for "/".
 * @param {{ app: object }} param0 - destructured Express application
 */
const applyRoutes = ({ app }) => {
  // Mount point -> router module path. Registered in declaration order.
  const routeTable = [
    ["/", "./server/routes/miscellaneousRoutes"],
    ["/notifications", "./server/routes/notificationsRoutes"],
    ["/render", "./server/routes/renderRoutes"],
    ["/mixdata", "./server/routes/mixDataRoutes"],
    ["/accounting", "./server/routes/accountingRoutes"],
    ["/qbo", "./server/routes/qboRoutes"],
    ["/media", "./server/routes/mediaRoutes"],
    ["/sms", "./server/routes/smsRoutes"],
    ["/job", "./server/routes/jobRoutes"],
    ["/scheduling", "./server/routes/schedulingRoutes"],
    ["/utils", "./server/routes/utilRoutes"],
    ["/data", "./server/routes/dataRoutes"],
    ["/adm", "./server/routes/adminRoutes"],
    ["/tech", "./server/routes/techRoutes"],
    ["/intellipay", "./server/routes/intellipayRoutes"],
    ["/cdk", "./server/routes/cdkRoutes"],
    ["/csi", "./server/routes/csiRoutes"],
    ["/payroll", "./server/routes/payrollRoutes"],
    ["/sso", "./server/routes/ssoRoutes"],
    // NOTE(review): "intergration" spelling matches the module path on disk.
    ["/integrations", "./server/routes/intergrationRoutes"]
  ];

  for (const [mountPath, modulePath] of routeTable) {
    app.use(mountPath, require(modulePath));
  }

  // Default route for direct root access: respond but reveal nothing.
  // NOTE(review): body says "Forbidden" yet the status is 200 — kept as-is
  // since health checks may depend on the 200; confirm before changing.
  app.get("/", (req, res) => {
    res.status(200).send("Access Forbidden.");
  });
};
|
|
|
|
/**
 * Fetch Redis node endpoints ("host:port") from AWS ElastiCache for the
 * replication group named by REDIS_CLUSTER_ID.
 *
 * Resolves the replication group's member cache clusters, describes each one
 * in parallel, and collects every reported node endpoint.
 *
 * @returns {Promise<string[]>} node addresses; empty array when none found
 * @throws rethrows any AWS SDK error after logging it
 */
const getRedisNodesFromAWS = async () => {
  const client = new ElastiCacheClient({ region: InstanceRegion() });

  try {
    const describeReplicationGroupResponse = await client.send(
      new DescribeReplicationGroupsCommand({
        ReplicationGroupId: process.env.REDIS_CLUSTER_ID
      })
    );

    // We query a single ReplicationGroupId, so at most one group is expected;
    // guard the whole path with optional chaining instead of assuming [0].
    const cacheClusterIds = describeReplicationGroupResponse.ReplicationGroups?.[0]?.MemberClusters;

    // Ensure cacheClusters exists and is a non-empty array
    if (!Array.isArray(cacheClusterIds) || cacheClusterIds.length === 0) {
      logger.log(`No cache clusters found for cluster id ${process.env.REDIS_CLUSTER_ID}`, "ERROR", "redis", "api");
      return [];
    }

    // Member clusters are independent — describe them all in parallel instead
    // of one await per loop iteration.
    const responses = await Promise.all(
      cacheClusterIds.map((cluster) =>
        client.send(new DescribeCacheClustersCommand({ CacheClusterId: cluster, ShowCacheNodeInfo: true }))
      )
    );

    const nodeEndpointAddresses = [];

    for (const response of responses) {
      if (!Array.isArray(response.CacheClusters)) {
        continue;
      }

      // Collect every node of every returned cluster. The previous code only
      // read CacheClusters[0].CacheNodes[0]; iterating covers multi-node
      // shards too and is identical for the common single-node case.
      for (const cacheCluster of response.CacheClusters) {
        for (const node of cacheCluster.CacheNodes ?? []) {
          if (!node?.Endpoint?.Address) {
            continue;
          }
          const nodeAddress = `${node.Endpoint.Address}:${node.Endpoint.Port}`;
          // Debug log node addresses
          logger.log(`Cluster node addresses: ${nodeAddress}`, "DEBUG", "redis", "api");
          nodeEndpointAddresses.push(nodeAddress);
        }
      }
    }

    return nodeEndpointAddresses;
  } catch (err) {
    logger.log(`Error fetching Redis nodes from AWS:`, "ERROR", "redis", "api", {
      message: err?.message,
      stack: err?.stack
    });
    throw err;
  }
};
|
|
|
|
/**
 * Connect to the Redis cluster.
 *
 * Node discovery: when REDIS_CLUSTER_ID is set, endpoints come from AWS
 * ElastiCache; otherwise REDIS_URL must hold a JSON array of node entries
 * (the Dockerized dev cluster). A missing or malformed REDIS_URL is fatal.
 *
 * @returns {Promise<Redis.Cluster>} resolves once the cluster reports "ready"
 */
const connectToRedisCluster = async () => {
  let redisServers;

  if (isString(process.env?.REDIS_CLUSTER_ID) && !isEmpty(process.env?.REDIS_CLUSTER_ID)) {
    // Fetch Redis nodes from AWS if AWS environment variables are present
    redisServers = await getRedisNodesFromAWS();
  } else {
    // Use the Dockerized Redis cluster in development
    if (isEmpty(process.env?.REDIS_URL) || !isString(process.env?.REDIS_URL)) {
      logger.log(`No or Malformed REDIS_URL present.`, "ERROR", "redis", "api");
      process.exit(1);
    }
    try {
      redisServers = JSON.parse(process.env.REDIS_URL);
    } catch (error) {
      logger.log(`Failed to parse REDIS_URL: ${error.message}. Exiting...`, "ERROR", "redis", "api", {
        message: error?.message,
        stack: error?.stack
      });
      process.exit(1);
    }
  }

  // Linear backoff capped at CLUSTER_RETRY_MAX_DELAY plus jitter, so multiple
  // instances do not retry in lockstep.
  const clusterRetryStrategy = (times) => {
    const delay =
      Math.min(CLUSTER_RETRY_BASE_DELAY + times * 50, CLUSTER_RETRY_MAX_DELAY) + Math.random() * CLUSTER_RETRY_JITTER;
    logger.log(`Redis cluster not yet ready. Retrying in ${delay.toFixed(2)}ms`, "WARN", "redis", "api");
    return delay;
  };

  const redisCluster = new Redis.Cluster(redisServers, {
    clusterRetryStrategy,
    enableAutoPipelining: true,
    enableReadyCheck: true,
    redisOptions: {
      // connectTimeout: 10000, // Timeout for connecting in ms
      // idleTimeoutMillis: 30000, // Close idle connections after 30s
      // maxRetriesPerRequest: 5 // Retry a maximum of 5 times per request
    }
  });

  // Adapt the event API to a promise: settle on first "ready" or "error".
  return new Promise((resolve, reject) => {
    // BUGFIX: use once() so a reconnect's "ready" cannot re-run the dev-mode
    // flush / resolve path.
    redisCluster.once("ready", () => {
      logger.log(`Redis cluster connection established.`, "INFO", "redis", "api");
      if (process.env.NODE_ENV === "development" && process.env?.CLEAR_REDIS_ON_START === "true") {
        logger.log("[Development] Flushing Redis Cluster on Service start...", "INFO", "redis", "api");
        const masters = redisCluster.nodes("master");
        Promise.all(masters.map((node) => node.flushall()))
          .then(() => resolve(redisCluster))
          // BUGFIX: a failed FLUSHALL previously left this promise pending
          // forever (no .catch); now startup fails loudly instead of hanging.
          .catch(reject);
      } else {
        resolve(redisCluster);
      }
    });

    redisCluster.on("error", (err) => {
      logger.log(`Redis cluster connection failed:`, "ERROR", "redis", "api", {
        message: err?.message,
        stack: err?.stack
      });
      reject(err);
    });
  });
};
|
|
|
|
/**
 * Apply Redis to the server
 * Connects to the Redis cluster, wires up two Socket.IO servers (a Redis
 * adapter-backed one on /wss and a standalone one on /ws), optionally enables
 * the Socket.IO admin UI, attaches the shared handles to every request, and
 * mirrors them onto module.exports for legacy consumers.
 * @param server - HTTP server both Socket.IO instances attach to
 * @param app - Express app that receives the per-request handles
 * @returns {Promise<{pubClient, io, ioRedis, redisCluster}>}
 */
const applySocketIO = async ({ server, app }) => {
  const redisCluster = await connectToRedisCluster();

  // Handle errors
  // NOTE(review): pubClient below is the same object as redisCluster, so this
  // generic handler and the pubClient handler both fire on the same errors.
  redisCluster.on("error", () => {
    logger.log(`Redis ERROR`, "ERROR", "redis", "api");
  });

  // Pub/sub pair for the Socket.IO Redis adapter; sub is a duplicate connection.
  const pubClient = redisCluster;
  const subClient = pubClient.duplicate();

  pubClient.on("error", (err) =>
    logger.log(`Redis pubClient error: ${err}`, "ERROR", "redis", "api", {
      message: err?.message,
      stack: err?.stack
    })
  );
  subClient.on("error", (err) =>
    logger.log(`Redis subClient error: ${err}`, "ERROR", "redis", "api", {
      message: err?.message,
      stack: err?.stack
    })
  );

  // Register Redis cleanup (runs on shutdown via the cleanup manager)
  registerCleanupTask(async () => {
    logger.log("Closing Redis connections...", "INFO", "redis", "api");
    await Promise.all([pubClient.disconnect(), subClient.disconnect()]);
    logger.log("Redis connections closed.", "INFO", "redis", "api");
  });

  // IO Redis
  // Socket.IO server on /wss, shared across instances via the Redis adapter.
  // Development additionally allows the local dev origins.
  const ioRedis = new Server(server, {
    path: "/wss",
    adapter: createAdapter(pubClient, subClient),
    cors: {
      origin:
        process.env?.NODE_ENV === "development"
          ? [...SOCKETIO_CORS_ORIGIN, ...SOCKETIO_CORS_ORIGIN_DEV]
          : SOCKETIO_CORS_ORIGIN,
      methods: ["GET", "POST"],
      credentials: true,
      exposedHeaders: ["set-cookie"]
    }
  });

  // Enable the Socket.IO admin UI only when an admin password is configured.
  if (isString(process.env.REDIS_ADMIN_PASS) && !isEmpty(process.env.REDIS_ADMIN_PASS)) {
    logger.log(`Initializing Redis Admin UI....`, "INFO", "redis", "api");
    instrument(ioRedis, {
      auth: {
        type: "basic",
        username: "admin",
        password: process.env.REDIS_ADMIN_PASS
      },

      mode: process.env.REDIS_ADMIN_MODE || "development"
    });
  }

  // Second, single-instance Socket.IO server on /ws (no Redis adapter).
  const io = new Server(server, {
    path: "/ws",
    cors: {
      origin: SOCKETIO_CORS_ORIGIN,
      methods: ["GET", "POST"],
      credentials: true,
      exposedHeaders: ["set-cookie"]
    }
  });

  // Shared handles consumed by route handlers (via req) and legacy importers.
  const api = {
    pubClient,
    io,
    ioRedis,
    redisCluster
  };

  // Expose the handles on every request object.
  app.use((req, res, next) => {
    Object.assign(req, api);

    next();
  });

  // Legacy access path: modules that require() this file read these keys
  // directly off module.exports.
  Object.assign(module.exports, api);

  return api;
};
|
|
|
|
/**
 * Load Queues for Email and App notifications and attach error logging.
 * @param {Object} options - Queue configuration options
 * @param {Redis.Cluster} options.pubClient - Redis client for publishing
 * @param {Object} options.logger - Logger instance
 * @param {Object} options.redisHelpers - Redis helper functions
 * @param {Server} options.ioRedis - Socket.IO server instance
 * @returns {Promise<void>}
 */
const loadQueues = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
  const queueSettings = { pubClient, logger, redisHelpers, ioRedis };

  // The two loaders are independent, so start them together.
  const [notificationsEmailsQueue, notificationsAppQueue] = await Promise.all([
    loadEmailQueue(queueSettings),
    loadAppQueue(queueSettings)
  ]);

  // Wire identical error logging onto each queue.
  const watchQueue = (queue, label) => {
    queue.on("error", (error) => {
      logger.log(`Error in ${label}: ${error}`, "ERROR", "queue", "api", null, { error: error?.message });
    });
  };

  watchQueue(notificationsEmailsQueue, "notificationsEmailsQueue");
  watchQueue(notificationsAppQueue, "notificationsAppQueue");
};
|
|
|
|
/**
 * Main function to start the server.
 * Boot order: cleanup manager -> Redis/Socket.IO -> helpers -> legacy socket
 * events -> queues -> middleware/routes -> listen.
 * @returns {Promise<void>}
 */
const main = async () => {
  const app = express();
  const port = process.env.PORT || 4000;

  const server = http.createServer(app);

  // Initialize cleanup manager with signal handlers
  initializeCleanupManager();

  const { pubClient, ioRedis } = await applySocketIO({ server, app });
  const redisHelpers = applyRedisHelpers({ pubClient, app, logger });
  const ioHelpers = applyIOHelpers({ app, redisHelpers, ioRedis, logger });

  // Legacy Socket Events
  require("./server/web-sockets/web-socket");

  // Initialize Queues
  await loadQueues({ pubClient, logger, redisHelpers, ioRedis });

  applyMiddleware({ app });
  applyRoutes({ app });
  redisSocketEvents({ io: ioRedis, redisHelpers, ioHelpers, logger });

  try {
    // BUGFIX: server.listen() does not return a promise — bind failures such
    // as EADDRINUSE are emitted as "error" events, so the previous
    // `await server.listen(port)` could never reach the catch block and the
    // success log fired before the port was actually bound. Wrap listen so
    // this try/catch genuinely reflects the outcome.
    await new Promise((resolve, reject) => {
      server.once("error", reject);
      server.listen(port, () => {
        server.off("error", reject);
        resolve();
      });
    });
    logger.log(`Server started on port ${port}`, "INFO", "api");
  } catch (error) {
    logger.log(`Server failed to start on port ${port}`, "ERROR", "api", null, { error: error.message });
  }
};
|
|
|
|
// Start server
// Last-resort handler for anything main() did not catch.
main().catch((error) => {
  // FIX: level was lowercase "error"; every other logger.log call in this
  // file uses uppercase level strings ("ERROR"), so match that convention.
  logger.log(`Main-API-Error: Something was not caught in the application.`, "ERROR", "api", null, {
    error: error.message,
    errorjson: JSON.stringify(error),
    stack: error.stack
  });
  // Note: If we want the app to crash on all uncaught async operations, we would
  // need to put a `process.exit(1);` here
});
|