import { Job, Queue, QueueEvents, Worker } from "bullmq"; import dotenv from "dotenv"; import { fileTypeFromFile } from "file-type"; import { FileTypeResult } from "file-type/core"; import fs from "fs-extra"; import gm from "gm"; import path from "path"; import { fileURLToPath } from "url"; import { logger } from "../server.js"; import { generateUniqueHeicFilename } from "./generateUniqueFilename.js"; import { FolderPaths } from "./serverInit.js"; const __dirname = path.dirname(fileURLToPath(import.meta.url)); const HeicQueue = new Queue("HEIC Queue", { connection: { host: "localhost", port: 6379 } }); dotenv.config({ path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`) }); const imageMagick = gm.subClass({ imageMagick: true }); export async function ConvertHeicFiles(files: Express.Multer.File[]) { const validFiles = await filterValidHeicFiles(files); const jobs = await HeicQueue.addBulk( validFiles.map((file) => ({ name: file.filename, data: { convertedFileName: generateUniqueHeicFilename(file), file } })) ); // await Promise.all( // validFiles.map(async (file) => { // const convertedFileName = generateUniqueHeicFilename(file); // await HeicQueue.add(convertedFileName, { convertedFileName, file },{removeOnComplete: true,}); // // try { // // await ConvertToJpeg(file.path, `${file.destination}/${convertedFileName}`); // // logger.log("debug", `Converted ${file.filename} image to JPEG from HEIC.`); // // await handleOriginalFile(file, convertedFileName); // // file.filename = convertedFileName; // // file.mimetype = "image/jpeg"; // // file.path = `${file.destination}/${convertedFileName}`; // // } catch (error) { // // logger.log("error", `Error converting ${file.filename} image to JPEG from HEIC. ${JSON.stringify(error)}`); // // } // }) // ); } async function filterValidHeicFiles(files: Express.Multer.File[]) { const validFiles = []; for (const file of files) { const type: FileTypeResult | undefined = await fileTypeFromFile(file.path); if (type?.mime === "image/heic") { validFiles.push(file); } } return validFiles; } async function handleOriginalFile(file: Express.Multer.File, convertedFileName: string) { if (process.env.KEEP_CONVERTED_ORIGINALS) { await fs.ensureDir(path.join(file.destination, FolderPaths.ConvertedOriginalSubDir)); await fs.move(file.path, `${path.join(file.destination, FolderPaths.ConvertedOriginalSubDir)}/${file.filename}`); } else { await fs.unlink(file.path); } } async function ConvertToJpeg(file: string, newPath: string) { const fileOnDisk: Buffer = await fs.readFile(file); return new Promise((resolve, reject) => { imageMagick(fileOnDisk) .setFormat("jpg") .write(newPath, (error) => { if (error) reject(error.message); resolve(newPath); }); }); } //Previos implementation using sandboxing. Cannot set up because the imports try to launch the server again. // const processorUrl = pathToFileURL(__dirname + "/heicQueueProcessor.ts"); // const HeicWorker = new Worker("HEIC Queue", processorUrl, { // connection: { host: "localhost", port: 6379 } // }); const HeicWorker = new Worker( "HEIC Queue", async (job: Job) => { const { file, convertedFileName } = job.data; try { logger.log("debug", `Attempting to Convert ${file.filename} image to JPEG from HEIC.`); await ConvertToJpeg(file.path, `${file.destination}/${convertedFileName}`); logger.log("debug", `Converted ${file.filename} image to JPEG from HEIC.`); await handleOriginalFile(file, convertedFileName); file.filename = convertedFileName; file.mimetype = "image/jpeg"; file.path = `${file.destination}/${convertedFileName}`; return true; } catch (error) { logger.log( "error", `QUEUE ERROR: Error converting ${file.filename} image to JPEG from HEIC. ${JSON.stringify(error)}` ); return false; } }, { connection: { host: "localhost", port: 6379 } } ); HeicQueue.on("waiting", (job) => { logger.log("debug", `[BULLMQ] Job is waiting in queue! ${job.data.convertedFileName}`); }); HeicQueue.on("error", (error) => { logger.log("error", `[BULLMQ] Queue Error! ${error}`); }); HeicWorker.on("ready", () => { logger.log("debug", `[BULLMQ] Worker Ready`); }); HeicWorker.on("active", (job, prev) => { logger.log("debug", `[BULLMQ] Job ${job.id} is now active; previous status was ${prev}`); }); HeicWorker.on("completed", (jobId, returnvalue) => { logger.log("debug", `[BULLMQ] ${jobId.id} has completed and returned ${returnvalue}`); }); HeicWorker.on("failed", (jobId, failedReason) => { logger.log("error", `[BULLMQ] ${jobId} has failed with reason ${failedReason}`); }); HeicWorker.on("error", (error) => { logger.log("error", `[BULLMQ] There was a queue error! ${error}`); }); HeicWorker.on("stalled", (error) => { logger.log("error", `[BULLMQ] There was a worker stall! ${error}`); }); HeicWorker.on("ioredis:close", () => { logger.log("error", `[BULLMQ] Redis connection closed!`); }); // const queueEvents = new QueueEvents( "HEIC Queue"); // queueEvents.on('completed', ( jobId, returnvalue ) => { // // Called every time a job is completed by any worker. // }); // queueEvents.on('failed', (jobId, failedReason ) => { // // Called whenever a job is moved to failed by any worker. // }); // queueEvents.on('progress', (jobId, data) => { // // jobId received a progress event // });