diff --git a/server.js b/server.js index 07901ab6e..33cbe014c 100644 --- a/server.js +++ b/server.js @@ -40,6 +40,8 @@ const { loadEmailQueue } = require("./server/notifications/queues/emailQueue"); const { loadAppQueue } = require("./server/notifications/queues/appQueue"); const { SetLegacyWebsocketHandlers } = require("./server/web-sockets/web-socket"); const { loadFcmQueue } = require("./server/notifications/queues/fcmQueue"); +const { loadChatterApiQueue } = require("./server/data/queues/chatterApiQueue"); +const { processChatterApiJob } = require("./server/data/chatter-api"); const CLUSTER_RETRY_BASE_DELAY = 100; const CLUSTER_RETRY_MAX_DELAY = 5000; @@ -391,6 +393,15 @@ const applySocketIO = async ({ server, app }) => { const loadQueues = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const queueSettings = { pubClient, logger, redisHelpers, ioRedis }; + // Load chatterApi queue with processJob function and redis helpers + const chatterApiQueue = await loadChatterApiQueue({ + pubClient, + logger, + processJob: processChatterApiJob, + getChatterToken: redisHelpers.getChatterToken, + setChatterToken: redisHelpers.setChatterToken + }); + // Assuming loadEmailQueue and loadAppQueue return Promises const [notificationsEmailsQueue, notificationsAppQueue, notificationsFcmQueue] = await Promise.all([ loadEmailQueue(queueSettings), @@ -410,6 +421,10 @@ const loadQueues = async ({ pubClient, logger, redisHelpers, ioRedis }) => { notificationsFcmQueue.on("error", (error) => { logger.log(`Error in notificationsFCMQueue: ${error}`, "ERROR", "queue", "api", null, { error: error?.message }); }); + + chatterApiQueue.on("error", (error) => { + logger.log(`Error in chatterApiQueue: ${error}`, "ERROR", "queue", "api", null, { error: error?.message }); + }); }; /** diff --git a/server/data/chatter-api.js b/server/data/chatter-api.js index caee129f0..1aecb7cfb 100644 --- a/server/data/chatter-api.js +++ b/server/data/chatter-api.js @@ -40,7 +40,6 @@ const logger = require("../utils/logger"); const { ChatterApiClient, getChatterApiToken, CHATTER_BASE_URL } = require("../chatter/chatter-client"); const client = require("../graphql-client/graphql-client").client; -const { sendServerEmail } = require("../email/sendemail"); const CHATTER_EVENT = process.env.NODE_ENV === "production" ? "delivery" : "TEST_INTEGRATION"; const MAX_CONCURRENCY = Number(process.env.CHATTER_API_CONCURRENCY || 5); @@ -53,74 +52,98 @@ const clientCache = new Map(); // companyId -> ChatterApiClient const tokenInFlight = new Map(); // companyId -> Promise (for in-flight deduplication) const companyRateLimiters = new Map(); // companyId -> rate limiter +/** + * Core processing function for Chatter API jobs. + * This can be called by the HTTP handler or the BullMQ worker. + * + * @param {Object} options - Processing options + * @param {string} options.start - Start date for the delivery window + * @param {string} options.end - End date for the delivery window + * @param {Array} options.bodyshopIds - Optional specific shops to process + * @param {boolean} options.skipUpload - Dry-run flag + * @param {Object} options.sessionUtils - Optional session utils for token caching + * @returns {Promise} Result with totals, allShopSummaries, and allErrors + */ +async function processChatterApiJob({ start, end, bodyshopIds, skipUpload, sessionUtils }) { + logger.log("chatter-api-start", "DEBUG", "api", null, null); + + const allErrors = []; + const allShopSummaries = []; + + // Shops that DO have chatter_company_id + const { bodyshops } = await client.request(queries.GET_CHATTER_SHOPS_WITH_COMPANY); + + const shopsToProcess = + bodyshopIds?.length > 0 ? bodyshops.filter((shop) => bodyshopIds.includes(shop.id)) : bodyshops; + + logger.log("chatter-api-shopsToProcess-generated", "DEBUG", "api", null, { count: shopsToProcess.length }); + + if (shopsToProcess.length === 0) { + logger.log("chatter-api-shopsToProcess-empty", "DEBUG", "api", null, null); + return { + totals: { shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 }, + allShopSummaries: [], + allErrors: [] + }; + } + + await processBatchApi({ + shopsToProcess, + start, + end, + skipUpload, + allShopSummaries, + allErrors, + sessionUtils + }); + + const totals = allShopSummaries.reduce( + (acc, s) => { + acc.shops += 1; + acc.jobs += s.jobs || 0; + acc.sent += s.sent || 0; + acc.duplicates += s.duplicates || 0; + acc.failed += s.failed || 0; + return acc; + }, + { shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 } + ); + + logger.log("chatter-api-end", "DEBUG", "api", null, totals); + + return { totals, allShopSummaries, allErrors }; +} + exports.default = async (req, res) => { if (process.env.NODE_ENV !== "production") return res.sendStatus(403); if (req.headers["x-imex-auth"] !== process.env.AUTOHOUSE_AUTH_TOKEN) return res.sendStatus(401); res.status(202).json({ success: true, - message: "Processing Chatter-API Cron request ...", + message: "Chatter API job queued for processing", timestamp: new Date().toISOString() }); try { - logger.log("chatter-api-start", "DEBUG", "api", null, null); + const { dispatchChatterApiJob } = require("./queues/chatterApiQueue"); + const { start, end, bodyshopIds, skipUpload } = req.body; - const allErrors = []; - const allShopSummaries = []; - - // Shops that DO have chatter_company_id - const { bodyshops } = await client.request(queries.GET_CHATTER_SHOPS_WITH_COMPANY); - - const specificShopIds = req.body.bodyshopIds; - const { start, end, skipUpload } = req.body; // keep same flag; now acts like "dry run" - - const shopsToProcess = - specificShopIds?.length > 0 ? bodyshops.filter((shop) => specificShopIds.includes(shop.id)) : bodyshops; - - logger.log("chatter-api-shopsToProcess-generated", "DEBUG", "api", null, { count: shopsToProcess.length }); - - if (shopsToProcess.length === 0) { - logger.log("chatter-api-shopsToProcess-empty", "DEBUG", "api", null, null); - return; - } - - await processBatchApi({ - shopsToProcess, + await dispatchChatterApiJob({ start, end, - skipUpload, - allShopSummaries, - allErrors, - sessionUtils: req.sessionUtils + bodyshopIds, + skipUpload }); - - const totals = allShopSummaries.reduce( - (acc, s) => { - acc.shops += 1; - acc.jobs += s.jobs || 0; - acc.sent += s.sent || 0; - acc.duplicates += s.duplicates || 0; - acc.failed += s.failed || 0; - return acc; - }, - { shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 } - ); - - await sendServerEmail({ - subject: `Chatter API Report ${moment().format("MM-DD-YY")}`, - text: - `Totals:\n${JSON.stringify(totals, null, 2)}\n\n` + - `Shop summaries:\n${JSON.stringify(allShopSummaries, null, 2)}\n\n` + - `Errors:\n${JSON.stringify(allErrors, null, 2)}\n` - }); - - logger.log("chatter-api-end", "DEBUG", "api", null, totals); } catch (error) { - logger.log("chatter-api-error", "ERROR", "api", null, { error: error.message, stack: error.stack }); + logger.log("chatter-api-queue-dispatch-error", "ERROR", "api", null, { + error: error.message, + stack: error.stack + }); } }; +exports.processChatterApiJob = processChatterApiJob; + async function processBatchApi({ shopsToProcess, start, end, skipUpload, allShopSummaries, allErrors, sessionUtils }) { for (const bodyshop of shopsToProcess) { const summary = { diff --git a/server/data/queues/chatterApiQueue.js b/server/data/queues/chatterApiQueue.js new file mode 100644 index 000000000..dc0f63f47 --- /dev/null +++ b/server/data/queues/chatterApiQueue.js @@ -0,0 +1,178 @@ +const { Queue, Worker } = require("bullmq"); +const { registerCleanupTask } = require("../../utils/cleanupManager"); +const getBullMQPrefix = require("../../utils/getBullMQPrefix"); +const devDebugLogger = require("../../utils/devDebugLogger"); +const moment = require("moment-timezone"); +const { sendServerEmail } = require("../../email/sendemail"); + +let chatterApiQueue; +let chatterApiWorker; + +/** + * Initializes the Chatter API queue and worker. + * + * @param {Object} options - Configuration options for queue initialization. + * @param {Object} options.pubClient - Redis client instance for queue communication. + * @param {Object} options.logger - Logger instance for logging events and debugging. + * @param {Function} options.processJob - Function to process the Chatter API job. + * @param {Function} options.getChatterToken - Function to get Chatter token from Redis. + * @param {Function} options.setChatterToken - Function to set Chatter token in Redis. + * @returns {Queue} The initialized `chatterApiQueue` instance. + */ +const loadChatterApiQueue = async ({ pubClient, logger, processJob, getChatterToken, setChatterToken }) => { + if (!chatterApiQueue) { + const prefix = getBullMQPrefix(); + + devDebugLogger(`Initializing Chatter API Queue with prefix: ${prefix}`); + + chatterApiQueue = new Queue("chatterApi", { + prefix, + connection: pubClient, + defaultJobOptions: { + removeOnComplete: true, + removeOnFail: false, + attempts: 3, + backoff: { + type: "exponential", + delay: 60000 // 1 minute base delay + } + } + }); + + chatterApiWorker = new Worker( + "chatterApi", + async (job) => { + const { start, end, bodyshopIds, skipUpload } = job.data; + + logger.log("chatter-api-queue-job-start", "INFO", "api", null, { + jobId: job.id, + start, + end, + bodyshopIds, + skipUpload + }); + + try { + // Provide sessionUtils-like object with token caching functions + const sessionUtils = { + getChatterToken, + setChatterToken + }; + + const result = await processJob({ + start, + end, + bodyshopIds, + skipUpload, + sessionUtils + }); + + logger.log("chatter-api-queue-job-complete", "INFO", "api", null, { + jobId: job.id, + totals: result.totals + }); + + // Send email summary + await sendServerEmail({ + subject: `Chatter API Report ${moment().format("MM-DD-YY")}`, + text: + `Totals:\n${JSON.stringify(result.totals, null, 2)}\n\n` + + `Shop summaries:\n${JSON.stringify(result.allShopSummaries, null, 2)}\n\n` + + `Errors:\n${JSON.stringify(result.allErrors, null, 2)}\n` + }); + + return result; + } catch (error) { + logger.log("chatter-api-queue-job-error", "ERROR", "api", null, { + jobId: job.id, + error: error.message, + stack: error.stack + }); + + // Send error email + await sendServerEmail({ + subject: `Chatter API Error ${moment().format("MM-DD-YY")}`, + text: `Job failed:\n${error.message}\n\n${error.stack}` + }); + + throw error; + } + }, + { + prefix, + connection: pubClient, + concurrency: 1, // Process one job at a time + lockDuration: 14400000 // 4 hours - allow long-running jobs + } + ); + + // Event handlers + chatterApiWorker.on("completed", (job) => { + devDebugLogger(`Chatter API job ${job.id} completed`); + }); + + chatterApiWorker.on("failed", (job, err) => { + logger.log("chatter-api-queue-job-failed", "ERROR", "api", null, { + jobId: job?.id, + message: err?.message, + stack: err?.stack + }); + }); + + chatterApiWorker.on("progress", (job, progress) => { + devDebugLogger(`Chatter API job ${job.id} progress: ${progress}%`); + }); + + // Register cleanup task + const shutdown = async () => { + devDebugLogger("Closing Chatter API queue worker..."); + await chatterApiWorker.close(); + devDebugLogger("Chatter API queue worker closed"); + }; + registerCleanupTask(shutdown); + } + + return chatterApiQueue; +}; + +/** + * Retrieves the initialized `chatterApiQueue` instance. + * + * @returns {Queue} The `chatterApiQueue` instance. + * @throws {Error} If `chatterApiQueue` is not initialized. + */ +const getQueue = () => { + if (!chatterApiQueue) { + throw new Error("Chatter API queue not initialized. Ensure loadChatterApiQueue is called during bootstrap."); + } + return chatterApiQueue; +}; + +/** + * Dispatches a Chatter API job to the queue. + * + * @param {Object} options - Options for the job. + * @param {string} options.start - Start date for the delivery window. + * @param {string} options.end - End date for the delivery window. + * @param {Array} options.bodyshopIds - Optional specific shops to process. + * @param {boolean} options.skipUpload - Dry-run flag. + * @returns {Promise} Resolves when the job is added to the queue. + */ +const dispatchChatterApiJob = async ({ start, end, bodyshopIds, skipUpload }) => { + const queue = getQueue(); + + const jobData = { + start: start || moment().subtract(1, "days").startOf("day").toISOString(), + end: end || moment().endOf("day").toISOString(), + bodyshopIds: bodyshopIds || [], + skipUpload: skipUpload || false + }; + + await queue.add("process-chatter-api", jobData, { + jobId: `chatter-api-${moment().format("YYYY-MM-DD-HHmmss")}` + }); + + devDebugLogger(`Added Chatter API job to queue: ${JSON.stringify(jobData)}`); +}; + +module.exports = { loadChatterApiQueue, getQueue, dispatchChatterApiJob };