const { Queue, Worker } = require("bullmq"); const { registerCleanupTask } = require("../../utils/cleanupManager"); const getBullMQPrefix = require("../../utils/getBullMQPrefix"); const devDebugLogger = require("../../utils/devDebugLogger"); const moment = require("moment-timezone"); const { sendServerEmail } = require("../../email/sendemail"); let chatterApiQueue; let chatterApiWorker; /** * Initializes the Chatter API queue and worker. * * @param {Object} options - Configuration options for queue initialization. * @param {Object} options.pubClient - Redis client instance for queue communication. * @param {Object} options.logger - Logger instance for logging events and debugging. * @param {Function} options.processJob - Function to process the Chatter API job. * @param {Function} options.getChatterToken - Function to get Chatter token from Redis. * @param {Function} options.setChatterToken - Function to set Chatter token in Redis. * @returns {Queue} The initialized `chatterApiQueue` instance. */ const loadChatterApiQueue = async ({ pubClient, logger, processJob, getChatterToken, setChatterToken }) => { if (!chatterApiQueue) { const prefix = getBullMQPrefix(); devDebugLogger(`Initializing Chatter API Queue with prefix: ${prefix}`); chatterApiQueue = new Queue("chatterApi", { prefix, connection: pubClient, defaultJobOptions: { removeOnComplete: true, removeOnFail: false, attempts: 3, backoff: { type: "exponential", delay: 60000 // 1 minute base delay } } }); chatterApiWorker = new Worker( "chatterApi", async (job) => { const { start, end, bodyshopIds, skipUpload } = job.data; logger.log("chatter-api-queue-job-start", "INFO", "api", null, { jobId: job.id, start, end, bodyshopIds, skipUpload }); try { // Provide sessionUtils-like object with token caching functions const sessionUtils = { getChatterToken, setChatterToken }; const result = await processJob({ start, end, bodyshopIds, skipUpload, sessionUtils }); logger.log("chatter-api-queue-job-complete", "INFO", "api", null, { jobId: job.id, totals: result.totals }); // Send email summary await sendServerEmail({ subject: `Chatter API Report ${moment().format("MM-DD-YY")}`, text: `Totals:\n${JSON.stringify(result.totals, null, 2)}\n\n` + `Shop summaries:\n${JSON.stringify(result.allShopSummaries, null, 2)}\n\n` + `Errors:\n${JSON.stringify(result.allErrors, null, 2)}\n` }); return result; } catch (error) { logger.log("chatter-api-queue-job-error", "ERROR", "api", null, { jobId: job.id, error: error.message, stack: error.stack }); // Send error email await sendServerEmail({ subject: `Chatter API Error ${moment().format("MM-DD-YY")}`, text: `Job failed:\n${error.message}\n\n${error.stack}` }); throw error; } }, { prefix, connection: pubClient, concurrency: 1, // Process one job at a time lockDuration: 14400000 // 4 hours - allow long-running jobs } ); // Event handlers chatterApiWorker.on("completed", (job) => { devDebugLogger(`Chatter API job ${job.id} completed`); }); chatterApiWorker.on("failed", (job, err) => { logger.log("chatter-api-queue-job-failed", "ERROR", "api", null, { jobId: job?.id, message: err?.message, stack: err?.stack }); }); chatterApiWorker.on("progress", (job, progress) => { devDebugLogger(`Chatter API job ${job.id} progress: ${progress}%`); }); // Register cleanup task const shutdown = async () => { devDebugLogger("Closing Chatter API queue worker..."); await chatterApiWorker.close(); devDebugLogger("Chatter API queue worker closed"); }; registerCleanupTask(shutdown); } return chatterApiQueue; }; /** * Retrieves the initialized `chatterApiQueue` instance. * * @returns {Queue} The `chatterApiQueue` instance. * @throws {Error} If `chatterApiQueue` is not initialized. */ const getQueue = () => { if (!chatterApiQueue) { throw new Error("Chatter API queue not initialized. Ensure loadChatterApiQueue is called during bootstrap."); } return chatterApiQueue; }; /** * Dispatches a Chatter API job to the queue. * * @param {Object} options - Options for the job. * @param {string} options.start - Start date for the delivery window. * @param {string} options.end - End date for the delivery window. * @param {Array} options.bodyshopIds - Optional specific shops to process. * @param {boolean} options.skipUpload - Dry-run flag. * @returns {Promise} Resolves when the job is added to the queue. */ const dispatchChatterApiJob = async ({ start, end, bodyshopIds, skipUpload }) => { const queue = getQueue(); const jobData = { start: start || moment().subtract(1, "days").startOf("day").toISOString(), end: end || moment().endOf("day").toISOString(), bodyshopIds: bodyshopIds || [], skipUpload: skipUpload || false }; await queue.add("process-chatter-api", jobData, { jobId: `chatter-api-${moment().format("YYYY-MM-DD-HHmmss")}` }); devDebugLogger(`Added Chatter API job to queue: ${JSON.stringify(jobData)}`); }; module.exports = { loadChatterApiQueue, getQueue, dispatchChatterApiJob };