feature/IO-3556-Chattr-Integration - Retry beef up / tweaks
This commit is contained in:
@@ -61,6 +61,8 @@ class ChatterApiClient {
|
||||
const err = new Error(`Chatter API error ${res.status} | ${data?.message}`);
|
||||
err.status = res.status;
|
||||
err.data = data;
|
||||
const retryAfterMs = parseRetryAfterMs(res.headers.get("retry-after"));
|
||||
if (retryAfterMs != null) err.retryAfterMs = retryAfterMs;
|
||||
throw err;
|
||||
}
|
||||
return data;
|
||||
@@ -78,6 +80,17 @@ function safeJson(text) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Converts an HTTP `Retry-After` header value into a delay in milliseconds.
 *
 * Two forms are understood:
 *  - a non-negative decimal number of seconds, e.g. "120" (fractions such as "1.5" are tolerated);
 *  - a date string, e.g. "Wed, 21 Oct 2015 07:28:00 GMT", converted to the time remaining from now.
 *
 * @param {string|null|undefined} value - Raw header value.
 * @returns {number|null} Delay in milliseconds (never negative), or null when the value is
 *   missing, blank, or cannot be parsed.
 */
function parseRetryAfterMs(value) {
  if (!value) return null;

  // Number("   ") is 0, so a blank header must be rejected explicitly rather than
  // being reported as a zero-millisecond delay.
  const text = String(value).trim();
  if (text === "") return null;

  // Only plain decimal seconds count as the numeric form. Number() alone would also accept
  // hex ("0x10"), exponent ("1e3") and signed values, none of which are valid delay-seconds.
  if (/^\d+(?:\.\d+)?$/.test(text)) {
    const seconds = Number(text);
    return Number.isFinite(seconds) ? Math.ceil(seconds * 1000) : null;
  }

  const dateMs = Date.parse(text);
  if (!Number.isFinite(dateMs)) return null;

  // A date in the past means "retry now", not a negative delay.
  return Math.max(0, dateMs - Date.now());
}
|
||||
|
||||
/**
|
||||
* Fetches Chatter API token from AWS Secrets Manager
|
||||
* SecretId format: CHATTER_COMPANY_KEY_<companyId>
|
||||
|
||||
@@ -1,3 +1,39 @@
|
||||
/**
|
||||
* Environment variables used by this file
|
||||
* Chatter integration
|
||||
* - CHATTER_API_CONCURRENCY
|
||||
* - Maximum number of jobs/interactions posted concurrently *per shop* (within a single shop's batch).
|
||||
* - Default: 5
|
||||
* - Used by: createConcurrencyLimit(MAX_CONCURRENCY)
|
||||
*
|
||||
* - CHATTER_API_REQUESTS_PER_SECOND
|
||||
* - Per-company outbound request rate (token bucket refill rate).
|
||||
* - Default: 3
|
||||
* - Must be a positive number; otherwise falls back to default.
|
||||
* - Used by: createTokenBucketRateLimiter({ refillPerSecond })
|
||||
*
|
||||
* - CHATTER_API_BURST_CAPACITY
|
||||
* - Per-company token bucket capacity (maximum burst size).
|
||||
* - Default: equals CHATTER_API_REQUESTS_PER_SECOND (i.e., 3 unless overridden)
|
||||
* - Must be a positive number; otherwise falls back to default.
|
||||
* - Used by: createTokenBucketRateLimiter({ capacity })
|
||||
*
|
||||
* - CHATTER_API_MAX_RETRIES
|
||||
* - Maximum number of attempts for posting an interaction before giving up.
|
||||
* - Default: 6
|
||||
* - Must be a positive integer; otherwise falls back to default.
|
||||
* - Used by: postInteractionWithPolicy()
|
||||
*
|
||||
* - CHATTER_API_TOKEN
|
||||
* - Optional override token for emergency/dev scenarios.
|
||||
* - If set, bypasses Secrets Manager/Redis token retrieval and uses this value for all companies.
|
||||
* - Used by: getChatterApiTokenCached()
|
||||
*
|
||||
* Notes
|
||||
* - Per-company API tokens are otherwise fetched via getChatterApiToken(companyId) (Secrets Manager)
|
||||
* and may be cached via `sessionUtils.getChatterToken/setChatterToken` (Redis-backed).
|
||||
*/
|
||||
|
||||
const queries = require("../graphql-client/queries");
|
||||
const moment = require("moment-timezone");
|
||||
const logger = require("../utils/logger");
|
||||
@@ -6,12 +42,16 @@ const { ChatterApiClient, getChatterApiToken, CHATTER_BASE_URL } = require("../c
|
||||
const client = require("../graphql-client/graphql-client").client;
|
||||
const { sendServerEmail } = require("../email/sendemail");
|
||||
|
||||
const CHATTER_EVENT = process.env.CHATTER_SOLICITATION_EVENT || "delivery";
|
||||
const CHATTER_EVENT = process.env.NODE_ENV === "production" ? "delivery" : "TEST_INTEGRATION";
|
||||
// Per-shop cap on concurrently posted interactions (passed to createConcurrencyLimit).
// Validated like the other numeric settings below, so a non-numeric, zero or negative
// CHATTER_API_CONCURRENCY falls back to 5 instead of producing NaN or a non-positive limit.
const MAX_CONCURRENCY = getPositiveInteger(process.env.CHATTER_API_CONCURRENCY, 5);
// Per-company token bucket: refill rate and maximum burst size (burst defaults to the refill rate).
const CHATTER_REQUESTS_PER_SECOND = getPositiveNumber(process.env.CHATTER_API_REQUESTS_PER_SECOND, 3);
const CHATTER_BURST_CAPACITY = getPositiveNumber(process.env.CHATTER_API_BURST_CAPACITY, CHATTER_REQUESTS_PER_SECOND);
// Maximum number of attempts made by postInteractionWithPolicy() before giving up.
const CHATTER_MAX_RETRIES = getPositiveInteger(process.env.CHATTER_API_MAX_RETRIES, 6);

// Client caching (in-memory) - tokens are now cached in Redis
const clientCache = new Map(); // companyId -> ChatterApiClient
const tokenInFlight = new Map(); // companyId -> Promise<string> (for in-flight deduplication)
const companyRateLimiters = new Map(); // companyId -> rate limiter
|
||||
|
||||
exports.default = async (req, res) => {
|
||||
if (process.env.NODE_ENV !== "production") return res.sendStatus(403);
|
||||
@@ -19,7 +59,7 @@ exports.default = async (req, res) => {
|
||||
|
||||
res.status(202).json({
|
||||
success: true,
|
||||
message: "Processing request ...",
|
||||
message: "Processing Chatter-API Cron request ...",
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
@@ -149,7 +189,11 @@ async function processBatchApi({ shopsToProcess, start, end, skipUpload, allShop
|
||||
const failures = results
|
||||
.filter((r) => r && r.ok === false)
|
||||
.slice(0, 25)
|
||||
.map((r) => ({ status: r.status, error: r.error }));
|
||||
.map((r) => ({
|
||||
status: r.status,
|
||||
error: r.error,
|
||||
context: r.context
|
||||
}));
|
||||
|
||||
if (failures.length) {
|
||||
summary.ok = false;
|
||||
@@ -184,12 +228,22 @@ function buildInteractionPayload(bodyshop, j) {
|
||||
const isCompany = Boolean(j.ownr_co_nm);
|
||||
|
||||
const locationIdentifier = `${bodyshop.chatter_company_id}-${bodyshop.id}`;
|
||||
const timestamp = formatChatterTimestamp(j.actual_delivery, bodyshop.timezone);
|
||||
|
||||
if (j.actual_delivery && !timestamp) {
|
||||
logger.log("chatter-api-invalid-delivery-timestamp", "WARN", "api", bodyshop.id, {
|
||||
bodyshopId: bodyshop.id,
|
||||
jobId: j.id,
|
||||
timezone: bodyshop.timezone,
|
||||
actualDelivery: j.actual_delivery
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
locationIdentifier: locationIdentifier,
|
||||
event: CHATTER_EVENT,
|
||||
transactionId: j.ro_number != null ? String(j.ro_number) : undefined,
|
||||
timestamp: j.actual_delivery ? moment(j.actual_delivery).tz(bodyshop.timezone).toISOString() : undefined,
|
||||
timestamp,
|
||||
firstName: isCompany ? null : j.ownr_fn || null,
|
||||
lastName: isCompany ? j.ownr_co_nm : j.ownr_ln || null,
|
||||
emailAddress: j.ownr_ea || undefined,
|
||||
@@ -203,7 +257,19 @@ function buildInteractionPayload(bodyshop, j) {
|
||||
}
|
||||
|
||||
async function postInteractionWithPolicy(chatterApi, companyId, payload) {
|
||||
for (let attempt = 0; attempt < 6; attempt++) {
|
||||
const limiter = getCompanyRateLimiter(companyId);
|
||||
const requestContext = {
|
||||
companyId,
|
||||
locationIdentifier: payload?.locationIdentifier,
|
||||
transactionId: payload?.transactionId,
|
||||
timestamp: payload?.timestamp ?? null,
|
||||
bodyshopId: payload?.metadata?.bodyshopId ?? null,
|
||||
jobId: payload?.metadata?.jobId ?? null
|
||||
};
|
||||
|
||||
for (let attempt = 0; attempt < CHATTER_MAX_RETRIES; attempt++) {
|
||||
await limiter.acquire();
|
||||
|
||||
try {
|
||||
await chatterApi.postInteraction(companyId, payload);
|
||||
return { ok: true };
|
||||
@@ -213,14 +279,40 @@ async function postInteractionWithPolicy(chatterApi, companyId, payload) {
|
||||
|
||||
// rate limited -> backoff + retry
|
||||
if (e.status === 429) {
|
||||
await sleep(backoffMs(attempt));
|
||||
const retryDelayMs = retryDelayMsForError(e, attempt);
|
||||
limiter.pause(retryDelayMs);
|
||||
logger.log("chatter-api-request-rate-limited", "WARN", "api", requestContext.bodyshopId, {
|
||||
...requestContext,
|
||||
attempt: attempt + 1,
|
||||
maxAttempts: CHATTER_MAX_RETRIES,
|
||||
status: e.status,
|
||||
retryAfterMs: e.retryAfterMs,
|
||||
retryDelayMs,
|
||||
error: e.data ?? e.message
|
||||
});
|
||||
await sleep(retryDelayMs);
|
||||
continue;
|
||||
}
|
||||
|
||||
return { ok: false, status: e.status, error: e.data ?? e.message };
|
||||
logger.log("chatter-api-request-failed", "ERROR", "api", requestContext.bodyshopId, {
|
||||
...requestContext,
|
||||
attempt: attempt + 1,
|
||||
maxAttempts: CHATTER_MAX_RETRIES,
|
||||
status: e.status,
|
||||
error: e.data ?? e.message
|
||||
});
|
||||
return { ok: false, status: e.status, error: e.data ?? e.message, context: requestContext };
|
||||
}
|
||||
}
|
||||
return { ok: false, status: 429, error: "rate limit retry exhausted" };
|
||||
|
||||
logger.log("chatter-api-request-failed", "ERROR", "api", requestContext.bodyshopId, {
|
||||
...requestContext,
|
||||
maxAttempts: CHATTER_MAX_RETRIES,
|
||||
status: 429,
|
||||
error: "rate limit retry exhausted"
|
||||
});
|
||||
|
||||
return { ok: false, status: 429, error: "rate limit retry exhausted", context: requestContext };
|
||||
}
|
||||
|
||||
function parseCompanyId(val) {
|
||||
@@ -251,6 +343,26 @@ function backoffMs(attempt) {
|
||||
return base + jitter;
|
||||
}
|
||||
|
||||
/**
 * Picks how long to wait before retrying a failed request.
 *
 * When the error carries a positive, finite `retryAfterMs`, that hint is used with up to
 * 249 ms of random jitter added, capped at 60 seconds. Otherwise the delay comes from
 * backoffMs(attempt).
 *
 * @param {{ retryAfterMs?: number }|null|undefined} error - Error thrown by the request.
 * @param {number} attempt - Zero-based attempt index, forwarded to backoffMs.
 * @returns {number} Delay in milliseconds.
 */
function retryDelayMsForError(error, attempt) {
  const MAX_HINTED_DELAY_MS = 60000;
  const hintedMs = Number(error?.retryAfterMs);
  const hasUsableHint = Number.isFinite(hintedMs) && hintedMs > 0;

  if (!hasUsableHint) {
    return backoffMs(attempt);
  }

  const jitterMs = Math.floor(Math.random() * 250);
  return Math.min(MAX_HINTED_DELAY_MS, hintedMs + jitterMs);
}
|
||||
|
||||
/**
 * Formats a delivery date as a UTC timestamp string for the Chatter payload.
 *
 * @param {*} value - Date value handed to moment(); a falsy value yields undefined.
 * @param {string} [timezone] - Zone name; applied only when moment-timezone recognises it.
 * @returns {string|undefined} "YYYY-MM-DD HH:mm:ssZ" in UTC, or undefined when the value is
 *   missing or does not parse to a valid date.
 */
function formatChatterTimestamp(value, timezone) {
  if (!value) {
    return undefined;
  }

  const zoneIsKnown = Boolean(timezone && moment.tz.zone(timezone));

  let parsed = moment(value);
  if (zoneIsKnown) {
    // NOTE(review): the result is switched to UTC below, so this zone shift looks like it
    // cannot affect the formatted output - confirm whether it is still needed.
    parsed = parsed.tz(timezone);
  }

  if (!parsed.isValid()) {
    return undefined;
  }

  // Keep a strict, Chatter-friendly timestamp without fractional seconds.
  return parsed.utc().format("YYYY-MM-DD HH:mm:ss[Z]");
}
|
||||
|
||||
function createConcurrencyLimit(max) {
|
||||
let active = 0;
|
||||
const queue = [];
|
||||
@@ -281,6 +393,77 @@ function createConcurrencyLimit(max) {
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Returns the rate limiter for a company, creating and caching it on first use.
 *
 * Limiters are stored in `companyRateLimiters` under the stringified company id, so 42 and
 * "42" resolve to the same limiter.
 *
 * @param {string|number} companyId - Company identifier.
 * @returns {object} The limiter produced by createTokenBucketRateLimiter for this company.
 */
function getCompanyRateLimiter(companyId) {
  const cacheKey = String(companyId);

  let limiter = companyRateLimiters.get(cacheKey);
  if (!limiter) {
    limiter = createTokenBucketRateLimiter({
      refillPerSecond: CHATTER_REQUESTS_PER_SECOND,
      capacity: CHATTER_BURST_CAPACITY
    });
    companyRateLimiters.set(cacheKey, limiter);
  }

  return limiter;
}
|
||||
|
||||
/**
 * Creates an in-memory token-bucket rate limiter.
 *
 * The bucket starts full, refills continuously at `refillPerSecond`, and each granted permit
 * consumes one token. Callers are served one at a time in the order they called acquire().
 *
 * @param {object} options
 * @param {number} options.refillPerSecond - Tokens added per second; expected to be positive.
 * @param {number} options.capacity - Maximum tokens held (burst size). Values that are not at
 *   least 1 are treated as 1, because a bucket that can never hold a whole token would never
 *   grant a permit.
 * @returns {{ acquire: () => Promise<void>, pause: (ms: number) => void }}
 *   `acquire()` resolves once a permit is granted; `pause(ms)` holds back all permits for at
 *   least `ms` milliseconds from now.
 */
function createTokenBucketRateLimiter({ refillPerSecond, capacity }) {
  // Permits are granted only when tokens >= 1, so with a smaller cap (e.g. 0.5 when the burst
  // size defaults to a sub-1 request rate) every acquire() would wait forever. Clamp to 1.
  const maxTokens = capacity >= 1 ? capacity : 1;

  let tokens = maxTokens;
  let lastRefillAt = Date.now();
  let pauseUntil = 0;
  // Tail of the waiter queue: each acquire() is chained after the previous one.
  let chain = Promise.resolve();

  const refill = () => {
    const now = Date.now();
    const elapsedSec = (now - lastRefillAt) / 1000;
    // Nothing to add if no time has passed (or the clock moved backwards).
    if (elapsedSec <= 0) return;
    tokens = Math.min(maxTokens, tokens + elapsedSec * refillPerSecond);
    lastRefillAt = now;
  };

  const waitForPermit = async () => {
    for (;;) {
      const now = Date.now();
      if (pauseUntil > now) {
        // Re-check after sleeping: pause() may have extended the deadline meanwhile.
        await sleep(pauseUntil - now);
        continue;
      }

      refill();
      if (tokens >= 1) {
        tokens -= 1;
        return;
      }

      // Sleep roughly until the missing fraction of a token has refilled (at least 25 ms).
      const missing = 1 - tokens;
      const waitMs = Math.max(25, Math.ceil((missing / refillPerSecond) * 1000));
      await sleep(waitMs);
    }
  };

  return {
    acquire() {
      // Run the next waiter whether the previous one fulfilled or rejected, so a single
      // failure cannot wedge the queue.
      chain = chain.then(waitForPermit, waitForPermit);
      return chain;
    },
    pause(ms) {
      const n = Number(ms);
      if (!Number.isFinite(n) || n <= 0) return;
      // Never shorten a pause that is already in effect.
      pauseUntil = Math.max(pauseUntil, Date.now() + n);
    }
  };
}
|
||||
|
||||
/**
 * Coerces a value to a number, accepting it only when it is finite and greater than zero.
 *
 * @param {*} value - Candidate value, coerced with Number().
 * @param {*} fallback - Returned unchanged when the candidate is not a positive finite number.
 * @returns {*} The coerced positive number, or `fallback`.
 */
function getPositiveNumber(value, fallback) {
  const parsed = Number(value);

  if (!Number.isFinite(parsed)) return fallback;
  if (parsed <= 0) return fallback;

  return parsed;
}
|
||||
|
||||
/**
 * Coerces a value to a number, accepting it only when it is an integer greater than zero.
 *
 * @param {*} value - Candidate value, coerced with Number().
 * @param {*} fallback - Returned unchanged when the candidate is not a positive integer.
 * @returns {*} The coerced positive integer, or `fallback`.
 */
function getPositiveInteger(value, fallback) {
  const parsed = Number(value);
  const isUsable = Number.isInteger(parsed) && parsed >= 1;

  return isUsable ? parsed : fallback;
}
|
||||
|
||||
/**
|
||||
* Returns a per-company Chatter API client, caching both the token and the client.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user