556 lines
17 KiB
JavaScript
556 lines
17 KiB
JavaScript
/**
 * Environment variables used by this file
 *
 * Chatter integration
 * - CHATTER_API_CONCURRENCY
 *   - Maximum number of jobs/interactions posted concurrently *per shop* (within a single shop's batch).
 *   - Default: 5
 *   - Used by: createConcurrencyLimit(MAX_CONCURRENCY)
 *
 * - CHATTER_API_REQUESTS_PER_SECOND
 *   - Per-company outbound request rate (token bucket refill rate).
 *   - Default: 3
 *   - Must be a positive number; otherwise falls back to default.
 *   - Used by: createTokenBucketRateLimiter({ refillPerSecond })
 *
 * - CHATTER_API_BURST_CAPACITY
 *   - Per-company token bucket capacity (maximum burst size).
 *   - Default: equals CHATTER_API_REQUESTS_PER_SECOND (i.e., 3 unless overridden)
 *   - Must be a positive number; otherwise falls back to default.
 *   - Used by: createTokenBucketRateLimiter({ capacity })
 *
 * - CHATTER_API_MAX_RETRIES
 *   - Maximum number of attempts for posting an interaction before giving up.
 *   - Default: 6
 *   - Must be a positive integer; otherwise falls back to default.
 *   - Used by: postInteractionWithPolicy()
 *
 * - CHATTER_API_TOKEN
 *   - Optional override token for emergency/dev scenarios.
 *   - If set, bypasses Secrets Manager/Redis token retrieval and uses this value for all companies.
 *   - Used by: getChatterApiTokenCached()
 *
 * Notes
 * - Per-company API tokens are otherwise fetched via getChatterApiToken(companyId) (Secrets Manager)
 *   and may be cached via `sessionUtils.getChatterToken/setChatterToken` (Redis-backed).
 */
|
|
|
|
const queries = require("../graphql-client/queries");
|
|
const moment = require("moment-timezone");
|
|
const logger = require("../utils/logger");
|
|
const { ChatterApiClient, getChatterApiToken, CHATTER_BASE_URL } = require("../chatter/chatter-client");
|
|
|
|
const client = require("../graphql-client/graphql-client").client;
|
|
|
|
// Event name sent with every interaction; anything other than production uses the test event.
const CHATTER_EVENT = process.env.NODE_ENV === "production" ? "delivery" : "TEST_INTEGRATION";

// Per-shop posting concurrency. Validated like the other numeric settings: a value of "0"
// would stall createConcurrencyLimit forever and a non-numeric value (NaN) would remove the
// cap entirely, so anything that is not a positive integer falls back to the default of 5.
const MAX_CONCURRENCY = getPositiveInteger(process.env.CHATTER_API_CONCURRENCY, 5);

// Token bucket settings shared by every per-company rate limiter.
const CHATTER_REQUESTS_PER_SECOND = getPositiveNumber(process.env.CHATTER_API_REQUESTS_PER_SECOND, 3);
const CHATTER_BURST_CAPACITY = getPositiveNumber(process.env.CHATTER_API_BURST_CAPACITY, CHATTER_REQUESTS_PER_SECOND);

// Upper bound on attempts made by postInteractionWithPolicy().
const CHATTER_MAX_RETRIES = getPositiveInteger(process.env.CHATTER_API_MAX_RETRIES, 6);

// Client caching (in-memory) - tokens are now cached in Redis
const clientCache = new Map(); // companyId -> ChatterApiClient
const tokenInFlight = new Map(); // companyId -> Promise<string> (for in-flight deduplication)
const companyRateLimiters = new Map(); // companyId -> rate limiter
|
|
|
|
/**
 * Core processing function for Chatter API jobs.
 * Callable from the HTTP handler or the BullMQ worker.
 *
 * Loads the shops returned by GET_CHATTER_SHOPS_WITH_COMPANY, optionally narrows them to
 * `bodyshopIds`, hands them to processBatchApi() and aggregates the per-shop summaries.
 *
 * @param {Object} options - Processing options
 * @param {string} options.start - Start date for the delivery window
 * @param {string} options.end - End date for the delivery window
 * @param {Array<string>} options.bodyshopIds - Optional specific shops to process
 * @param {boolean} options.skipUpload - Dry-run flag
 * @param {Object} options.sessionUtils - Optional session utils for token caching
 * @returns {Promise<Object>} Result with totals, allShopSummaries, and allErrors
 */
async function processChatterApiJob({ start, end, bodyshopIds, skipUpload, sessionUtils }) {
  logger.log("chatter-api-start", "DEBUG", "api", null, null);

  // Filled in by processBatchApi(), which appends to both arrays in place.
  const allShopSummaries = [];
  const allErrors = [];

  // Shops that DO have chatter_company_id
  const { bodyshops } = await client.request(queries.GET_CHATTER_SHOPS_WITH_COMPANY);

  let shopsToProcess = bodyshops;
  if (bodyshopIds?.length > 0) {
    shopsToProcess = bodyshops.filter((candidate) => bodyshopIds.includes(candidate.id));
  }

  logger.log("chatter-api-shopsToProcess-generated", "DEBUG", "api", null, { count: shopsToProcess.length });

  // Nothing to do: report empty totals without touching the batch processor.
  if (shopsToProcess.length === 0) {
    logger.log("chatter-api-shopsToProcess-empty", "DEBUG", "api", null, null);
    return {
      totals: { shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 },
      allShopSummaries: [],
      allErrors: []
    };
  }

  await processBatchApi({
    shopsToProcess,
    start,
    end,
    skipUpload,
    allShopSummaries,
    allErrors,
    sessionUtils
  });

  // One summary per processed shop; missing counters count as zero.
  const totals = { shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 };
  for (const shopSummary of allShopSummaries) {
    totals.shops += 1;
    totals.jobs += shopSummary.jobs || 0;
    totals.sent += shopSummary.sent || 0;
    totals.duplicates += shopSummary.duplicates || 0;
    totals.failed += shopSummary.failed || 0;
  }

  logger.log("chatter-api-end", "DEBUG", "api", null, totals);

  return { totals, allShopSummaries, allErrors };
}
|
|
|
|
exports.default = async (req, res) => {
|
|
if (process.env.NODE_ENV !== "production") return res.sendStatus(403);
|
|
if (req.headers["x-imex-auth"] !== process.env.AUTOHOUSE_AUTH_TOKEN) return res.sendStatus(401);
|
|
|
|
res.status(202).json({
|
|
success: true,
|
|
message: "Chatter API job queued for processing",
|
|
timestamp: new Date().toISOString()
|
|
});
|
|
|
|
try {
|
|
const { dispatchChatterApiJob } = require("./queues/chatterApiQueue");
|
|
const { start, end, bodyshopIds, skipUpload } = req.body;
|
|
|
|
await dispatchChatterApiJob({
|
|
start,
|
|
end,
|
|
bodyshopIds,
|
|
skipUpload
|
|
});
|
|
} catch (error) {
|
|
logger.log("chatter-api-queue-dispatch-error", "ERROR", "api", null, {
|
|
error: error.message,
|
|
stack: error.stack
|
|
});
|
|
}
|
|
};
|
|
|
|
exports.processChatterApiJob = processChatterApiJob;
|
|
|
|
/**
 * Processes the given shops one after another, posting each shop's jobs to Chatter.
 *
 * For every shop exactly one summary is appended to `allShopSummaries`; shop-level problems
 * (invalid company id, thrown errors) and per-job failures are appended to `allErrors`.
 * Both arrays are mutated in place.
 *
 * @param {Object} options
 * @param {Array<Object>} options.shopsToProcess - Shops to process
 * @param {string} [options.start] - Window start; defaults to the start of yesterday
 * @param {string} [options.end] - Window end; omitted from the query when not provided
 * @param {boolean} [options.skipUpload] - Dry-run: payloads are built but nothing is posted
 * @param {Array<Object>} options.allShopSummaries - Output: per-shop summaries
 * @param {Array<Object>} options.allErrors - Output: per-shop error records
 * @param {Object} [options.sessionUtils] - Optional session utils for token caching
 * @returns {Promise<void>}
 */
async function processBatchApi({ shopsToProcess, start, end, skipUpload, allShopSummaries, allErrors, sessionUtils }) {
  for (const bodyshop of shopsToProcess) {
    const summary = {
      bodyshopid: bodyshop.id,
      imexshopid: bodyshop.imexshopid,
      shopname: bodyshop.shopname,
      chatter_company_id: bodyshop.chatter_company_id,
      chatterid: bodyshop.chatterid,
      jobs: 0,
      sent: 0,
      duplicates: 0,
      failed: 0,
      ok: true
    };

    try {
      logger.log("chatter-api-start-shop", "DEBUG", "api", bodyshop.id, { shopname: bodyshop.shopname });

      const companyId = parseCompanyId(bodyshop.chatter_company_id);
      if (!companyId) {
        summary.ok = false;
        allErrors.push({
          ...pickShop(bodyshop),
          fatal: true,
          errors: [`Invalid chatter_company_id: "${bodyshop.chatter_company_id}"`]
        });
        // The `finally` block below records the summary. Pushing it here as well would list
        // the shop twice and inflate the `shops` total, because `continue` still runs `finally`.
        continue;
      }

      const chatterApi = await getChatterApiClient(companyId, sessionUtils);

      const { jobs } = await client.request(queries.CHATTER_QUERY, {
        bodyshopid: bodyshop.id,
        start: start ? moment(start).startOf("day") : moment().subtract(1, "days").startOf("day"),
        ...(end && { end: moment(end).endOf("day") })
      });

      summary.jobs = jobs.length;

      // concurrency-limited posting
      const limit = createConcurrencyLimit(MAX_CONCURRENCY);
      const results = await Promise.all(
        jobs.map((j) =>
          limit(async () => {
            const payload = buildInteractionPayload(bodyshop, j);

            // keep legacy flag name: skipUpload == dry-run
            if (skipUpload) return { ok: true, dryRun: true };

            return postInteractionWithPolicy(chatterApi, companyId, payload);
          })
        )
      );

      // Dry-run results are not counted as sent, duplicate or failed.
      for (const r of results) {
        if (r?.dryRun) continue;
        if (r?.ok && r?.duplicate) summary.duplicates += 1;
        else if (r?.ok) summary.sent += 1;
        else summary.failed += 1;
      }

      // record failures with some detail (cap to avoid huge emails)
      const failures = results
        .filter((r) => r && r.ok === false)
        .slice(0, 25)
        .map((r) => ({
          status: r.status,
          error: r.error,
          context: r.context
        }));

      if (failures.length) {
        summary.ok = false;
        allErrors.push({
          ...pickShop(bodyshop),
          fatal: false,
          errors: failures
        });
      }

      logger.log("chatter-api-end-shop", "DEBUG", "api", bodyshop.id, summary);
    } catch (error) {
      summary.ok = false;

      logger.log("chatter-api-error-shop", "ERROR", "api", bodyshop.id, {
        error: error.message,
        stack: error.stack
      });

      allErrors.push({
        ...pickShop(bodyshop),
        fatal: true,
        errors: [error.toString()]
      });
    } finally {
      // Single place where the per-shop summary is recorded, on every exit path.
      allShopSummaries.push(summary);
    }
  }
}
|
|
|
|
/**
 * Builds the interaction payload posted to Chatter for one job.
 *
 * @param {Object} bodyshop - Shop record (id, imexshopid, chatter_company_id, timezone)
 * @param {Object} j - Job record (owner fields, ro_number, actual_delivery, id)
 * @returns {Object} Interaction payload
 */
function buildInteractionPayload(bodyshop, j) {
  // A job with a company name is treated as a company rather than an individual owner.
  const ownerIsCompany = Boolean(j.ownr_co_nm);

  // Prefer the shop's imexshopid; otherwise derive an identifier from company id and shop id.
  let locationIdentifier = bodyshop?.imexshopid;
  if (locationIdentifier == null) {
    locationIdentifier = `${bodyshop.chatter_company_id}-${bodyshop.id}`;
  }

  const timestamp = formatChatterTimestamp(j.actual_delivery, bodyshop.timezone);

  // A delivery value that could not be formatted is logged; the payload is still built.
  const deliveryUnparseable = j.actual_delivery && !timestamp;
  if (deliveryUnparseable) {
    logger.log("chatter-api-invalid-delivery-timestamp", "WARN", "api", bodyshop.id, {
      bodyshopId: bodyshop.id,
      jobId: j.id,
      timezone: bodyshop.timezone,
      actualDelivery: j.actual_delivery
    });
  }

  // Companies carry their name in lastName and have no firstName.
  let firstName = null;
  let lastName = j.ownr_co_nm;
  if (!ownerIsCompany) {
    firstName = j.ownr_fn || null;
    lastName = j.ownr_ln || null;
  }

  return {
    locationIdentifier,
    event: CHATTER_EVENT,
    consent: "true",
    transactionId: j.ro_number != null ? String(j.ro_number) : undefined,
    timestamp,
    firstName,
    lastName,
    emailAddress: j.ownr_ea || undefined,
    phoneNumber: j.ownr_ph1 || undefined,
    metadata: {
      imexShopId: bodyshop.imexshopid,
      bodyshopId: bodyshop.id,
      jobId: j.id
    }
  };
}
|
|
|
|
/**
 * Posts one interaction, applying the per-company rate limiter and the retry policy.
 *
 * - Success resolves to `{ ok: true }`.
 * - HTTP 409 is treated as an already-recorded interaction: `{ ok: true, duplicate: true }`.
 * - HTTP 429 pauses the company's limiter and retries, up to CHATTER_MAX_RETRIES attempts.
 * - Any other error is logged and returned as `{ ok: false, ... }` without retrying.
 *
 * This function does not rethrow errors from `chatterApi.postInteraction`.
 *
 * @param {Object} chatterApi - Client exposing `postInteraction(companyId, payload)`
 * @param {number} companyId - Chatter company id
 * @param {Object} payload - Interaction payload
 * @returns {Promise<Object>} Result object with `ok` and, on failure, `status`, `error`, `context`
 */
async function postInteractionWithPolicy(chatterApi, companyId, payload) {
  const limiter = getCompanyRateLimiter(companyId);

  // Identifying fields attached to every log entry and failure result.
  const requestContext = {
    companyId,
    locationIdentifier: payload?.locationIdentifier,
    transactionId: payload?.transactionId,
    timestamp: payload?.timestamp ?? null,
    bodyshopId: payload?.metadata?.bodyshopId ?? null,
    jobId: payload?.metadata?.jobId ?? null
  };

  for (let attempt = 0; attempt < CHATTER_MAX_RETRIES; attempt++) {
    await limiter.acquire();

    try {
      await chatterApi.postInteraction(companyId, payload);
      return { ok: true };
    } catch (e) {
      // duplicate -> treat as successful idempotency outcome
      if (e.status === 409) return { ok: true, duplicate: true, error: e.data };

      // rate limited -> backoff + retry
      if (e.status === 429) {
        const retryDelayMs = retryDelayMsForError(e, attempt);

        // Always pause the shared limiter so other requests for this company back off too.
        limiter.pause(retryDelayMs);
        logger.log("chatter-api-request-rate-limited", "WARN", "api", requestContext.bodyshopId, {
          ...requestContext,
          attempt: attempt + 1,
          maxAttempts: CHATTER_MAX_RETRIES,
          status: e.status,
          retryAfterMs: e.retryAfterMs,
          retryDelayMs,
          error: e.data ?? e.message
        });

        // Only wait when another attempt follows. Sleeping after the final attempt would hold
        // the caller for the whole delay just to report failure.
        const hasAttemptsLeft = attempt + 1 < CHATTER_MAX_RETRIES;
        if (hasAttemptsLeft) await sleep(retryDelayMs);
        continue;
      }

      logger.log("chatter-api-request-failed", "ERROR", "api", requestContext.bodyshopId, {
        ...requestContext,
        attempt: attempt + 1,
        maxAttempts: CHATTER_MAX_RETRIES,
        status: e.status,
        error: e.data ?? e.message
      });
      return { ok: false, status: e.status, error: e.data ?? e.message, context: requestContext };
    }
  }

  // Only reachable when every attempt ended in a 429.
  logger.log("chatter-api-request-failed", "ERROR", "api", requestContext.bodyshopId, {
    ...requestContext,
    maxAttempts: CHATTER_MAX_RETRIES,
    status: 429,
    error: "rate limit retry exhausted"
  });

  return { ok: false, status: 429, error: "rate limit retry exhausted", context: requestContext };
}
|
|
|
|
/**
 * Converts a raw chatter_company_id into a positive integer.
 *
 * @param {*} val - Raw value; null/undefined are treated as empty
 * @returns {number|null} The id, or null when the trimmed value is empty or is not a positive integer
 */
function parseCompanyId(val) {
  const text = String(val ?? "").trim();
  if (text === "") return null;

  const id = Number(text);
  // Number.isInteger is already false for NaN and ±Infinity.
  return Number.isInteger(id) && id > 0 ? id : null;
}
|
|
|
|
/**
 * Extracts the shop-identifying fields attached to error records.
 *
 * @param {Object} bodyshop - Shop record
 * @returns {Object} `{ bodyshopid, imexshopid, shopname, chatter_company_id, chatterid }`
 */
function pickShop(bodyshop) {
  const { id, imexshopid, shopname, chatter_company_id, chatterid } = bodyshop;
  return { bodyshopid: id, imexshopid, shopname, chatter_company_id, chatterid };
}
|
|
|
|
/**
 * Resolves after roughly `ms` milliseconds.
 *
 * @param {number} ms - Delay passed to setTimeout
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
|
|
|
|
/**
 * Exponential backoff delay: 500 ms doubled per attempt, capped at 30 s, plus 0-249 ms of jitter.
 *
 * @param {number} attempt - Zero-based attempt index
 * @returns {number} Delay in milliseconds
 */
function backoffMs(attempt) {
  const exponentialMs = 500 * 2 ** attempt;
  const cappedMs = Math.min(30_000, exponentialMs);
  const jitterMs = Math.floor(Math.random() * 250);
  return cappedMs + jitterMs;
}
|
|
|
|
/**
 * Chooses how long to wait before retrying a rate-limited request.
 *
 * Uses the error's `retryAfterMs` when it is a positive finite number (plus 0-249 ms of jitter,
 * capped at 60 s); otherwise falls back to backoffMs(attempt).
 *
 * @param {Object} error - Error from the failed request; may carry `retryAfterMs`
 * @param {number} attempt - Zero-based attempt index
 * @returns {number} Delay in milliseconds
 */
function retryDelayMsForError(error, attempt) {
  const hintedMs = Number(error?.retryAfterMs);
  const hintIsUsable = Number.isFinite(hintedMs) && hintedMs > 0;
  if (!hintIsUsable) return backoffMs(attempt);

  const jitterMs = Math.floor(Math.random() * 250);
  return Math.min(60_000, hintedMs + jitterMs);
}
|
|
|
|
/**
 * Formats a delivery date as a UTC "YYYY-MM-DD HH:mm:ssZ" string for Chatter.
 *
 * NOTE(review): the value is converted to UTC before formatting, so `timezone` does not
 * appear to change the result. Confirm whether offset-less values were meant to be
 * interpreted in the shop's zone.
 *
 * @param {*} value - Date-like value handed to moment()
 * @param {string} [timezone] - Zone name; only applied when moment.tz.zone() recognises it
 * @returns {string|undefined} Formatted timestamp, or undefined for empty or invalid input
 */
function formatChatterTimestamp(value, timezone) {
  if (!value) return undefined;

  const zoneIsKnown = Boolean(timezone && moment.tz.zone(timezone));

  let parsed = moment(value);
  if (zoneIsKnown) {
    parsed = parsed.tz(timezone);
  }

  if (!parsed.isValid()) return undefined;

  // Keep a strict, Chatter-friendly timestamp without fractional seconds.
  const asUtc = parsed.utc();
  return asUtc.format("YYYY-MM-DD HH:mm:ss[Z]");
}
|
|
|
|
/**
 * Creates a limiter that runs at most `max` async tasks at once, starting queued tasks in
 * FIFO order as slots free up.
 *
 * A `max` that is not a number >= 1 (0, negative, NaN, undefined, non-numeric text) falls
 * back to 1. Previously 0 or a negative value never started any task, so callers hung, and
 * NaN silently removed the cap. `Infinity` is still accepted and means "no cap".
 *
 * @param {number} max - Maximum number of tasks running concurrently
 * @returns {function(function(): Promise<*>): Promise<*>} `limit(fn)`: schedules `fn` and
 *   settles with its result or rejection
 */
function createConcurrencyLimit(max) {
  // `max >= 1` is false for NaN, so this one comparison covers every invalid input.
  const maxActive = max >= 1 ? max : 1;

  let active = 0;
  const queue = [];

  const next = () => {
    if (active >= maxActive) return;
    const task = queue.shift();
    if (!task) return;
    active++;
    task()
      // Queued tasks settle the caller's promise themselves and never reject; this catch only
      // guarantees the slot bookkeeping below can never produce an unhandled rejection.
      .catch(() => {})
      .finally(() => {
        active--;
        next();
      });
  };

  return (fn) =>
    new Promise((resolve, reject) => {
      queue.push(async () => {
        try {
          resolve(await fn());
        } catch (e) {
          reject(e);
        }
      });
      next();
    });
}
|
|
|
|
/**
 * Returns the rate limiter for a company, creating and caching it on first use.
 *
 * @param {number|string} companyId - Company id; stringified to form the cache key
 * @returns {Object} Limiter exposing `acquire()` and `pause(ms)`
 */
function getCompanyRateLimiter(companyId) {
  const cacheKey = String(companyId);

  let limiter = companyRateLimiters.get(cacheKey);
  if (!limiter) {
    limiter = createTokenBucketRateLimiter({
      refillPerSecond: CHATTER_REQUESTS_PER_SECOND,
      capacity: CHATTER_BURST_CAPACITY
    });
    companyRateLimiters.set(cacheKey, limiter);
  }

  return limiter;
}
|
|
|
|
/**
 * Creates a token bucket rate limiter.
 *
 * The bucket starts full (`capacity` tokens) and refills continuously at `refillPerSecond`.
 * `acquire()` calls are chained, so each waits for the previous one before taking a token.
 * `pause(ms)` blocks all acquisition until the pause deadline has passed.
 *
 * @param {Object} options
 * @param {number} options.refillPerSecond - Tokens added per second
 * @param {number} options.capacity - Maximum tokens held
 * @returns {{ acquire: function(): Promise<void>, pause: function(number): void }}
 */
function createTokenBucketRateLimiter({ refillPerSecond, capacity }) {
  let tokens = capacity;
  let lastRefillAt = Date.now();
  let pauseUntil = 0;
  // Tail of the acquisition queue; every acquire() appends itself here.
  let chain = Promise.resolve();

  // Credits tokens for the time elapsed since the last refill, never exceeding capacity.
  function topUp() {
    const now = Date.now();
    const elapsedSec = (now - lastRefillAt) / 1000;
    if (elapsedSec <= 0) return;
    tokens = Math.min(capacity, tokens + elapsedSec * refillPerSecond);
    lastRefillAt = now;
  }

  // Loops until a token has been taken, honouring any active pause first.
  async function waitForPermit() {
    while (true) {
      const remainingPauseMs = pauseUntil - Date.now();
      if (remainingPauseMs > 0) {
        await sleep(remainingPauseMs);
        continue;
      }

      topUp();
      if (tokens >= 1) {
        tokens -= 1;
        return;
      }

      // Sleep roughly until the missing fraction of a token has refilled (at least 25 ms).
      const shortfall = 1 - tokens;
      const refillWaitMs = Math.ceil((shortfall / refillPerSecond) * 1000);
      await sleep(Math.max(25, refillWaitMs));
    }
  }

  return {
    acquire() {
      // Runs after the previous acquisition settles, whether it fulfilled or rejected.
      const turn = chain.then(waitForPermit, waitForPermit);
      chain = turn;
      return turn;
    },
    pause(ms) {
      const durationMs = Number(ms);
      if (Number.isFinite(durationMs) && durationMs > 0) {
        // A pause can only extend the current deadline, never shorten it.
        pauseUntil = Math.max(pauseUntil, Date.now() + durationMs);
      }
    }
  };
}
|
|
|
|
/**
 * Parses a setting that must be a positive finite number.
 *
 * @param {*} value - Raw value (typically an environment variable string)
 * @param {number} fallback - Returned when `value` is not a positive finite number
 * @returns {number}
 */
function getPositiveNumber(value, fallback) {
  const parsed = Number(value);
  if (!Number.isFinite(parsed) || parsed <= 0) return fallback;
  return parsed;
}
|
|
|
|
/**
 * Parses a setting that must be a positive integer.
 *
 * @param {*} value - Raw value (typically an environment variable string)
 * @param {number} fallback - Returned when `value` is not a positive integer
 * @returns {number}
 */
function getPositiveInteger(value, fallback) {
  const parsed = Number(value);
  if (!Number.isInteger(parsed) || parsed <= 0) return fallback;
  return parsed;
}
|
|
|
|
/**
 * Returns the Chatter API client for a company, building it on first use.
 *
 * The client is kept in the in-memory `clientCache`; its token comes from
 * getChatterApiTokenCached() and is fixed when the client is constructed.
 *
 * @param {number|string} companyId - Company id; stringified to form the cache key
 * @param {Object} [sessionUtils] - Optional session utils forwarded for token caching
 * @returns {Promise<ChatterApiClient>}
 */
async function getChatterApiClient(companyId, sessionUtils) {
  const cacheKey = String(companyId);

  const cachedClient = clientCache.get(cacheKey);
  if (cachedClient) return cachedClient;

  const apiToken = await getChatterApiTokenCached(companyId, sessionUtils);
  const freshClient = new ChatterApiClient({ baseUrl: CHATTER_BASE_URL, apiToken });
  clientCache.set(cacheKey, freshClient);

  return freshClient;
}
|
|
|
|
/**
 * Resolves the API token for a company.
 *
 * Lookup order:
 *   1. `CHATTER_API_TOKEN` environment override (used for every company when set)
 *   2. `sessionUtils.getChatterToken(key)` when sessionUtils provides it
 *   3. an identical lookup already in flight for the same company
 *   4. `getChatterApiToken(companyId)`, then `sessionUtils.setChatterToken(key, token)` when available
 *
 * NOTE(review): the previous comment gave the secret id as CHATTER_COMPANY_KEY_<companyId>.
 * That is decided inside getChatterApiToken and cannot be confirmed from this function.
 *
 * @param {number|string} companyId - Company id
 * @param {Object} [sessionUtils] - Optional cache helpers (`getChatterToken` / `setChatterToken`)
 * @returns {Promise<string>} The API token
 * @throws {Error} When `companyId` is empty after trimming
 */
async function getChatterApiTokenCached(companyId, sessionUtils) {
  const key = String(companyId ?? "").trim();
  if (!key) throw new Error("getChatterApiToken: companyId is required");

  // Optional override for emergency/dev
  const overrideToken = process.env.CHATTER_API_TOKEN;
  if (overrideToken) return overrideToken;

  // Check the cache only when sessionUtils offers a getter.
  if (sessionUtils?.getChatterToken) {
    const cachedToken = await sessionUtils.getChatterToken(key);
    if (cachedToken) {
      logger.log("chatter-api-get-token-cache-hit", "DEBUG", "api", null, { companyId: key });
      return cachedToken;
    }
  }

  // Join a lookup that is already running for this company instead of starting another.
  const pending = tokenInFlight.get(key);
  if (pending) return pending;

  const fetchAndStore = async () => {
    logger.log("chatter-api-get-token-cache-miss", "DEBUG", "api", null, { companyId: key });

    // Fetch the token using the shared function.
    const token = await getChatterApiToken(companyId);

    // Store it in the cache when sessionUtils offers a setter.
    if (sessionUtils?.setChatterToken) {
      await sessionUtils.setChatterToken(key, token);
    }

    return token;
  };

  const lookup = fetchAndStore();
  tokenInFlight.set(key, lookup);

  try {
    return await lookup;
  } finally {
    // Clear the in-flight marker whether the lookup succeeded or failed.
    tokenInFlight.delete(key);
  }
}
|