import { Job, Queue, QueueEvents, Worker } from "bullmq";
import dotenv from "dotenv";
import { Request, Response } from "express";
import fs from "fs-extra";
import multer from "multer";
import path, { resolve } from "path";
import { logger } from "../server.js";
import GenerateThumbnail from "../util/generateThumbnail.js";
import { generateUniqueBillFilename } from "../util/generateUniqueFilename.js";
import { convertHeicFiles } from "../util/heicConverter.js";
import { PathToRoBillsFolder, PathToVendorBillsFile } from "../util/pathGenerators.js";
import { BillsListMedia } from "./billsListMedia.js";

// Load the environment file that matches NODE_ENV (".env.development" when NODE_ENV is unset).
dotenv.config({ path: resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`) });

const BILLS_UPLOAD_QUEUE_NAME = "billsUploadProcessingQueue";

/** Redis connection options shared by the queue, the queue-events listener and the worker. */
const connectionOpts = {
  host: "localhost",
  port: 6379,
  maxRetriesPerRequest: 3,
  enableReadyCheck: true,
  // Request a reconnect whenever a Redis error message mentions READONLY.
  reconnectOnError: (err: Error) => err.message.includes("READONLY")
};

/** Payload carried by every job placed on the bills upload queue. */
interface BillsUploadJobData {
  files: Express.Multer.File[];
  vendorid: string;
  invoice_number: string;
  skipThumbnails: boolean;
  duplicateToVendor: boolean;
}

/**
 * Queue holding uploaded bill files that still need post-processing.
 * Finished and failed jobs are removed automatically; each job gets up to
 * three attempts with exponential backoff starting at one second.
 */
const billsUploadProcessingQueue = new Queue(BILLS_UPLOAD_QUEUE_NAME, {
  connection: connectionOpts,
  defaultJobOptions: {
    removeOnComplete: true,
    removeOnFail: true,
    attempts: 3,
    backoff: { type: "exponential", delay: 1000 }
  }
});

/** Event stream for the queue; handed to `job.waitUntilFinished` by the upload handler. */
const billsUploadQueueEvents = new QueueEvents(BILLS_UPLOAD_QUEUE_NAME, { connection: connectionOpts });

/** Reports whether an uploaded file is HEIC/HEIF, judged by MIME type or file extension. */
function isHeicUpload(file: Express.Multer.File): boolean {
  return file.mimetype === "image/heic" || file.mimetype === "image/heif" || /\.hei[cf]$/i.test(file.filename);
}

/**
 * Bills upload processing worker.
 *
 * For each job it:
 *  1. runs HEIC conversion over the uploaded files (progress 10 -> 30),
 *  2. generates thumbnails for every non-HEIC file unless `skipThumbnails` is set (progress 60),
 *  3. copies the files into the vendor bills folder when `duplicateToVendor` is set and both
 *     `vendorid` and `invoice_number` are non-empty (progress 90),
 *  4. resolves with `{ success: true, filesProcessed }` (progress 100).
 *
 * Any error is logged and rethrown so the queue's retry policy applies.
 */
const billsUploadWorker = new Worker(
  BILLS_UPLOAD_QUEUE_NAME,
  async (job: Job<BillsUploadJobData>) => {
    const { files, vendorid, invoice_number, skipThumbnails, duplicateToVendor } = job.data;

    try {
      logger.debug(`Processing bills upload with ${files.length} files`);
      await job.updateProgress(10);

      // Convert HEIC files if any
      await convertHeicFiles(files);
      await job.updateProgress(30);

      if (!skipThumbnails) {
        // HEIC/HEIF entries are excluded here; they are handled by the conversion step above.
        const filesToThumbnail = files.filter((file) => !isHeicUpload(file));
        logger.debug(
          `Generating thumbnails for ${filesToThumbnail.length} bill files (excluding HEIC)`,
          filesToThumbnail.map((f) => f.filename)
        );

        await Promise.all(filesToThumbnail.map((file) => GenerateThumbnail(file.path)));
        await job.updateProgress(60);
      }

      // Duplicate to vendor folder if enabled
      if (duplicateToVendor && vendorid && invoice_number) {
        await Promise.all(
          files.map(async (file) => {
            const target = path.join(PathToVendorBillsFile(vendorid), file.filename);
            await fs.ensureDir(path.dirname(target));
            await fs.copyFile(file.path, target);
          })
        );
        await job.updateProgress(90);
      }

      logger.debug(`Bills upload processing completed`);
      await job.updateProgress(100);
      return { success: true, filesProcessed: files.length };
    } catch (error) {
      logger.error(`Error processing bills upload:`, error);
      throw error;
    }
  },
  {
    connection: connectionOpts,
    concurrency: 2 // Allow 2 bills upload processing jobs to run concurrently
  }
);

// Event listeners for bills upload queue and worker
billsUploadProcessingQueue.on("waiting", (job) => {
  logger.debug(`[BillsUploadQueue] Job waiting: ${job.data.invoice_number}`);
});

billsUploadProcessingQueue.on("error", (error) => {
  logger.error(`[BillsUploadQueue] Queue error:`, error);
});

billsUploadWorker.on("ready", () => {
  logger.debug("[BillsUploadWorker] Worker ready");
});

billsUploadWorker.on("active", (job, prev) => {
  logger.debug(`[BillsUploadWorker] Job ${job.id} active (previous: ${prev})`);
});

billsUploadWorker.on("completed", async (job) => {
  logger.debug(`[BillsUploadWorker] Job ${job.id} completed`);
  // The queue defaults already set removeOnComplete; this explicit removal is kept as a
  // best-effort cleanup. Nothing awaits an event listener, so a rejection here must be
  // caught locally or it would surface as an unhandled promise rejection.
  try {
    await job.remove();
    logger.debug(`[BillsUploadWorker] Job ${job.id} removed from Redis`);
  } catch (error) {
    logger.error(`[BillsUploadWorker] Could not remove job ${job.id}:`, error);
  }
});

// The worker "failed" event delivers the Job (possibly undefined) and the Error, not a job id.
billsUploadWorker.on("failed", (job, error) => {
  logger.error(`[BillsUploadWorker] Job ${job?.id ?? "unknown"} failed: ${error.message}`, error);
});
billsUploadWorker.on("error", (error) => {
  logger.error(`[BillsUploadWorker] Worker error:`, error);
});

// The worker "stalled" event delivers the id of the stalled job rather than a Job object.
billsUploadWorker.on("stalled", (jobId) => {
  logger.error(`[BillsUploadWorker] Worker stalled: ${jobId}`);
});

billsUploadWorker.on("ioredis:close", () => {
  logger.error("[BillsUploadWorker] Redis connection closed");
});

/**
 * Returns a multipart body field as a trimmed string.
 * Anything that is not a string (missing field, repeated field parsed as an array,
 * nested object) is treated as absent and yields "".
 */
function readTrimmedBodyField(value: unknown): string {
  return typeof value === "string" ? value.trim() : "";
}

/**
 * Multer instance that stores uploaded bill files on disk.
 *
 * - `destination`: the folder returned by `PathToRoBillsFolder(jobid)`, created on demand.
 *   Fails with "Job ID not specified." when the `jobid` body field is empty.
 * - `filename`: produced by `generateUniqueBillFilename(file, invoice_number)`.
 *   Fails with "Invoice number not specified." when the `invoice_number` body field is empty.
 *
 * NOTE(review): both callbacks read `req.body` while the file is being received, so this
 * presumably requires clients to send the text fields before the file parts - confirm.
 * NOTE(review): `jobid` is client-supplied; verify that `PathToRoBillsFolder` sanitizes it.
 */
export const BillsMediaUploadMulter = multer({
  storage: multer.diskStorage({
    destination(req, file, cb) {
      const jobid = readTrimmedBodyField(req.body?.jobid);
      if (!jobid) {
        cb(new Error("Job ID not specified."), "");
        return;
      }

      const destinationFolder = PathToRoBillsFolder(jobid);
      fs.ensureDir(destinationFolder)
        .then(() => cb(null, destinationFolder))
        .catch((err) => cb(err as Error, ""));
    },
    filename(req, file, cb) {
      logger.info("Uploading file:", { file: path.basename(file.originalname) });

      const invoice_number = readTrimmedBodyField(req.body?.invoice_number);
      if (!invoice_number) {
        cb(new Error("Invoice number not specified."), "");
        return;
      }

      cb(null, generateUniqueBillFilename(file, invoice_number));
    }
  })
});

/**
 * Express handler that queues post-processing for freshly uploaded bill files.
 *
 * Responds 400 when the request carries no files. When the `skip_thumbnail` body field is
 * "true", the job is queued and 200 is returned without waiting for it. Otherwise the handler
 * waits for the job to finish and then delegates the response to `BillsListMedia`.
 * Any failure is logged and answered with a 500 containing the error message, unless a
 * response has already been started.
 *
 * @param req - Request whose `files` were populated by the upload middleware.
 * @param res - Response used for the status or the delegated media listing.
 */
export async function BillsUploadMedia(req: Request, res: Response): Promise<void> {
  try {
    const files = req.files as Express.Multer.File[] | undefined;
    if (!files || files.length === 0) {
      logger.warning("Upload contained no files.");
      res.status(400).json({ status: false, message: "No file uploaded" });
      return;
    }

    const vendorid = readTrimmedBodyField(req.body?.vendorid);
    const invoice_number = readTrimmedBodyField(req.body?.invoice_number);
    const skipThumbnails = req.body?.skip_thumbnail === "true";
    const duplicateToVendor = process.env.DUPLICATE_BILL_TO_VENDOR === "true";

    // If skipping thumbnails, queue job without waiting and return immediately
    if (skipThumbnails) {
      await billsUploadProcessingQueue.add("processBillsUpload", {
        files,
        vendorid,
        invoice_number,
        skipThumbnails: true,
        duplicateToVendor
      });
      res.sendStatus(200);
      return;
    }

    // Add bills upload processing job to queue and wait for completion
    logger.debug(`Adding bills upload processing job for ${files.length} files`);
    const job = await billsUploadProcessingQueue.add("processBillsUpload", {
      files,
      vendorid,
      invoice_number,
      skipThumbnails: false,
      duplicateToVendor
    });

    // Wait for the job to complete
    await job.waitUntilFinished(billsUploadQueueEvents);
    logger.debug(`Bills upload processing job completed`);

    // Delegate to BillsListMedia to respond with updated media list
    await BillsListMedia(req, res);
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    logger.error("Error while uploading Bill Media", { files: req.files, error: message });

    // BillsListMedia may have begun responding before it threw; writing a second
    // response would itself throw from inside this catch block.
    if (!res.headersSent) {
      res.status(500).json({ error: message });
    }
  }
}