feature/IO-3556-Chattr-Integration - Add in Redis caching for Chatter
This commit is contained in:
@@ -2,32 +2,16 @@ const queries = require("../graphql-client/queries");
|
||||
const moment = require("moment-timezone");
|
||||
const logger = require("../utils/logger");
|
||||
const { ChatterApiClient, getChatterApiToken, CHATTER_BASE_URL } = require("../chatter/chatter-client");
|
||||
const { defaultProvider } = require("@aws-sdk/credential-provider-node");
|
||||
const { isString, isEmpty } = require("lodash");
|
||||
|
||||
const client = require("../graphql-client/graphql-client").client;
|
||||
const { sendServerEmail } = require("../email/sendemail");
|
||||
|
||||
const CHATTER_EVENT = process.env.CHATTER_SOLICITATION_EVENT || "delivery";
|
||||
const MAX_CONCURRENCY = Number(process.env.CHATTER_API_CONCURRENCY || 5);
|
||||
const AWS_REGION = process.env.AWS_REGION || "ca-central-1";
|
||||
|
||||
// Configure SecretsManager client with localstack support for caching implementation
|
||||
const secretsClientOptions = {
|
||||
region: AWS_REGION,
|
||||
credentials: defaultProvider()
|
||||
};
|
||||
|
||||
const isLocal = isString(process.env?.LOCALSTACK_HOSTNAME) && !isEmpty(process.env?.LOCALSTACK_HOSTNAME);
|
||||
|
||||
if (isLocal) {
|
||||
secretsClientOptions.endpoint = `http://${process.env.LOCALSTACK_HOSTNAME}:4566`;
|
||||
}
|
||||
|
||||
// Token and client caching for performance
|
||||
const tokenCache = new Map(); // companyId -> token string
|
||||
const tokenInFlight = new Map(); // companyId -> Promise<string>
|
||||
// Client caching (in-memory) - tokens are now cached in Redis
|
||||
const clientCache = new Map(); // companyId -> ChatterApiClient
|
||||
const tokenInFlight = new Map(); // companyId -> Promise<string> (for in-flight deduplication)
|
||||
|
||||
exports.default = async (req, res) => {
|
||||
if (process.env.NODE_ENV !== "production") return res.sendStatus(403);
|
||||
@@ -67,7 +51,8 @@ exports.default = async (req, res) => {
|
||||
end,
|
||||
skipUpload,
|
||||
allShopSummaries,
|
||||
allErrors
|
||||
allErrors,
|
||||
sessionUtils: req.sessionUtils
|
||||
});
|
||||
|
||||
const totals = allShopSummaries.reduce(
|
||||
@@ -96,7 +81,7 @@ exports.default = async (req, res) => {
|
||||
}
|
||||
};
|
||||
|
||||
async function processBatchApi({ shopsToProcess, start, end, skipUpload, allShopSummaries, allErrors }) {
|
||||
async function processBatchApi({ shopsToProcess, start, end, skipUpload, allShopSummaries, allErrors, sessionUtils }) {
|
||||
for (const bodyshop of shopsToProcess) {
|
||||
const summary = {
|
||||
bodyshopid: bodyshop.id,
|
||||
@@ -127,7 +112,7 @@ async function processBatchApi({ shopsToProcess, start, end, skipUpload, allShop
|
||||
continue;
|
||||
}
|
||||
|
||||
const chatterApi = await getChatterApiClient(companyId);
|
||||
const chatterApi = await getChatterApiClient(companyId, sessionUtils);
|
||||
|
||||
const { jobs } = await client.request(queries.CHATTER_QUERY, {
|
||||
bodyshopid: bodyshop.id,
|
||||
@@ -299,13 +284,13 @@ function createConcurrencyLimit(max) {
|
||||
/**
|
||||
* Returns a per-company Chatter API client, caching both the token and the client.
|
||||
*/
|
||||
async function getChatterApiClient(companyId) {
|
||||
async function getChatterApiClient(companyId, sessionUtils) {
|
||||
const key = String(companyId);
|
||||
|
||||
const existing = clientCache.get(key);
|
||||
if (existing) return existing;
|
||||
|
||||
const apiToken = await getChatterApiTokenCached(companyId);
|
||||
const apiToken = await getChatterApiTokenCached(companyId, sessionUtils);
|
||||
const chatterApi = new ChatterApiClient({ baseUrl: CHATTER_BASE_URL, apiToken });
|
||||
|
||||
clientCache.set(key, chatterApi);
|
||||
@@ -313,30 +298,42 @@ async function getChatterApiClient(companyId) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches the per-company token from AWS Secrets Manager with caching
|
||||
* Fetches the per-company token from AWS Secrets Manager with Redis caching
|
||||
* SecretId: CHATTER_COMPANY_KEY_<companyId>
|
||||
*
|
||||
* Uses caching + in-flight dedupe to avoid hammering Secrets Manager.
|
||||
* Uses Redis caching + in-flight dedupe to avoid hammering Secrets Manager.
|
||||
*/
|
||||
async function getChatterApiTokenCached(companyId) {
|
||||
async function getChatterApiTokenCached(companyId, sessionUtils) {
|
||||
const key = String(companyId ?? "").trim();
|
||||
if (!key) throw new Error("getChatterApiToken: companyId is required");
|
||||
|
||||
// Optional override for emergency/dev
|
||||
if (process.env.CHATTER_API_TOKEN) return process.env.CHATTER_API_TOKEN;
|
||||
|
||||
const cached = tokenCache.get(key);
|
||||
if (cached) return cached;
|
||||
// Check Redis cache if sessionUtils is available
|
||||
if (sessionUtils?.getChatterToken) {
|
||||
const cachedToken = await sessionUtils.getChatterToken(key);
|
||||
if (cachedToken) {
|
||||
logger.log("chatter-api-get-token-cache-hit", "DEBUG", "api", null, { companyId: key });
|
||||
return cachedToken;
|
||||
}
|
||||
}
|
||||
|
||||
// Check for in-flight requests
|
||||
const inflight = tokenInFlight.get(key);
|
||||
if (inflight) return inflight;
|
||||
|
||||
const p = (async () => {
|
||||
logger.log("chatter-api-get-token", "DEBUG", "api", null, { companyId: key });
|
||||
logger.log("chatter-api-get-token-cache-miss", "DEBUG", "api", null, { companyId: key });
|
||||
|
||||
// Use the shared function from chatter-client
|
||||
// Fetch token from Secrets Manager using shared function
|
||||
const token = await getChatterApiToken(companyId);
|
||||
tokenCache.set(key, token);
|
||||
|
||||
// Store in Redis cache if sessionUtils is available
|
||||
if (sessionUtils?.setChatterToken) {
|
||||
await sessionUtils.setChatterToken(key, token);
|
||||
}
|
||||
|
||||
return token;
|
||||
})();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user