feature/IO-3556-Chattr-Integration
This commit is contained in:
350
server/data/chatter-api.js
Normal file
350
server/data/chatter-api.js
Normal file
@@ -0,0 +1,350 @@
|
||||
const queries = require("../graphql-client/queries");
|
||||
const moment = require("moment-timezone");
|
||||
const logger = require("../utils/logger");
|
||||
const { ChatterApiClient, getChatterApiToken, CHATTER_BASE_URL } = require("../chatter/chatter-client");
|
||||
const { defaultProvider } = require("@aws-sdk/credential-provider-node");
|
||||
const { isString, isEmpty } = require("lodash");
|
||||
|
||||
const client = require("../graphql-client/graphql-client").client;
|
||||
const { sendServerEmail } = require("../email/sendemail");
|
||||
|
||||
const CHATTER_EVENT = process.env.CHATTER_SOLICITATION_EVENT || "delivery";
|
||||
const MAX_CONCURRENCY = Number(process.env.CHATTER_API_CONCURRENCY || 5);
|
||||
const AWS_REGION = process.env.AWS_REGION || "ca-central-1";
|
||||
|
||||
// Configure SecretsManager client with localstack support for caching implementation
|
||||
const secretsClientOptions = {
|
||||
region: AWS_REGION,
|
||||
credentials: defaultProvider()
|
||||
};
|
||||
|
||||
const isLocal = isString(process.env?.LOCALSTACK_HOSTNAME) && !isEmpty(process.env?.LOCALSTACK_HOSTNAME);
|
||||
|
||||
if (isLocal) {
|
||||
secretsClientOptions.endpoint = `http://${process.env.LOCALSTACK_HOSTNAME}:4566`;
|
||||
}
|
||||
|
||||
// Token and client caching for performance
|
||||
const tokenCache = new Map(); // companyId -> token string
|
||||
const tokenInFlight = new Map(); // companyId -> Promise<string>
|
||||
const clientCache = new Map(); // companyId -> ChatterApiClient
|
||||
|
||||
exports.default = async (req, res) => {
|
||||
if (process.env.NODE_ENV !== "production") return res.sendStatus(403);
|
||||
if (req.headers["x-imex-auth"] !== process.env.AUTOHOUSE_AUTH_TOKEN) return res.sendStatus(401);
|
||||
|
||||
res.status(202).json({
|
||||
success: true,
|
||||
message: "Processing request ...",
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
try {
|
||||
logger.log("chatter-api-start", "DEBUG", "api", null, null);
|
||||
|
||||
const allErrors = [];
|
||||
const allShopSummaries = [];
|
||||
|
||||
// Shops that DO have chatter_company_id
|
||||
const { bodyshops } = await client.request(queries.GET_CHATTER_SHOPS_WITH_COMPANY);
|
||||
|
||||
const specificShopIds = req.body.bodyshopIds;
|
||||
const { start, end, skipUpload } = req.body; // keep same flag; now acts like "dry run"
|
||||
|
||||
const shopsToProcess =
|
||||
specificShopIds?.length > 0 ? bodyshops.filter((shop) => specificShopIds.includes(shop.id)) : bodyshops;
|
||||
|
||||
logger.log("chatter-api-shopsToProcess-generated", "DEBUG", "api", null, { count: shopsToProcess.length });
|
||||
|
||||
if (shopsToProcess.length === 0) {
|
||||
logger.log("chatter-api-shopsToProcess-empty", "DEBUG", "api", null, null);
|
||||
return;
|
||||
}
|
||||
|
||||
await processBatchApi({
|
||||
shopsToProcess,
|
||||
start,
|
||||
end,
|
||||
skipUpload,
|
||||
allShopSummaries,
|
||||
allErrors
|
||||
});
|
||||
|
||||
const totals = allShopSummaries.reduce(
|
||||
(acc, s) => {
|
||||
acc.shops += 1;
|
||||
acc.jobs += s.jobs || 0;
|
||||
acc.sent += s.sent || 0;
|
||||
acc.duplicates += s.duplicates || 0;
|
||||
acc.failed += s.failed || 0;
|
||||
return acc;
|
||||
},
|
||||
{ shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 }
|
||||
);
|
||||
|
||||
await sendServerEmail({
|
||||
subject: `Chatter API Report ${moment().format("MM-DD-YY")}`,
|
||||
text:
|
||||
`Totals:\n${JSON.stringify(totals, null, 2)}\n\n` +
|
||||
`Shop summaries:\n${JSON.stringify(allShopSummaries, null, 2)}\n\n` +
|
||||
`Errors:\n${JSON.stringify(allErrors, null, 2)}\n`
|
||||
});
|
||||
|
||||
logger.log("chatter-api-end", "DEBUG", "api", null, totals);
|
||||
} catch (error) {
|
||||
logger.log("chatter-api-error", "ERROR", "api", null, { error: error.message, stack: error.stack });
|
||||
}
|
||||
};
|
||||
|
||||
async function processBatchApi({ shopsToProcess, start, end, skipUpload, allShopSummaries, allErrors }) {
|
||||
for (const bodyshop of shopsToProcess) {
|
||||
const summary = {
|
||||
bodyshopid: bodyshop.id,
|
||||
imexshopid: bodyshop.imexshopid,
|
||||
shopname: bodyshop.shopname,
|
||||
chatter_company_id: bodyshop.chatter_company_id,
|
||||
chatterid: bodyshop.chatterid,
|
||||
jobs: 0,
|
||||
sent: 0,
|
||||
duplicates: 0,
|
||||
failed: 0,
|
||||
ok: true
|
||||
};
|
||||
|
||||
try {
|
||||
logger.log("chatter-api-start-shop", "DEBUG", "api", bodyshop.id, { shopname: bodyshop.shopname });
|
||||
|
||||
const companyId = parseCompanyId(bodyshop.chatter_company_id);
|
||||
if (!companyId) {
|
||||
summary.ok = false;
|
||||
summary.failed = 0;
|
||||
allErrors.push({
|
||||
...pickShop(bodyshop),
|
||||
fatal: true,
|
||||
errors: [`Invalid chatter_company_id: "${bodyshop.chatter_company_id}"`]
|
||||
});
|
||||
allShopSummaries.push(summary);
|
||||
continue;
|
||||
}
|
||||
|
||||
const chatterApi = await getChatterApiClient(companyId);
|
||||
|
||||
const { jobs } = await client.request(queries.CHATTER_QUERY, {
|
||||
bodyshopid: bodyshop.id,
|
||||
start: start ? moment(start).startOf("day") : moment().subtract(1, "days").startOf("day"),
|
||||
...(end && { end: moment(end).endOf("day") })
|
||||
});
|
||||
|
||||
summary.jobs = jobs.length;
|
||||
|
||||
// concurrency-limited posting
|
||||
const limit = createConcurrencyLimit(MAX_CONCURRENCY);
|
||||
const results = await Promise.all(
|
||||
jobs.map((j) =>
|
||||
limit(async () => {
|
||||
const payload = buildInteractionPayload(bodyshop, j);
|
||||
|
||||
// keep legacy flag name: skipUpload == dry-run
|
||||
if (skipUpload) return { ok: true, dryRun: true };
|
||||
|
||||
const r = await postInteractionWithPolicy(chatterApi, companyId, payload);
|
||||
return r;
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
for (const r of results) {
|
||||
if (r?.dryRun) continue;
|
||||
if (r?.ok && r?.duplicate) summary.duplicates += 1;
|
||||
else if (r?.ok) summary.sent += 1;
|
||||
else summary.failed += 1;
|
||||
}
|
||||
|
||||
// record failures with some detail (cap to avoid huge emails)
|
||||
const failures = results
|
||||
.filter((r) => r && r.ok === false)
|
||||
.slice(0, 25)
|
||||
.map((r) => ({ status: r.status, error: r.error }));
|
||||
|
||||
if (failures.length) {
|
||||
summary.ok = false;
|
||||
allErrors.push({
|
||||
...pickShop(bodyshop),
|
||||
fatal: false,
|
||||
errors: failures
|
||||
});
|
||||
}
|
||||
|
||||
logger.log("chatter-api-end-shop", "DEBUG", "api", bodyshop.id, summary);
|
||||
} catch (error) {
|
||||
summary.ok = false;
|
||||
|
||||
logger.log("chatter-api-error-shop", "ERROR", "api", bodyshop.id, {
|
||||
error: error.message,
|
||||
stack: error.stack
|
||||
});
|
||||
|
||||
allErrors.push({
|
||||
...pickShop(bodyshop),
|
||||
fatal: true,
|
||||
errors: [error.toString()]
|
||||
});
|
||||
} finally {
|
||||
allShopSummaries.push(summary);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function buildInteractionPayload(bodyshop, j) {
|
||||
const isCompany = Boolean(j.ownr_co_nm);
|
||||
|
||||
const locationIdentifier = `${bodyshop.chatter_company_id}-${bodyshop.id}`;
|
||||
|
||||
return {
|
||||
locationIdentifier: locationIdentifier,
|
||||
event: CHATTER_EVENT,
|
||||
transactionId: j.ro_number != null ? String(j.ro_number) : undefined,
|
||||
timestamp: j.actual_delivery ? moment(j.actual_delivery).tz(bodyshop.timezone).toISOString() : undefined,
|
||||
firstName: isCompany ? null : j.ownr_fn || null,
|
||||
lastName: isCompany ? j.ownr_co_nm : j.ownr_ln || null,
|
||||
emailAddress: j.ownr_ea || undefined,
|
||||
phoneNumber: j.ownr_ph1 || undefined,
|
||||
metadata: {
|
||||
imexShopId: bodyshop.imexshopid,
|
||||
bodyshopId: bodyshop.id,
|
||||
jobId: j.id
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
async function postInteractionWithPolicy(chatterApi, companyId, payload) {
|
||||
for (let attempt = 0; attempt < 6; attempt++) {
|
||||
try {
|
||||
await chatterApi.postInteraction(companyId, payload);
|
||||
return { ok: true };
|
||||
} catch (e) {
|
||||
// duplicate -> treat as successful idempotency outcome
|
||||
if (e.status === 409) return { ok: true, duplicate: true, error: e.data };
|
||||
|
||||
// rate limited -> backoff + retry
|
||||
if (e.status === 429) {
|
||||
await sleep(backoffMs(attempt));
|
||||
continue;
|
||||
}
|
||||
|
||||
return { ok: false, status: e.status, error: e.data ?? e.message };
|
||||
}
|
||||
}
|
||||
return { ok: false, status: 429, error: "rate limit retry exhausted" };
|
||||
}
|
||||
|
||||
function parseCompanyId(val) {
|
||||
const s = String(val ?? "").trim();
|
||||
if (!s) return null;
|
||||
const n = Number(s);
|
||||
if (!Number.isFinite(n) || !Number.isInteger(n) || n <= 0) return null;
|
||||
return n;
|
||||
}
|
||||
|
||||
function pickShop(bodyshop) {
|
||||
return {
|
||||
bodyshopid: bodyshop.id,
|
||||
imexshopid: bodyshop.imexshopid,
|
||||
shopname: bodyshop.shopname,
|
||||
chatter_company_id: bodyshop.chatter_company_id,
|
||||
chatterid: bodyshop.chatterid
|
||||
};
|
||||
}
|
||||
|
||||
function sleep(ms) {
|
||||
return new Promise((r) => setTimeout(r, ms));
|
||||
}
|
||||
|
||||
function backoffMs(attempt) {
|
||||
const base = Math.min(30_000, 500 * 2 ** attempt);
|
||||
const jitter = Math.floor(Math.random() * 250);
|
||||
return base + jitter;
|
||||
}
|
||||
|
||||
function createConcurrencyLimit(max) {
|
||||
let active = 0;
|
||||
const queue = [];
|
||||
|
||||
const next = () => {
|
||||
if (active >= max) return;
|
||||
const fn = queue.shift();
|
||||
if (!fn) return;
|
||||
active++;
|
||||
fn()
|
||||
.catch(() => {})
|
||||
.finally(() => {
|
||||
active--;
|
||||
next();
|
||||
});
|
||||
};
|
||||
|
||||
return (fn) =>
|
||||
new Promise((resolve, reject) => {
|
||||
queue.push(async () => {
|
||||
try {
|
||||
resolve(await fn());
|
||||
} catch (e) {
|
||||
reject(e);
|
||||
}
|
||||
});
|
||||
next();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a per-company Chatter API client, caching both the token and the client.
|
||||
*/
|
||||
async function getChatterApiClient(companyId) {
|
||||
const key = String(companyId);
|
||||
|
||||
const existing = clientCache.get(key);
|
||||
if (existing) return existing;
|
||||
|
||||
const apiToken = await getChatterApiTokenCached(companyId);
|
||||
const chatterApi = new ChatterApiClient({ baseUrl: CHATTER_BASE_URL, apiToken });
|
||||
|
||||
clientCache.set(key, chatterApi);
|
||||
return chatterApi;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches the per-company token from AWS Secrets Manager with caching
|
||||
* SecretId: CHATTER_COMPANY_KEY_<companyId>
|
||||
*
|
||||
* Uses caching + in-flight dedupe to avoid hammering Secrets Manager.
|
||||
*/
|
||||
async function getChatterApiTokenCached(companyId) {
|
||||
const key = String(companyId ?? "").trim();
|
||||
if (!key) throw new Error("getChatterApiToken: companyId is required");
|
||||
|
||||
// Optional override for emergency/dev
|
||||
if (process.env.CHATTER_API_TOKEN) return process.env.CHATTER_API_TOKEN;
|
||||
|
||||
const cached = tokenCache.get(key);
|
||||
if (cached) return cached;
|
||||
|
||||
const inflight = tokenInFlight.get(key);
|
||||
if (inflight) return inflight;
|
||||
|
||||
const p = (async () => {
|
||||
logger.log("chatter-api-get-token", "DEBUG", "api", null, { companyId: key });
|
||||
|
||||
// Use the shared function from chatter-client
|
||||
const token = await getChatterApiToken(companyId);
|
||||
tokenCache.set(key, token);
|
||||
return token;
|
||||
})();
|
||||
|
||||
tokenInFlight.set(key, p);
|
||||
|
||||
try {
|
||||
return await p;
|
||||
} finally {
|
||||
tokenInFlight.delete(key);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user