const queries = require("../graphql-client/queries"); const moment = require("moment-timezone"); const logger = require("../utils/logger"); const { ChatterApiClient, getChatterApiToken, CHATTER_BASE_URL } = require("../chatter/chatter-client"); const client = require("../graphql-client/graphql-client").client; const { sendServerEmail } = require("../email/sendemail"); const CHATTER_EVENT = process.env.CHATTER_SOLICITATION_EVENT || "delivery"; const MAX_CONCURRENCY = Number(process.env.CHATTER_API_CONCURRENCY || 5); // Client caching (in-memory) - tokens are now cached in Redis const clientCache = new Map(); // companyId -> ChatterApiClient const tokenInFlight = new Map(); // companyId -> Promise (for in-flight deduplication) exports.default = async (req, res) => { if (process.env.NODE_ENV !== "production") return res.sendStatus(403); if (req.headers["x-imex-auth"] !== process.env.AUTOHOUSE_AUTH_TOKEN) return res.sendStatus(401); res.status(202).json({ success: true, message: "Processing request ...", timestamp: new Date().toISOString() }); try { logger.log("chatter-api-start", "DEBUG", "api", null, null); const allErrors = []; const allShopSummaries = []; // Shops that DO have chatter_company_id const { bodyshops } = await client.request(queries.GET_CHATTER_SHOPS_WITH_COMPANY); const specificShopIds = req.body.bodyshopIds; const { start, end, skipUpload } = req.body; // keep same flag; now acts like "dry run" const shopsToProcess = specificShopIds?.length > 0 ? bodyshops.filter((shop) => specificShopIds.includes(shop.id)) : bodyshops; logger.log("chatter-api-shopsToProcess-generated", "DEBUG", "api", null, { count: shopsToProcess.length }); if (shopsToProcess.length === 0) { logger.log("chatter-api-shopsToProcess-empty", "DEBUG", "api", null, null); return; } await processBatchApi({ shopsToProcess, start, end, skipUpload, allShopSummaries, allErrors, sessionUtils: req.sessionUtils }); const totals = allShopSummaries.reduce( (acc, s) => { acc.shops += 1; acc.jobs += s.jobs || 0; acc.sent += s.sent || 0; acc.duplicates += s.duplicates || 0; acc.failed += s.failed || 0; return acc; }, { shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 } ); await sendServerEmail({ subject: `Chatter API Report ${moment().format("MM-DD-YY")}`, text: `Totals:\n${JSON.stringify(totals, null, 2)}\n\n` + `Shop summaries:\n${JSON.stringify(allShopSummaries, null, 2)}\n\n` + `Errors:\n${JSON.stringify(allErrors, null, 2)}\n` }); logger.log("chatter-api-end", "DEBUG", "api", null, totals); } catch (error) { logger.log("chatter-api-error", "ERROR", "api", null, { error: error.message, stack: error.stack }); } }; async function processBatchApi({ shopsToProcess, start, end, skipUpload, allShopSummaries, allErrors, sessionUtils }) { for (const bodyshop of shopsToProcess) { const summary = { bodyshopid: bodyshop.id, imexshopid: bodyshop.imexshopid, shopname: bodyshop.shopname, chatter_company_id: bodyshop.chatter_company_id, chatterid: bodyshop.chatterid, jobs: 0, sent: 0, duplicates: 0, failed: 0, ok: true }; try { logger.log("chatter-api-start-shop", "DEBUG", "api", bodyshop.id, { shopname: bodyshop.shopname }); const companyId = parseCompanyId(bodyshop.chatter_company_id); if (!companyId) { summary.ok = false; summary.failed = 0; allErrors.push({ ...pickShop(bodyshop), fatal: true, errors: [`Invalid chatter_company_id: "${bodyshop.chatter_company_id}"`] }); allShopSummaries.push(summary); continue; } const chatterApi = await getChatterApiClient(companyId, sessionUtils); const { jobs } = await client.request(queries.CHATTER_QUERY, { bodyshopid: bodyshop.id, start: start ? moment(start).startOf("day") : moment().subtract(1, "days").startOf("day"), ...(end && { end: moment(end).endOf("day") }) }); summary.jobs = jobs.length; // concurrency-limited posting const limit = createConcurrencyLimit(MAX_CONCURRENCY); const results = await Promise.all( jobs.map((j) => limit(async () => { const payload = buildInteractionPayload(bodyshop, j); // keep legacy flag name: skipUpload == dry-run if (skipUpload) return { ok: true, dryRun: true }; const r = await postInteractionWithPolicy(chatterApi, companyId, payload); return r; }) ) ); for (const r of results) { if (r?.dryRun) continue; if (r?.ok && r?.duplicate) summary.duplicates += 1; else if (r?.ok) summary.sent += 1; else summary.failed += 1; } // record failures with some detail (cap to avoid huge emails) const failures = results .filter((r) => r && r.ok === false) .slice(0, 25) .map((r) => ({ status: r.status, error: r.error })); if (failures.length) { summary.ok = false; allErrors.push({ ...pickShop(bodyshop), fatal: false, errors: failures }); } logger.log("chatter-api-end-shop", "DEBUG", "api", bodyshop.id, summary); } catch (error) { summary.ok = false; logger.log("chatter-api-error-shop", "ERROR", "api", bodyshop.id, { error: error.message, stack: error.stack }); allErrors.push({ ...pickShop(bodyshop), fatal: true, errors: [error.toString()] }); } finally { allShopSummaries.push(summary); } } } function buildInteractionPayload(bodyshop, j) { const isCompany = Boolean(j.ownr_co_nm); const locationIdentifier = `${bodyshop.chatter_company_id}-${bodyshop.id}`; return { locationIdentifier: locationIdentifier, event: CHATTER_EVENT, transactionId: j.ro_number != null ? String(j.ro_number) : undefined, timestamp: j.actual_delivery ? moment(j.actual_delivery).tz(bodyshop.timezone).toISOString() : undefined, firstName: isCompany ? null : j.ownr_fn || null, lastName: isCompany ? j.ownr_co_nm : j.ownr_ln || null, emailAddress: j.ownr_ea || undefined, phoneNumber: j.ownr_ph1 || undefined, metadata: { imexShopId: bodyshop.imexshopid, bodyshopId: bodyshop.id, jobId: j.id } }; } async function postInteractionWithPolicy(chatterApi, companyId, payload) { for (let attempt = 0; attempt < 6; attempt++) { try { await chatterApi.postInteraction(companyId, payload); return { ok: true }; } catch (e) { // duplicate -> treat as successful idempotency outcome if (e.status === 409) return { ok: true, duplicate: true, error: e.data }; // rate limited -> backoff + retry if (e.status === 429) { await sleep(backoffMs(attempt)); continue; } return { ok: false, status: e.status, error: e.data ?? e.message }; } } return { ok: false, status: 429, error: "rate limit retry exhausted" }; } function parseCompanyId(val) { const s = String(val ?? "").trim(); if (!s) return null; const n = Number(s); if (!Number.isFinite(n) || !Number.isInteger(n) || n <= 0) return null; return n; } function pickShop(bodyshop) { return { bodyshopid: bodyshop.id, imexshopid: bodyshop.imexshopid, shopname: bodyshop.shopname, chatter_company_id: bodyshop.chatter_company_id, chatterid: bodyshop.chatterid }; } function sleep(ms) { return new Promise((r) => setTimeout(r, ms)); } function backoffMs(attempt) { const base = Math.min(30_000, 500 * 2 ** attempt); const jitter = Math.floor(Math.random() * 250); return base + jitter; } function createConcurrencyLimit(max) { let active = 0; const queue = []; const next = () => { if (active >= max) return; const fn = queue.shift(); if (!fn) return; active++; fn() .catch(() => {}) .finally(() => { active--; next(); }); }; return (fn) => new Promise((resolve, reject) => { queue.push(async () => { try { resolve(await fn()); } catch (e) { reject(e); } }); next(); }); } /** * Returns a per-company Chatter API client, caching both the token and the client. */ async function getChatterApiClient(companyId, sessionUtils) { const key = String(companyId); const existing = clientCache.get(key); if (existing) return existing; const apiToken = await getChatterApiTokenCached(companyId, sessionUtils); const chatterApi = new ChatterApiClient({ baseUrl: CHATTER_BASE_URL, apiToken }); clientCache.set(key, chatterApi); return chatterApi; } /** * Fetches the per-company token from AWS Secrets Manager with Redis caching * SecretId: CHATTER_COMPANY_KEY_ * * Uses Redis caching + in-flight dedupe to avoid hammering Secrets Manager. */ async function getChatterApiTokenCached(companyId, sessionUtils) { const key = String(companyId ?? "").trim(); if (!key) throw new Error("getChatterApiToken: companyId is required"); // Optional override for emergency/dev if (process.env.CHATTER_API_TOKEN) return process.env.CHATTER_API_TOKEN; // Check Redis cache if sessionUtils is available if (sessionUtils?.getChatterToken) { const cachedToken = await sessionUtils.getChatterToken(key); if (cachedToken) { logger.log("chatter-api-get-token-cache-hit", "DEBUG", "api", null, { companyId: key }); return cachedToken; } } // Check for in-flight requests const inflight = tokenInFlight.get(key); if (inflight) return inflight; const p = (async () => { logger.log("chatter-api-get-token-cache-miss", "DEBUG", "api", null, { companyId: key }); // Fetch token from Secrets Manager using shared function const token = await getChatterApiToken(companyId); // Store in Redis cache if sessionUtils is available if (sessionUtils?.setChatterToken) { await sessionUtils.setChatterToken(key, token); } return token; })(); tokenInFlight.set(key, p); try { return await p; } finally { tokenInFlight.delete(key); } }