From d08bfc61cdd098e5b1e5331d019daf4ff3774183 Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 11 Feb 2026 11:37:47 -0500 Subject: [PATCH] feature/IO-3556-Chattr-Integration - Retry beef up / tweeks --- server/chatter/chatter-client.js | 13 ++ server/data/chatter-api.js | 199 +++++++++++++++++++++++++++++-- 2 files changed, 204 insertions(+), 8 deletions(-) diff --git a/server/chatter/chatter-client.js b/server/chatter/chatter-client.js index a25f0c812..047e94459 100644 --- a/server/chatter/chatter-client.js +++ b/server/chatter/chatter-client.js @@ -61,6 +61,8 @@ class ChatterApiClient { const err = new Error(`Chatter API error ${res.status} | ${data?.message}`); err.status = res.status; err.data = data; + const retryAfterMs = parseRetryAfterMs(res.headers.get("retry-after")); + if (retryAfterMs != null) err.retryAfterMs = retryAfterMs; throw err; } return data; @@ -78,6 +80,17 @@ function safeJson(text) { } } +function parseRetryAfterMs(value) { + if (!value) return null; + + const sec = Number(value); + if (Number.isFinite(sec) && sec >= 0) return Math.ceil(sec * 1000); + + const dateMs = Date.parse(value); + if (!Number.isFinite(dateMs)) return null; + return Math.max(0, dateMs - Date.now()); +} + /** * Fetches Chatter API token from AWS Secrets Manager * SecretId format: CHATTER_COMPANY_KEY_ diff --git a/server/data/chatter-api.js b/server/data/chatter-api.js index 89f29b8ae..7c50d0ca1 100644 --- a/server/data/chatter-api.js +++ b/server/data/chatter-api.js @@ -1,3 +1,39 @@ +/** + * Environment variables used by this file + * Chatter integration + * - CHATTER_API_CONCURRENCY + * - Maximum number of jobs/interactions posted concurrently *per shop* (within a single shop's batch). + * - Default: 5 + * - Used by: createConcurrencyLimit(MAX_CONCURRENCY) + * + * - CHATTER_API_REQUESTS_PER_SECOND + * - Per-company outbound request rate (token bucket refill rate). + * - Default: 3 + * - Must be a positive number; otherwise falls back to default. + * - Used by: createTokenBucketRateLimiter({ refillPerSecond }) + * + * - CHATTER_API_BURST_CAPACITY + * - Per-company token bucket capacity (maximum burst size). + * - Default: equals CHATTER_API_REQUESTS_PER_SECOND (i.e., 3 unless overridden) + * - Must be a positive number; otherwise falls back to default. + * - Used by: createTokenBucketRateLimiter({ capacity }) + * + * - CHATTER_API_MAX_RETRIES + * - Maximum number of attempts for posting an interaction before giving up. + * - Default: 6 + * - Must be a positive integer; otherwise falls back to default. + * - Used by: postInteractionWithPolicy() + * + * - CHATTER_API_TOKEN + * - Optional override token for emergency/dev scenarios. + * - If set, bypasses Secrets Manager/Redis token retrieval and uses this value for all companies. + * - Used by: getChatterApiTokenCached() + * + * Notes + * - Per-company API tokens are otherwise fetched via getChatterApiToken(companyId) (Secrets Manager) + * and may be cached via `sessionUtils.getChatterToken/setChatterToken` (Redis-backed). + */ + const queries = require("../graphql-client/queries"); const moment = require("moment-timezone"); const logger = require("../utils/logger"); @@ -6,12 +42,16 @@ const { ChatterApiClient, getChatterApiToken, CHATTER_BASE_URL } = require("../c const client = require("../graphql-client/graphql-client").client; const { sendServerEmail } = require("../email/sendemail"); -const CHATTER_EVENT = process.env.CHATTER_SOLICITATION_EVENT || "delivery"; +const CHATTER_EVENT = process.env.NODE_ENV === "production" ? "delivery" : "TEST_INTEGRATION"; const MAX_CONCURRENCY = Number(process.env.CHATTER_API_CONCURRENCY || 5); +const CHATTER_REQUESTS_PER_SECOND = getPositiveNumber(process.env.CHATTER_API_REQUESTS_PER_SECOND, 3); +const CHATTER_BURST_CAPACITY = getPositiveNumber(process.env.CHATTER_API_BURST_CAPACITY, CHATTER_REQUESTS_PER_SECOND); +const CHATTER_MAX_RETRIES = getPositiveInteger(process.env.CHATTER_API_MAX_RETRIES, 6); // Client caching (in-memory) - tokens are now cached in Redis const clientCache = new Map(); // companyId -> ChatterApiClient const tokenInFlight = new Map(); // companyId -> Promise (for in-flight deduplication) +const companyRateLimiters = new Map(); // companyId -> rate limiter exports.default = async (req, res) => { if (process.env.NODE_ENV !== "production") return res.sendStatus(403); @@ -19,7 +59,7 @@ exports.default = async (req, res) => { res.status(202).json({ success: true, - message: "Processing request ...", + message: "Processing Chatter-API Cron request ...", timestamp: new Date().toISOString() }); @@ -149,7 +189,11 @@ async function processBatchApi({ shopsToProcess, start, end, skipUpload, allShop const failures = results .filter((r) => r && r.ok === false) .slice(0, 25) - .map((r) => ({ status: r.status, error: r.error })); + .map((r) => ({ + status: r.status, + error: r.error, + context: r.context + })); if (failures.length) { summary.ok = false; @@ -184,12 +228,22 @@ function buildInteractionPayload(bodyshop, j) { const isCompany = Boolean(j.ownr_co_nm); const locationIdentifier = `${bodyshop.chatter_company_id}-${bodyshop.id}`; + const timestamp = formatChatterTimestamp(j.actual_delivery, bodyshop.timezone); + + if (j.actual_delivery && !timestamp) { + logger.log("chatter-api-invalid-delivery-timestamp", "WARN", "api", bodyshop.id, { + bodyshopId: bodyshop.id, + jobId: j.id, + timezone: bodyshop.timezone, + actualDelivery: j.actual_delivery + }); + } return { locationIdentifier: locationIdentifier, event: CHATTER_EVENT, transactionId: j.ro_number != null ? String(j.ro_number) : undefined, - timestamp: j.actual_delivery ? moment(j.actual_delivery).tz(bodyshop.timezone).toISOString() : undefined, + timestamp, firstName: isCompany ? null : j.ownr_fn || null, lastName: isCompany ? j.ownr_co_nm : j.ownr_ln || null, emailAddress: j.ownr_ea || undefined, @@ -203,7 +257,19 @@ function buildInteractionPayload(bodyshop, j) { } async function postInteractionWithPolicy(chatterApi, companyId, payload) { - for (let attempt = 0; attempt < 6; attempt++) { + const limiter = getCompanyRateLimiter(companyId); + const requestContext = { + companyId, + locationIdentifier: payload?.locationIdentifier, + transactionId: payload?.transactionId, + timestamp: payload?.timestamp ?? null, + bodyshopId: payload?.metadata?.bodyshopId ?? null, + jobId: payload?.metadata?.jobId ?? null + }; + + for (let attempt = 0; attempt < CHATTER_MAX_RETRIES; attempt++) { + await limiter.acquire(); + try { await chatterApi.postInteraction(companyId, payload); return { ok: true }; @@ -213,14 +279,40 @@ async function postInteractionWithPolicy(chatterApi, companyId, payload) { // rate limited -> backoff + retry if (e.status === 429) { - await sleep(backoffMs(attempt)); + const retryDelayMs = retryDelayMsForError(e, attempt); + limiter.pause(retryDelayMs); + logger.log("chatter-api-request-rate-limited", "WARN", "api", requestContext.bodyshopId, { + ...requestContext, + attempt: attempt + 1, + maxAttempts: CHATTER_MAX_RETRIES, + status: e.status, + retryAfterMs: e.retryAfterMs, + retryDelayMs, + error: e.data ?? e.message + }); + await sleep(retryDelayMs); continue; } - return { ok: false, status: e.status, error: e.data ?? e.message }; + logger.log("chatter-api-request-failed", "ERROR", "api", requestContext.bodyshopId, { + ...requestContext, + attempt: attempt + 1, + maxAttempts: CHATTER_MAX_RETRIES, + status: e.status, + error: e.data ?? e.message + }); + return { ok: false, status: e.status, error: e.data ?? e.message, context: requestContext }; } } - return { ok: false, status: 429, error: "rate limit retry exhausted" }; + + logger.log("chatter-api-request-failed", "ERROR", "api", requestContext.bodyshopId, { + ...requestContext, + maxAttempts: CHATTER_MAX_RETRIES, + status: 429, + error: "rate limit retry exhausted" + }); + + return { ok: false, status: 429, error: "rate limit retry exhausted", context: requestContext }; } function parseCompanyId(val) { @@ -251,6 +343,26 @@ function backoffMs(attempt) { return base + jitter; } +function retryDelayMsForError(error, attempt) { + const retryAfterMs = Number(error?.retryAfterMs); + if (Number.isFinite(retryAfterMs) && retryAfterMs > 0) { + const jitter = Math.floor(Math.random() * 250); + return Math.min(60_000, retryAfterMs + jitter); + } + return backoffMs(attempt); +} + +function formatChatterTimestamp(value, timezone) { + if (!value) return undefined; + + const hasValidTimezone = Boolean(timezone && moment.tz.zone(timezone)); + const parsed = hasValidTimezone ? moment(value).tz(timezone) : moment(value); + if (!parsed.isValid()) return undefined; + + // Keep a strict, Chatter-friendly timestamp without fractional seconds. + return parsed.utc().format("YYYY-MM-DD HH:mm:ss[Z]"); +} + function createConcurrencyLimit(max) { let active = 0; const queue = []; @@ -281,6 +393,77 @@ function createConcurrencyLimit(max) { }); } +function getCompanyRateLimiter(companyId) { + const key = String(companyId); + const existing = companyRateLimiters.get(key); + if (existing) return existing; + + const limiter = createTokenBucketRateLimiter({ + refillPerSecond: CHATTER_REQUESTS_PER_SECOND, + capacity: CHATTER_BURST_CAPACITY + }); + + companyRateLimiters.set(key, limiter); + return limiter; +} + +function createTokenBucketRateLimiter({ refillPerSecond, capacity }) { + let tokens = capacity; + let lastRefillAt = Date.now(); + let pauseUntil = 0; + let chain = Promise.resolve(); + + const refill = () => { + const now = Date.now(); + const elapsedSec = (now - lastRefillAt) / 1000; + if (elapsedSec <= 0) return; + tokens = Math.min(capacity, tokens + elapsedSec * refillPerSecond); + lastRefillAt = now; + }; + + const waitForPermit = async () => { + for (;;) { + const now = Date.now(); + if (pauseUntil > now) { + await sleep(pauseUntil - now); + continue; + } + + refill(); + if (tokens >= 1) { + tokens -= 1; + return; + } + + const missing = 1 - tokens; + const waitMs = Math.max(25, Math.ceil((missing / refillPerSecond) * 1000)); + await sleep(waitMs); + } + }; + + return { + acquire() { + chain = chain.then(waitForPermit, waitForPermit); + return chain; + }, + pause(ms) { + const n = Number(ms); + if (!Number.isFinite(n) || n <= 0) return; + pauseUntil = Math.max(pauseUntil, Date.now() + n); + } + }; +} + +function getPositiveNumber(value, fallback) { + const n = Number(value); + return Number.isFinite(n) && n > 0 ? n : fallback; +} + +function getPositiveInteger(value, fallback) { + const n = Number(value); + return Number.isInteger(n) && n > 0 ? n : fallback; +} + /** * Returns a per-company Chatter API client, caching both the token and the client. */