/** * Environment variables used by this file * Chatter integration * - CHATTER_API_CONCURRENCY * - Maximum number of jobs/interactions posted concurrently *per shop* (within a single shop's batch). * - Default: 5 * - Used by: createConcurrencyLimit(MAX_CONCURRENCY) * * - CHATTER_API_REQUESTS_PER_SECOND * - Per-company outbound request rate (token bucket refill rate). * - Default: 3 * - Must be a positive number; otherwise falls back to default. * - Used by: createTokenBucketRateLimiter({ refillPerSecond }) * * - CHATTER_API_BURST_CAPACITY * - Per-company token bucket capacity (maximum burst size). * - Default: equals CHATTER_API_REQUESTS_PER_SECOND (i.e., 3 unless overridden) * - Must be a positive number; otherwise falls back to default. * - Used by: createTokenBucketRateLimiter({ capacity }) * * - CHATTER_API_MAX_RETRIES * - Maximum number of attempts for posting an interaction before giving up. * - Default: 6 * - Must be a positive integer; otherwise falls back to default. * - Used by: postInteractionWithPolicy() * * - CHATTER_API_TOKEN * - Optional override token for emergency/dev scenarios. * - If set, bypasses Secrets Manager/Redis token retrieval and uses this value for all companies. * - Used by: getChatterApiTokenCached() * * Notes * - Per-company API tokens are otherwise fetched via getChatterApiToken(companyId) (Secrets Manager) * and may be cached via `sessionUtils.getChatterToken/setChatterToken` (Redis-backed). */ const queries = require("../graphql-client/queries"); const moment = require("moment-timezone"); const logger = require("../utils/logger"); const { ChatterApiClient, getChatterApiToken, CHATTER_BASE_URL } = require("../chatter/chatter-client"); const client = require("../graphql-client/graphql-client").client; const CHATTER_EVENT = process.env.NODE_ENV === "production" ? "delivery" : "TEST_INTEGRATION"; const MAX_CONCURRENCY = Number(process.env.CHATTER_API_CONCURRENCY || 5); const CHATTER_REQUESTS_PER_SECOND = getPositiveNumber(process.env.CHATTER_API_REQUESTS_PER_SECOND, 3); const CHATTER_BURST_CAPACITY = getPositiveNumber(process.env.CHATTER_API_BURST_CAPACITY, CHATTER_REQUESTS_PER_SECOND); const CHATTER_MAX_RETRIES = getPositiveInteger(process.env.CHATTER_API_MAX_RETRIES, 6); // Client caching (in-memory) - tokens are now cached in Redis const clientCache = new Map(); // companyId -> ChatterApiClient const tokenInFlight = new Map(); // companyId -> Promise (for in-flight deduplication) const companyRateLimiters = new Map(); // companyId -> rate limiter /** * Core processing function for Chatter API jobs. * This can be called by the HTTP handler or the BullMQ worker. * * @param {Object} options - Processing options * @param {string} options.start - Start date for the delivery window * @param {string} options.end - End date for the delivery window * @param {Array} options.bodyshopIds - Optional specific shops to process * @param {boolean} options.skipUpload - Dry-run flag * @param {Object} options.sessionUtils - Optional session utils for token caching * @returns {Promise} Result with totals, allShopSummaries, and allErrors */ async function processChatterApiJob({ start, end, bodyshopIds, skipUpload, sessionUtils }) { logger.log("chatter-api-start", "DEBUG", "api", null, null); const allErrors = []; const allShopSummaries = []; // Shops that DO have chatter_company_id const { bodyshops } = await client.request(queries.GET_CHATTER_SHOPS_WITH_COMPANY); const shopsToProcess = bodyshopIds?.length > 0 ? bodyshops.filter((shop) => bodyshopIds.includes(shop.id)) : bodyshops; logger.log("chatter-api-shopsToProcess-generated", "DEBUG", "api", null, { count: shopsToProcess.length }); if (shopsToProcess.length === 0) { logger.log("chatter-api-shopsToProcess-empty", "DEBUG", "api", null, null); return { totals: { shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 }, allShopSummaries: [], allErrors: [] }; } await processBatchApi({ shopsToProcess, start, end, skipUpload, allShopSummaries, allErrors, sessionUtils }); const totals = allShopSummaries.reduce( (acc, s) => { acc.shops += 1; acc.jobs += s.jobs || 0; acc.sent += s.sent || 0; acc.duplicates += s.duplicates || 0; acc.failed += s.failed || 0; return acc; }, { shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 } ); logger.log("chatter-api-end", "DEBUG", "api", null, totals); return { totals, allShopSummaries, allErrors }; } exports.default = async (req, res) => { if (process.env.NODE_ENV !== "production") return res.sendStatus(403); if (req.headers["x-imex-auth"] !== process.env.AUTOHOUSE_AUTH_TOKEN) return res.sendStatus(401); res.status(202).json({ success: true, message: "Chatter API job queued for processing", timestamp: new Date().toISOString() }); try { const { dispatchChatterApiJob } = require("./queues/chatterApiQueue"); const { start, end, bodyshopIds, skipUpload } = req.body; await dispatchChatterApiJob({ start, end, bodyshopIds, skipUpload }); } catch (error) { logger.log("chatter-api-queue-dispatch-error", "ERROR", "api", null, { error: error.message, stack: error.stack }); } }; exports.processChatterApiJob = processChatterApiJob; async function processBatchApi({ shopsToProcess, start, end, skipUpload, allShopSummaries, allErrors, sessionUtils }) { for (const bodyshop of shopsToProcess) { const summary = { bodyshopid: bodyshop.id, imexshopid: bodyshop.imexshopid, shopname: bodyshop.shopname, chatter_company_id: bodyshop.chatter_company_id, chatterid: bodyshop.chatterid, jobs: 0, sent: 0, duplicates: 0, failed: 0, ok: true }; try { logger.log("chatter-api-start-shop", "DEBUG", "api", bodyshop.id, { shopname: bodyshop.shopname }); const companyId = parseCompanyId(bodyshop.chatter_company_id); if (!companyId) { summary.ok = false; summary.failed = 0; allErrors.push({ ...pickShop(bodyshop), fatal: true, errors: [`Invalid chatter_company_id: "${bodyshop.chatter_company_id}"`] }); allShopSummaries.push(summary); continue; } const chatterApi = await getChatterApiClient(companyId, sessionUtils); const { jobs } = await client.request(queries.CHATTER_QUERY, { bodyshopid: bodyshop.id, start: start ? moment(start).startOf("day") : moment().subtract(1, "days").startOf("day"), ...(end && { end: moment(end).endOf("day") }) }); summary.jobs = jobs.length; // concurrency-limited posting const limit = createConcurrencyLimit(MAX_CONCURRENCY); const results = await Promise.all( jobs.map((j) => limit(async () => { const payload = buildInteractionPayload(bodyshop, j); // keep legacy flag name: skipUpload == dry-run if (skipUpload) return { ok: true, dryRun: true }; const r = await postInteractionWithPolicy(chatterApi, companyId, payload); return r; }) ) ); for (const r of results) { if (r?.dryRun) continue; if (r?.ok && r?.duplicate) summary.duplicates += 1; else if (r?.ok) summary.sent += 1; else summary.failed += 1; } // record failures with some detail (cap to avoid huge emails) const failures = results .filter((r) => r && r.ok === false) .slice(0, 25) .map((r) => ({ status: r.status, error: r.error, context: r.context })); if (failures.length) { summary.ok = false; allErrors.push({ ...pickShop(bodyshop), fatal: false, errors: failures }); } logger.log("chatter-api-end-shop", "DEBUG", "api", bodyshop.id, summary); } catch (error) { summary.ok = false; logger.log("chatter-api-error-shop", "ERROR", "api", bodyshop.id, { error: error.message, stack: error.stack }); allErrors.push({ ...pickShop(bodyshop), fatal: true, errors: [error.toString()] }); } finally { allShopSummaries.push(summary); } } } function buildInteractionPayload(bodyshop, j) { const isCompany = Boolean(j.ownr_co_nm); const locationIdentifier = `${bodyshop.chatter_company_id}-${bodyshop.id}`; const timestamp = formatChatterTimestamp(j.actual_delivery, bodyshop.timezone); if (j.actual_delivery && !timestamp) { logger.log("chatter-api-invalid-delivery-timestamp", "WARN", "api", bodyshop.id, { bodyshopId: bodyshop.id, jobId: j.id, timezone: bodyshop.timezone, actualDelivery: j.actual_delivery }); } return { locationIdentifier: locationIdentifier, event: CHATTER_EVENT, consent: "true", transactionId: j.ro_number != null ? String(j.ro_number) : undefined, timestamp, firstName: isCompany ? null : j.ownr_fn || null, lastName: isCompany ? j.ownr_co_nm : j.ownr_ln || null, emailAddress: j.ownr_ea || undefined, phoneNumber: j.ownr_ph1 || undefined, metadata: { imexShopId: bodyshop.imexshopid, bodyshopId: bodyshop.id, jobId: j.id } }; } async function postInteractionWithPolicy(chatterApi, companyId, payload) { const limiter = getCompanyRateLimiter(companyId); const requestContext = { companyId, locationIdentifier: payload?.locationIdentifier, transactionId: payload?.transactionId, timestamp: payload?.timestamp ?? null, bodyshopId: payload?.metadata?.bodyshopId ?? null, jobId: payload?.metadata?.jobId ?? null }; for (let attempt = 0; attempt < CHATTER_MAX_RETRIES; attempt++) { await limiter.acquire(); try { await chatterApi.postInteraction(companyId, payload); return { ok: true }; } catch (e) { // duplicate -> treat as successful idempotency outcome if (e.status === 409) return { ok: true, duplicate: true, error: e.data }; // rate limited -> backoff + retry if (e.status === 429) { const retryDelayMs = retryDelayMsForError(e, attempt); limiter.pause(retryDelayMs); logger.log("chatter-api-request-rate-limited", "WARN", "api", requestContext.bodyshopId, { ...requestContext, attempt: attempt + 1, maxAttempts: CHATTER_MAX_RETRIES, status: e.status, retryAfterMs: e.retryAfterMs, retryDelayMs, error: e.data ?? e.message }); await sleep(retryDelayMs); continue; } logger.log("chatter-api-request-failed", "ERROR", "api", requestContext.bodyshopId, { ...requestContext, attempt: attempt + 1, maxAttempts: CHATTER_MAX_RETRIES, status: e.status, error: e.data ?? e.message }); return { ok: false, status: e.status, error: e.data ?? e.message, context: requestContext }; } } logger.log("chatter-api-request-failed", "ERROR", "api", requestContext.bodyshopId, { ...requestContext, maxAttempts: CHATTER_MAX_RETRIES, status: 429, error: "rate limit retry exhausted" }); return { ok: false, status: 429, error: "rate limit retry exhausted", context: requestContext }; } function parseCompanyId(val) { const s = String(val ?? "").trim(); if (!s) return null; const n = Number(s); if (!Number.isFinite(n) || !Number.isInteger(n) || n <= 0) return null; return n; } function pickShop(bodyshop) { return { bodyshopid: bodyshop.id, imexshopid: bodyshop.imexshopid, shopname: bodyshop.shopname, chatter_company_id: bodyshop.chatter_company_id, chatterid: bodyshop.chatterid }; } function sleep(ms) { return new Promise((r) => setTimeout(r, ms)); } function backoffMs(attempt) { const base = Math.min(30_000, 500 * 2 ** attempt); const jitter = Math.floor(Math.random() * 250); return base + jitter; } function retryDelayMsForError(error, attempt) { const retryAfterMs = Number(error?.retryAfterMs); if (Number.isFinite(retryAfterMs) && retryAfterMs > 0) { const jitter = Math.floor(Math.random() * 250); return Math.min(60_000, retryAfterMs + jitter); } return backoffMs(attempt); } function formatChatterTimestamp(value, timezone) { if (!value) return undefined; const hasValidTimezone = Boolean(timezone && moment.tz.zone(timezone)); const parsed = hasValidTimezone ? moment(value).tz(timezone) : moment(value); if (!parsed.isValid()) return undefined; // Keep a strict, Chatter-friendly timestamp without fractional seconds. return parsed.utc().format("YYYY-MM-DD HH:mm:ss[Z]"); } function createConcurrencyLimit(max) { let active = 0; const queue = []; const next = () => { if (active >= max) return; const fn = queue.shift(); if (!fn) return; active++; fn() .catch(() => {}) .finally(() => { active--; next(); }); }; return (fn) => new Promise((resolve, reject) => { queue.push(async () => { try { resolve(await fn()); } catch (e) { reject(e); } }); next(); }); } function getCompanyRateLimiter(companyId) { const key = String(companyId); const existing = companyRateLimiters.get(key); if (existing) return existing; const limiter = createTokenBucketRateLimiter({ refillPerSecond: CHATTER_REQUESTS_PER_SECOND, capacity: CHATTER_BURST_CAPACITY }); companyRateLimiters.set(key, limiter); return limiter; } function createTokenBucketRateLimiter({ refillPerSecond, capacity }) { let tokens = capacity; let lastRefillAt = Date.now(); let pauseUntil = 0; let chain = Promise.resolve(); const refill = () => { const now = Date.now(); const elapsedSec = (now - lastRefillAt) / 1000; if (elapsedSec <= 0) return; tokens = Math.min(capacity, tokens + elapsedSec * refillPerSecond); lastRefillAt = now; }; const waitForPermit = async () => { for (;;) { const now = Date.now(); if (pauseUntil > now) { await sleep(pauseUntil - now); continue; } refill(); if (tokens >= 1) { tokens -= 1; return; } const missing = 1 - tokens; const waitMs = Math.max(25, Math.ceil((missing / refillPerSecond) * 1000)); await sleep(waitMs); } }; return { acquire() { chain = chain.then(waitForPermit, waitForPermit); return chain; }, pause(ms) { const n = Number(ms); if (!Number.isFinite(n) || n <= 0) return; pauseUntil = Math.max(pauseUntil, Date.now() + n); } }; } function getPositiveNumber(value, fallback) { const n = Number(value); return Number.isFinite(n) && n > 0 ? n : fallback; } function getPositiveInteger(value, fallback) { const n = Number(value); return Number.isInteger(n) && n > 0 ? n : fallback; } /** * Returns a per-company Chatter API client, caching both the token and the client. */ async function getChatterApiClient(companyId, sessionUtils) { const key = String(companyId); const existing = clientCache.get(key); if (existing) return existing; const apiToken = await getChatterApiTokenCached(companyId, sessionUtils); const chatterApi = new ChatterApiClient({ baseUrl: CHATTER_BASE_URL, apiToken }); clientCache.set(key, chatterApi); return chatterApi; } /** * Fetches the per-company token from AWS Secrets Manager with Redis caching * SecretId: CHATTER_COMPANY_KEY_ * * Uses Redis caching + in-flight dedupe to avoid hammering Secrets Manager. */ async function getChatterApiTokenCached(companyId, sessionUtils) { const key = String(companyId ?? "").trim(); if (!key) throw new Error("getChatterApiToken: companyId is required"); // Optional override for emergency/dev if (process.env.CHATTER_API_TOKEN) return process.env.CHATTER_API_TOKEN; // Check Redis cache if sessionUtils is available if (sessionUtils?.getChatterToken) { const cachedToken = await sessionUtils.getChatterToken(key); if (cachedToken) { logger.log("chatter-api-get-token-cache-hit", "DEBUG", "api", null, { companyId: key }); return cachedToken; } } // Check for in-flight requests const inflight = tokenInFlight.get(key); if (inflight) return inflight; const p = (async () => { logger.log("chatter-api-get-token-cache-miss", "DEBUG", "api", null, { companyId: key }); // Fetch token from Secrets Manager using shared function const token = await getChatterApiToken(companyId); // Store in Redis cache if sessionUtils is available if (sessionUtils?.setChatterToken) { await sessionUtils.setChatterToken(key, token); } return token; })(); tokenInFlight.set(key, p); try { return await p; } finally { tokenInFlight.delete(key); } }