import { Job, Queue, Worker } from "bullmq";
import dotenv from "dotenv";
import { fileTypeFromFile } from "file-type";
import { FileTypeResult } from "file-type/core";
import fs from "fs-extra";
import gm from "gm";
import path from "path";
import { fileURLToPath } from "url";
import { logger } from "../server.js";
import { generateUniqueHeicFilename } from "./generateUniqueFilename.js";
import { FolderPaths } from "./serverInit.js";

const __dirname = path.dirname(fileURLToPath(import.meta.url));

/** Location details of an uploaded HEIC file that is waiting to be converted. */
interface HeicFileInfo {
  path: string;
  destination: string;
  originalFilename: string;
}

/** Payload stored on every job placed on the HEIC queue. */
interface HeicJobData {
  convertedFileName: string;
  fileInfo: HeicFileInfo;
}

/** Queue name; the Queue and the Worker must use the exact same string. */
const HEIC_QUEUE_NAME = "HEIC Queue";

/**
 * Redis connection settings used by both the queue and the worker.
 * Each consumer receives its own shallow copy of this object.
 *
 * NOTE(review): BullMQ's connection documentation asks for
 * `maxRetriesPerRequest: null` on Worker connections — confirm whether the
 * value 3 is intentional for the worker.
 */
const redisConnection = {
  host: "localhost",
  port: 6379,
  maxRetriesPerRequest: 3,
  enableReadyCheck: true,
  // Reconnect when Redis reports READONLY (e.g. after a replica fail-over).
  reconnectOnError: (err: Error): boolean => err.message.includes("READONLY")
};

const HeicQueue = new Queue(HEIC_QUEUE_NAME, {
  connection: { ...redisConnection },
  defaultJobOptions: {
    removeOnComplete: true,
    removeOnFail: true,
    attempts: 3,
    backoff: { type: "exponential", delay: 1000 }
  }
});

// Run the queue clean-up every 10 minutes.
const cleanupINTERVAL = 1000 * 60 * 10;
setInterval(cleanupQueue, cleanupINTERVAL);

dotenv.config({
  path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`)
});

const imageMagick = gm.subClass({ imageMagick: true });

/**
 * Turns any thrown value into a readable log string.
 * `JSON.stringify` on an `Error` yields "{}", so errors are rendered via
 * their stack (or message) instead.
 */
function describeError(error: unknown): string {
  if (error instanceof Error) {
    return error.stack ?? error.message;
  }
  if (typeof error === "string") {
    return error;
  }
  return JSON.stringify(error) ?? String(error);
}

/**
 * Periodic housekeeping: removes old finished jobs and logs the job counts.
 * Errors are logged and swallowed so a failed clean-up never escapes the timer.
 */
async function cleanupQueue(): Promise<void> {
  const ONE_HOUR = 1000 * 60 * 60;
  const SIX_HOURS = ONE_HOUR * 6;
  try {
    // Clean completed jobs older than 1 hour
    await HeicQueue.clean(ONE_HOUR, 500, "completed");
    // Clean failed jobs older than 6 hours
    await HeicQueue.clean(SIX_HOURS, 500, "failed");
    // Get queue health
    const jobCounts = await HeicQueue.getJobCounts();
    logger.log("debug", `Queue status: ${JSON.stringify(jobCounts)}`);
  } catch (error) {
    logger.log("error", `Queue cleanup error: ${error}`);
  }
}

/**
 * Queues every genuine HEIC upload for conversion to JPEG and rewrites the
 * matching entries of `files` in place so they already point at the converted
 * file (new `filename`, `mimetype` set to "image/jpeg").
 *
 * The conversion itself runs asynchronously in the worker; this function only
 * waits for the jobs to be enqueued.
 *
 * @param files Uploaded files; entries that are HEIC images are mutated.
 */
export async function ConvertHeicFiles(files: Express.Multer.File[]): Promise<void> {
  const validFiles = await filterValidHeicFiles(files);
  if (validFiles.length === 0) {
    // Nothing to convert: skip the Redis round-trip entirely.
    return;
  }

  const jobs = validFiles.map((file) => ({
    name: file.filename,
    data: {
      convertedFileName: generateUniqueHeicFilename(file),
      fileInfo: {
        path: file.path,
        destination: file.destination,
        originalFilename: file.filename
      }
    }
  }));

  await HeicQueue.addBulk(jobs);

  // Index by original filename so each job can locate its upload entry.
  const fileMap = new Map(files.map((file, index) => [file.filename, index]));
  jobs.forEach((job) => {
    const fileIndex = fileMap.get(job.data.fileInfo.originalFilename);
    if (fileIndex !== undefined) {
      files[fileIndex].filename = job.data.convertedFileName;
      files[fileIndex].mimetype = "image/jpeg";
    }
  });
}

/**
 * Returns the subset of `files` whose on-disk content is detected as
 * "image/heic". Detection uses the file content, not the reported mimetype.
 */
async function filterValidHeicFiles(files: Express.Multer.File[]): Promise<Express.Multer.File[]> {
  const validFiles: Express.Multer.File[] = [];
  for (const file of files) {
    const type: FileTypeResult | undefined = await fileTypeFromFile(file.path);
    if (type?.mime === "image/heic") {
      validFiles.push(file);
    }
  }
  return validFiles;
}

/**
 * Disposes of the original HEIC file once it has been converted: it is moved
 * into the converted-originals sub-directory when KEEP_CONVERTED_ORIGINALS is
 * set, and deleted otherwise.
 *
 * Note: the env var is checked for truthiness only, so any non-empty value
 * (including the string "false") keeps the original.
 *
 * @throws Re-throws any filesystem error after logging it.
 */
async function handleOriginalFile(fileInfo: HeicFileInfo): Promise<void> {
  try {
    if (process.env.KEEP_CONVERTED_ORIGINALS) {
      const archiveDir = path.join(fileInfo.destination, FolderPaths.ConvertedOriginalSubDir);
      await fs.ensureDir(archiveDir);
      await fs.move(fileInfo.path, path.join(archiveDir, fileInfo.originalFilename));
    } else {
      await fs.unlink(fileInfo.path);
    }
  } catch (error) {
    logger.log("error", `Error handling original file: ${error}`);
    throw error;
  }
}

/**
 * Streams `file` through ImageMagick and writes the JPEG result to `newPath`.
 * Streaming avoids loading the whole image into memory.
 *
 * @param file Path of the source image.
 * @param newPath Path the JPEG is written to.
 * @returns Resolves with `newPath` once the output file has been fully written.
 * @throws Rejects with an `Error` if reading, converting or writing fails.
 */
async function ConvertToJpeg(file: string, newPath: string): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const readStream = fs.createReadStream(file);
    const writeStream = fs.createWriteStream(newPath);

    // Shared failure path: release both file handles and reject with a real
    // Error. Calling reject more than once is harmless.
    // NOTE(review): a partially written file may remain at newPath on failure.
    const fail = (error: unknown): void => {
      readStream.destroy();
      writeStream.destroy();
      reject(error instanceof Error ? error : new Error(String(error)));
    };

    // pipe() does not forward errors, so every stage needs its own handler.
    readStream.on("error", fail);
    writeStream.on("error", fail);
    writeStream.on("finish", () => resolve(newPath));

    imageMagick(readStream).setFormat("jpg").stream().on("error", fail).pipe(writeStream);
  });
}

/**
 * Worker that performs the conversion: converts the HEIC file to JPEG, then
 * archives or deletes the original. Jobs are processed one at a time.
 */
const HeicWorker = new Worker(
  HEIC_QUEUE_NAME,
  async (job: Job<HeicJobData>) => {
    const { fileInfo, convertedFileName } = job.data;
    try {
      logger.log("debug", `Attempting to Convert ${fileInfo.originalFilename} image to JPEG from HEIC.`);
      await job.updateProgress(10);
      await ConvertToJpeg(fileInfo.path, path.join(fileInfo.destination, convertedFileName));
      await job.updateProgress(50);
      await handleOriginalFile(fileInfo);
      logger.log("debug", `Converted ${fileInfo.originalFilename} image to JPEG from HEIC.`);
      await job.updateProgress(100);
      return true;
    } catch (error) {
      logger.log(
        "error",
        `QUEUE ERROR: Error converting ${fileInfo.originalFilename} image to JPEG from HEIC. ${describeError(error)}`
      );
      // Re-throw so BullMQ marks the job as failed and applies the retry policy.
      throw error;
    }
  },
  {
    connection: { ...redisConnection },
    concurrency: 1
  }
);

HeicQueue.on("waiting", (job) => {
  logger.log("debug", `[BULLMQ] Job is waiting in queue! ${job.data.convertedFileName}`);
});

HeicQueue.on("error", (error) => {
  logger.log("error", `[BULLMQ] Queue Error! ${error}`);
});

HeicWorker.on("ready", () => {
  logger.log("debug", `[BULLMQ] Worker Ready`);
});

HeicWorker.on("active", (job, prev) => {
  logger.log("debug", `[BULLMQ] Job ${job.id} is now active; previous status was ${prev}`);
});

HeicWorker.on("completed", async (job, returnvalue) => {
  logger.log("debug", `[BULLMQ] ${job.id} has completed and returned ${returnvalue}`);
  try {
    // The job may already be gone because of removeOnComplete; a rejection
    // here must not surface as an unhandled promise rejection.
    await job.remove();
    logger.log("debug", `Job ${job.id} removed from Redis`);
  } catch (error) {
    logger.log("debug", `[BULLMQ] Job ${job.id} could not be removed: ${describeError(error)}`);
  }
});

// BullMQ passes the Job (possibly undefined) and the Error, not an id/reason pair.
HeicWorker.on("failed", (job, error) => {
  logger.log(
    "error",
    `[BULLMQ] ${job?.id ?? "unknown job"} has failed with reason ${error instanceof Error ? error.message : String(error)}`
  );
});

HeicWorker.on("error", (error) => {
  logger.log("error", `[BULLMQ] There was a queue error! ${error}`);
});

// The "stalled" event carries the id of the stalled job, not an error.
HeicWorker.on("stalled", (jobId) => {
  logger.log("error", `[BULLMQ] There was a worker stall! ${jobId}`);
});

HeicWorker.on("ioredis:close", () => {
  logger.log("error", `[BULLMQ] Redis connection closed!`);
});