6feature/IO-3556-Chattr-Integration - Move to BULLMQ stack
This commit is contained in:
15
server.js
15
server.js
@@ -40,6 +40,8 @@ const { loadEmailQueue } = require("./server/notifications/queues/emailQueue");
|
|||||||
const { loadAppQueue } = require("./server/notifications/queues/appQueue");
|
const { loadAppQueue } = require("./server/notifications/queues/appQueue");
|
||||||
const { SetLegacyWebsocketHandlers } = require("./server/web-sockets/web-socket");
|
const { SetLegacyWebsocketHandlers } = require("./server/web-sockets/web-socket");
|
||||||
const { loadFcmQueue } = require("./server/notifications/queues/fcmQueue");
|
const { loadFcmQueue } = require("./server/notifications/queues/fcmQueue");
|
||||||
|
const { loadChatterApiQueue } = require("./server/data/queues/chatterApiQueue");
|
||||||
|
const { processChatterApiJob } = require("./server/data/chatter-api");
|
||||||
|
|
||||||
const CLUSTER_RETRY_BASE_DELAY = 100;
|
const CLUSTER_RETRY_BASE_DELAY = 100;
|
||||||
const CLUSTER_RETRY_MAX_DELAY = 5000;
|
const CLUSTER_RETRY_MAX_DELAY = 5000;
|
||||||
@@ -391,6 +393,15 @@ const applySocketIO = async ({ server, app }) => {
|
|||||||
const loadQueues = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
const loadQueues = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
||||||
const queueSettings = { pubClient, logger, redisHelpers, ioRedis };
|
const queueSettings = { pubClient, logger, redisHelpers, ioRedis };
|
||||||
|
|
||||||
|
// Load chatterApi queue with processJob function and redis helpers
|
||||||
|
const chatterApiQueue = await loadChatterApiQueue({
|
||||||
|
pubClient,
|
||||||
|
logger,
|
||||||
|
processJob: processChatterApiJob,
|
||||||
|
getChatterToken: redisHelpers.getChatterToken,
|
||||||
|
setChatterToken: redisHelpers.setChatterToken
|
||||||
|
});
|
||||||
|
|
||||||
// Assuming loadEmailQueue and loadAppQueue return Promises
|
// Assuming loadEmailQueue and loadAppQueue return Promises
|
||||||
const [notificationsEmailsQueue, notificationsAppQueue, notificationsFcmQueue] = await Promise.all([
|
const [notificationsEmailsQueue, notificationsAppQueue, notificationsFcmQueue] = await Promise.all([
|
||||||
loadEmailQueue(queueSettings),
|
loadEmailQueue(queueSettings),
|
||||||
@@ -410,6 +421,10 @@ const loadQueues = async ({ pubClient, logger, redisHelpers, ioRedis }) => {
|
|||||||
notificationsFcmQueue.on("error", (error) => {
|
notificationsFcmQueue.on("error", (error) => {
|
||||||
logger.log(`Error in notificationsFCMQueue: ${error}`, "ERROR", "queue", "api", null, { error: error?.message });
|
logger.log(`Error in notificationsFCMQueue: ${error}`, "ERROR", "queue", "api", null, { error: error?.message });
|
||||||
});
|
});
|
||||||
|
|
||||||
|
chatterApiQueue.on("error", (error) => {
|
||||||
|
logger.log(`Error in chatterApiQueue: ${error}`, "ERROR", "queue", "api", null, { error: error?.message });
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -40,7 +40,6 @@ const logger = require("../utils/logger");
|
|||||||
const { ChatterApiClient, getChatterApiToken, CHATTER_BASE_URL } = require("../chatter/chatter-client");
|
const { ChatterApiClient, getChatterApiToken, CHATTER_BASE_URL } = require("../chatter/chatter-client");
|
||||||
|
|
||||||
const client = require("../graphql-client/graphql-client").client;
|
const client = require("../graphql-client/graphql-client").client;
|
||||||
const { sendServerEmail } = require("../email/sendemail");
|
|
||||||
|
|
||||||
const CHATTER_EVENT = process.env.NODE_ENV === "production" ? "delivery" : "TEST_INTEGRATION";
|
const CHATTER_EVENT = process.env.NODE_ENV === "production" ? "delivery" : "TEST_INTEGRATION";
|
||||||
const MAX_CONCURRENCY = Number(process.env.CHATTER_API_CONCURRENCY || 5);
|
const MAX_CONCURRENCY = Number(process.env.CHATTER_API_CONCURRENCY || 5);
|
||||||
@@ -53,74 +52,98 @@ const clientCache = new Map(); // companyId -> ChatterApiClient
|
|||||||
const tokenInFlight = new Map(); // companyId -> Promise<string> (for in-flight deduplication)
|
const tokenInFlight = new Map(); // companyId -> Promise<string> (for in-flight deduplication)
|
||||||
const companyRateLimiters = new Map(); // companyId -> rate limiter
|
const companyRateLimiters = new Map(); // companyId -> rate limiter
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Core processing function for Chatter API jobs.
|
||||||
|
* This can be called by the HTTP handler or the BullMQ worker.
|
||||||
|
*
|
||||||
|
* @param {Object} options - Processing options
|
||||||
|
* @param {string} options.start - Start date for the delivery window
|
||||||
|
* @param {string} options.end - End date for the delivery window
|
||||||
|
* @param {Array<string>} options.bodyshopIds - Optional specific shops to process
|
||||||
|
* @param {boolean} options.skipUpload - Dry-run flag
|
||||||
|
* @param {Object} options.sessionUtils - Optional session utils for token caching
|
||||||
|
* @returns {Promise<Object>} Result with totals, allShopSummaries, and allErrors
|
||||||
|
*/
|
||||||
|
async function processChatterApiJob({ start, end, bodyshopIds, skipUpload, sessionUtils }) {
|
||||||
|
logger.log("chatter-api-start", "DEBUG", "api", null, null);
|
||||||
|
|
||||||
|
const allErrors = [];
|
||||||
|
const allShopSummaries = [];
|
||||||
|
|
||||||
|
// Shops that DO have chatter_company_id
|
||||||
|
const { bodyshops } = await client.request(queries.GET_CHATTER_SHOPS_WITH_COMPANY);
|
||||||
|
|
||||||
|
const shopsToProcess =
|
||||||
|
bodyshopIds?.length > 0 ? bodyshops.filter((shop) => bodyshopIds.includes(shop.id)) : bodyshops;
|
||||||
|
|
||||||
|
logger.log("chatter-api-shopsToProcess-generated", "DEBUG", "api", null, { count: shopsToProcess.length });
|
||||||
|
|
||||||
|
if (shopsToProcess.length === 0) {
|
||||||
|
logger.log("chatter-api-shopsToProcess-empty", "DEBUG", "api", null, null);
|
||||||
|
return {
|
||||||
|
totals: { shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 },
|
||||||
|
allShopSummaries: [],
|
||||||
|
allErrors: []
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
await processBatchApi({
|
||||||
|
shopsToProcess,
|
||||||
|
start,
|
||||||
|
end,
|
||||||
|
skipUpload,
|
||||||
|
allShopSummaries,
|
||||||
|
allErrors,
|
||||||
|
sessionUtils
|
||||||
|
});
|
||||||
|
|
||||||
|
const totals = allShopSummaries.reduce(
|
||||||
|
(acc, s) => {
|
||||||
|
acc.shops += 1;
|
||||||
|
acc.jobs += s.jobs || 0;
|
||||||
|
acc.sent += s.sent || 0;
|
||||||
|
acc.duplicates += s.duplicates || 0;
|
||||||
|
acc.failed += s.failed || 0;
|
||||||
|
return acc;
|
||||||
|
},
|
||||||
|
{ shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 }
|
||||||
|
);
|
||||||
|
|
||||||
|
logger.log("chatter-api-end", "DEBUG", "api", null, totals);
|
||||||
|
|
||||||
|
return { totals, allShopSummaries, allErrors };
|
||||||
|
}
|
||||||
|
|
||||||
exports.default = async (req, res) => {
|
exports.default = async (req, res) => {
|
||||||
if (process.env.NODE_ENV !== "production") return res.sendStatus(403);
|
if (process.env.NODE_ENV !== "production") return res.sendStatus(403);
|
||||||
if (req.headers["x-imex-auth"] !== process.env.AUTOHOUSE_AUTH_TOKEN) return res.sendStatus(401);
|
if (req.headers["x-imex-auth"] !== process.env.AUTOHOUSE_AUTH_TOKEN) return res.sendStatus(401);
|
||||||
|
|
||||||
res.status(202).json({
|
res.status(202).json({
|
||||||
success: true,
|
success: true,
|
||||||
message: "Processing Chatter-API Cron request ...",
|
message: "Chatter API job queued for processing",
|
||||||
timestamp: new Date().toISOString()
|
timestamp: new Date().toISOString()
|
||||||
});
|
});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
logger.log("chatter-api-start", "DEBUG", "api", null, null);
|
const { dispatchChatterApiJob } = require("./queues/chatterApiQueue");
|
||||||
|
const { start, end, bodyshopIds, skipUpload } = req.body;
|
||||||
|
|
||||||
const allErrors = [];
|
await dispatchChatterApiJob({
|
||||||
const allShopSummaries = [];
|
|
||||||
|
|
||||||
// Shops that DO have chatter_company_id
|
|
||||||
const { bodyshops } = await client.request(queries.GET_CHATTER_SHOPS_WITH_COMPANY);
|
|
||||||
|
|
||||||
const specificShopIds = req.body.bodyshopIds;
|
|
||||||
const { start, end, skipUpload } = req.body; // keep same flag; now acts like "dry run"
|
|
||||||
|
|
||||||
const shopsToProcess =
|
|
||||||
specificShopIds?.length > 0 ? bodyshops.filter((shop) => specificShopIds.includes(shop.id)) : bodyshops;
|
|
||||||
|
|
||||||
logger.log("chatter-api-shopsToProcess-generated", "DEBUG", "api", null, { count: shopsToProcess.length });
|
|
||||||
|
|
||||||
if (shopsToProcess.length === 0) {
|
|
||||||
logger.log("chatter-api-shopsToProcess-empty", "DEBUG", "api", null, null);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
await processBatchApi({
|
|
||||||
shopsToProcess,
|
|
||||||
start,
|
start,
|
||||||
end,
|
end,
|
||||||
skipUpload,
|
bodyshopIds,
|
||||||
allShopSummaries,
|
skipUpload
|
||||||
allErrors,
|
|
||||||
sessionUtils: req.sessionUtils
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const totals = allShopSummaries.reduce(
|
|
||||||
(acc, s) => {
|
|
||||||
acc.shops += 1;
|
|
||||||
acc.jobs += s.jobs || 0;
|
|
||||||
acc.sent += s.sent || 0;
|
|
||||||
acc.duplicates += s.duplicates || 0;
|
|
||||||
acc.failed += s.failed || 0;
|
|
||||||
return acc;
|
|
||||||
},
|
|
||||||
{ shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 }
|
|
||||||
);
|
|
||||||
|
|
||||||
await sendServerEmail({
|
|
||||||
subject: `Chatter API Report ${moment().format("MM-DD-YY")}`,
|
|
||||||
text:
|
|
||||||
`Totals:\n${JSON.stringify(totals, null, 2)}\n\n` +
|
|
||||||
`Shop summaries:\n${JSON.stringify(allShopSummaries, null, 2)}\n\n` +
|
|
||||||
`Errors:\n${JSON.stringify(allErrors, null, 2)}\n`
|
|
||||||
});
|
|
||||||
|
|
||||||
logger.log("chatter-api-end", "DEBUG", "api", null, totals);
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.log("chatter-api-error", "ERROR", "api", null, { error: error.message, stack: error.stack });
|
logger.log("chatter-api-queue-dispatch-error", "ERROR", "api", null, {
|
||||||
|
error: error.message,
|
||||||
|
stack: error.stack
|
||||||
|
});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
exports.processChatterApiJob = processChatterApiJob;
|
||||||
|
|
||||||
async function processBatchApi({ shopsToProcess, start, end, skipUpload, allShopSummaries, allErrors, sessionUtils }) {
|
async function processBatchApi({ shopsToProcess, start, end, skipUpload, allShopSummaries, allErrors, sessionUtils }) {
|
||||||
for (const bodyshop of shopsToProcess) {
|
for (const bodyshop of shopsToProcess) {
|
||||||
const summary = {
|
const summary = {
|
||||||
|
|||||||
178
server/data/queues/chatterApiQueue.js
Normal file
178
server/data/queues/chatterApiQueue.js
Normal file
@@ -0,0 +1,178 @@
|
|||||||
|
const { Queue, Worker } = require("bullmq");
|
||||||
|
const { registerCleanupTask } = require("../../utils/cleanupManager");
|
||||||
|
const getBullMQPrefix = require("../../utils/getBullMQPrefix");
|
||||||
|
const devDebugLogger = require("../../utils/devDebugLogger");
|
||||||
|
const moment = require("moment-timezone");
|
||||||
|
const { sendServerEmail } = require("../../email/sendemail");
|
||||||
|
|
||||||
|
let chatterApiQueue;
|
||||||
|
let chatterApiWorker;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes the Chatter API queue and worker.
|
||||||
|
*
|
||||||
|
* @param {Object} options - Configuration options for queue initialization.
|
||||||
|
* @param {Object} options.pubClient - Redis client instance for queue communication.
|
||||||
|
* @param {Object} options.logger - Logger instance for logging events and debugging.
|
||||||
|
* @param {Function} options.processJob - Function to process the Chatter API job.
|
||||||
|
* @param {Function} options.getChatterToken - Function to get Chatter token from Redis.
|
||||||
|
* @param {Function} options.setChatterToken - Function to set Chatter token in Redis.
|
||||||
|
* @returns {Queue} The initialized `chatterApiQueue` instance.
|
||||||
|
*/
|
||||||
|
const loadChatterApiQueue = async ({ pubClient, logger, processJob, getChatterToken, setChatterToken }) => {
|
||||||
|
if (!chatterApiQueue) {
|
||||||
|
const prefix = getBullMQPrefix();
|
||||||
|
|
||||||
|
devDebugLogger(`Initializing Chatter API Queue with prefix: ${prefix}`);
|
||||||
|
|
||||||
|
chatterApiQueue = new Queue("chatterApi", {
|
||||||
|
prefix,
|
||||||
|
connection: pubClient,
|
||||||
|
defaultJobOptions: {
|
||||||
|
removeOnComplete: true,
|
||||||
|
removeOnFail: false,
|
||||||
|
attempts: 3,
|
||||||
|
backoff: {
|
||||||
|
type: "exponential",
|
||||||
|
delay: 60000 // 1 minute base delay
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
chatterApiWorker = new Worker(
|
||||||
|
"chatterApi",
|
||||||
|
async (job) => {
|
||||||
|
const { start, end, bodyshopIds, skipUpload } = job.data;
|
||||||
|
|
||||||
|
logger.log("chatter-api-queue-job-start", "INFO", "api", null, {
|
||||||
|
jobId: job.id,
|
||||||
|
start,
|
||||||
|
end,
|
||||||
|
bodyshopIds,
|
||||||
|
skipUpload
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Provide sessionUtils-like object with token caching functions
|
||||||
|
const sessionUtils = {
|
||||||
|
getChatterToken,
|
||||||
|
setChatterToken
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = await processJob({
|
||||||
|
start,
|
||||||
|
end,
|
||||||
|
bodyshopIds,
|
||||||
|
skipUpload,
|
||||||
|
sessionUtils
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.log("chatter-api-queue-job-complete", "INFO", "api", null, {
|
||||||
|
jobId: job.id,
|
||||||
|
totals: result.totals
|
||||||
|
});
|
||||||
|
|
||||||
|
// Send email summary
|
||||||
|
await sendServerEmail({
|
||||||
|
subject: `Chatter API Report ${moment().format("MM-DD-YY")}`,
|
||||||
|
text:
|
||||||
|
`Totals:\n${JSON.stringify(result.totals, null, 2)}\n\n` +
|
||||||
|
`Shop summaries:\n${JSON.stringify(result.allShopSummaries, null, 2)}\n\n` +
|
||||||
|
`Errors:\n${JSON.stringify(result.allErrors, null, 2)}\n`
|
||||||
|
});
|
||||||
|
|
||||||
|
return result;
|
||||||
|
} catch (error) {
|
||||||
|
logger.log("chatter-api-queue-job-error", "ERROR", "api", null, {
|
||||||
|
jobId: job.id,
|
||||||
|
error: error.message,
|
||||||
|
stack: error.stack
|
||||||
|
});
|
||||||
|
|
||||||
|
// Send error email
|
||||||
|
await sendServerEmail({
|
||||||
|
subject: `Chatter API Error ${moment().format("MM-DD-YY")}`,
|
||||||
|
text: `Job failed:\n${error.message}\n\n${error.stack}`
|
||||||
|
});
|
||||||
|
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prefix,
|
||||||
|
connection: pubClient,
|
||||||
|
concurrency: 1, // Process one job at a time
|
||||||
|
lockDuration: 14400000 // 4 hours - allow long-running jobs
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// Event handlers
|
||||||
|
chatterApiWorker.on("completed", (job) => {
|
||||||
|
devDebugLogger(`Chatter API job ${job.id} completed`);
|
||||||
|
});
|
||||||
|
|
||||||
|
chatterApiWorker.on("failed", (job, err) => {
|
||||||
|
logger.log("chatter-api-queue-job-failed", "ERROR", "api", null, {
|
||||||
|
jobId: job?.id,
|
||||||
|
message: err?.message,
|
||||||
|
stack: err?.stack
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
chatterApiWorker.on("progress", (job, progress) => {
|
||||||
|
devDebugLogger(`Chatter API job ${job.id} progress: ${progress}%`);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Register cleanup task
|
||||||
|
const shutdown = async () => {
|
||||||
|
devDebugLogger("Closing Chatter API queue worker...");
|
||||||
|
await chatterApiWorker.close();
|
||||||
|
devDebugLogger("Chatter API queue worker closed");
|
||||||
|
};
|
||||||
|
registerCleanupTask(shutdown);
|
||||||
|
}
|
||||||
|
|
||||||
|
return chatterApiQueue;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieves the initialized `chatterApiQueue` instance.
|
||||||
|
*
|
||||||
|
* @returns {Queue} The `chatterApiQueue` instance.
|
||||||
|
* @throws {Error} If `chatterApiQueue` is not initialized.
|
||||||
|
*/
|
||||||
|
const getQueue = () => {
|
||||||
|
if (!chatterApiQueue) {
|
||||||
|
throw new Error("Chatter API queue not initialized. Ensure loadChatterApiQueue is called during bootstrap.");
|
||||||
|
}
|
||||||
|
return chatterApiQueue;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dispatches a Chatter API job to the queue.
|
||||||
|
*
|
||||||
|
* @param {Object} options - Options for the job.
|
||||||
|
* @param {string} options.start - Start date for the delivery window.
|
||||||
|
* @param {string} options.end - End date for the delivery window.
|
||||||
|
* @param {Array<string>} options.bodyshopIds - Optional specific shops to process.
|
||||||
|
* @param {boolean} options.skipUpload - Dry-run flag.
|
||||||
|
* @returns {Promise<void>} Resolves when the job is added to the queue.
|
||||||
|
*/
|
||||||
|
const dispatchChatterApiJob = async ({ start, end, bodyshopIds, skipUpload }) => {
|
||||||
|
const queue = getQueue();
|
||||||
|
|
||||||
|
const jobData = {
|
||||||
|
start: start || moment().subtract(1, "days").startOf("day").toISOString(),
|
||||||
|
end: end || moment().endOf("day").toISOString(),
|
||||||
|
bodyshopIds: bodyshopIds || [],
|
||||||
|
skipUpload: skipUpload || false
|
||||||
|
};
|
||||||
|
|
||||||
|
await queue.add("process-chatter-api", jobData, {
|
||||||
|
jobId: `chatter-api-${moment().format("YYYY-MM-DD-HHmmss")}`
|
||||||
|
});
|
||||||
|
|
||||||
|
devDebugLogger(`Added Chatter API job to queue: ${JSON.stringify(jobData)}`);
|
||||||
|
};
|
||||||
|
|
||||||
|
module.exports = { loadChatterApiQueue, getQueue, dispatchChatterApiJob };
|
||||||
Reference in New Issue
Block a user