import { Job, Queue, QueueEvents, Worker } from "bullmq";
import dotenv from "dotenv";
import { fileTypeFromFile } from "file-type";
import { FileTypeResult } from "file-type/core";
import fs from "fs-extra";
import gm from "gm";
import path from "path";
import { logger } from "../server.js";
import { generateUniqueHeicFilename } from "./generateUniqueFilename.js";
import { FolderPaths } from "./serverInit.js";

// Load the env file that matches NODE_ENV (".env.development" when NODE_ENV is unset).
dotenv.config({
  path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`)
});

const QUEUE_NAME = "heicQueue";

const connectionOpts = {
  host: "localhost",
  port: 6379,
  enableReadyCheck: true,
  reconnectOnError: (err: Error) => err.message.includes("READONLY")
};

const heicQueue = new Queue(QUEUE_NAME, {
  connection: connectionOpts,
  defaultJobOptions: {
    removeOnComplete: true,
    removeOnFail: true,
    attempts: 3,
    backoff: { type: "exponential", delay: 1000 }
  }
});

// Re-added QueueEvents for waiting on job completion
const heicQueueEvents = new QueueEvents(QUEUE_NAME, { connection: connectionOpts });

const CLEANUP_INTERVAL_MS = 10 * 60 * 1000; // 10 minutes
setInterval(cleanupQueue, CLEANUP_INTERVAL_MS);

/** Substrings of an error message that mark a failure as a conversion problem (vs. an unexpected one). */
const CONVERSION_ERROR_MARKERS = ["Magick", "HEIC", "corrupt", "invalid"] as const;

/** Location details of an uploaded HEIC file, as carried in the job payload. */
interface HeicFileInfo {
  path: string;
  destination: string;
  originalFilename: string;
}

/** Payload of a job on the HEIC queue. */
interface HeicJobData {
  fileInfo: HeicFileInfo;
  convertedFileName: string;
}

/** Value returned by the worker processor; failures are reported as data rather than thrown. */
type HeicJobResult =
  | { success: true; convertedFileName: string }
  | { success: false; error: string; originalFilename: string };

/**
 * Extract a printable message from a caught value without assuming it is an Error.
 */
function toErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Periodic housekeeping: cleans completed jobs older than one hour and failed
 * jobs older than six hours (at most 500 each), then logs the queue counts.
 * Errors are logged and never propagate, so the interval keeps running.
 */
async function cleanupQueue(): Promise<void> {
  try {
    const ONE_HOUR = 60 * 60 * 1000;
    const SIX_HOURS = 6 * ONE_HOUR;
    await heicQueue.clean(ONE_HOUR, 500, "completed");
    await heicQueue.clean(SIX_HOURS, 500, "failed");
    const counts = await heicQueue.getJobCounts();
    logger.debug(`HEIC Queue status: ${JSON.stringify(counts)}`);
  } catch (error) {
    logger.error("HEIC Queue cleanup error:", error);
  }
}

/**
 * Filter files to include only valid HEIC images.
 *
 * The type is detected from the file on disk via `fileTypeFromFile`; only a
 * detected mime of exactly "image/heic" is accepted.
 */
async function filterHeicFiles(files: Express.Multer.File[]): Promise<Express.Multer.File[]> {
  const valid: Express.Multer.File[] = [];
  for (const file of files) {
    const type: FileTypeResult | undefined = await fileTypeFromFile(file.path);
    if (type?.mime === "image/heic") valid.push(file);
  }
  return valid;
}

/**
 * Handle original file based on environment variable.
 *
 * When KEEP_CONVERTED_ORIGINALS is set, the original is moved into the
 * converted-originals subfolder of its destination; otherwise it is deleted.
 * NOTE: the check is plain truthiness on a string, so any non-empty value
 * (including "false") enables keeping the original.
 *
 * Never throws: problems are logged as warnings so the conversion result stands.
 */
async function handleOriginalFile(fileInfo: HeicFileInfo): Promise<void> {
  try {
    if (process.env.KEEP_CONVERTED_ORIGINALS) {
      const destDir = path.join(fileInfo.destination, FolderPaths.ConvertedOriginalSubDir);
      await fs.ensureDir(destDir);
      await fs.move(fileInfo.path, path.join(destDir, fileInfo.originalFilename), { overwrite: true });
    } else {
      await fs.unlink(fileInfo.path);
    }
  } catch (error) {
    // Don't throw error for file handling issues - log as warning and continue
    logger.warning("Error handling original HEIC file (continuing with conversion):", {
      file: fileInfo.originalFilename,
      path: fileInfo.path,
      error: toErrorMessage(error)
    });
  }
}

/**
 * Convert HEIC to JPEG using GraphicsMagick stream.
 *
 * @param inputPath  Path of the HEIC file to read.
 * @param outputPath Path the JPEG is written to.
 * @returns Resolves with `outputPath` once the write stream has finished.
 *          Rejects with an Error describing which stage (read, write,
 *          GraphicsMagick, setup) failed.
 */
async function convertToJpeg(inputPath: string, outputPath: string): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    try {
      const readStream = fs.createReadStream(inputPath);
      const writeStream = fs.createWriteStream(outputPath);
      let settled = false;

      // Single failure path: log once, release both file handles, reject.
      // Without the destroy calls a failure on one stream leaves the other open.
      const fail = (logMessage: string, rejectPrefix: string, err: Error): void => {
        if (settled) return;
        settled = true;
        logger.warning(logMessage, err);
        readStream.destroy();
        writeStream.destroy();
        reject(new Error(`${rejectPrefix}: ${err.message}`));
      };

      readStream.on("error", (err) =>
        fail(`Error reading HEIC file ${inputPath}:`, "Failed to read HEIC file", err)
      );
      // `.pipe(writeStream)` below returns this same stream, so one error
      // listener here covers write failures during piping as well.
      writeStream.on("error", (err) =>
        fail(`Error writing converted file ${outputPath}:`, "Failed to write converted file", err)
      );

      gm(readStream)
        .setFormat("jpg")
        .stream()
        .on("error", (err) =>
          fail(`GraphicsMagick conversion error for ${inputPath}:`, "GraphicsMagick conversion failed", err)
        )
        .pipe(writeStream)
        .on("finish", () => {
          if (settled) return;
          settled = true;
          resolve(outputPath);
        });
    } catch (error) {
      logger.warning(`Unexpected error in convertToJpeg for ${inputPath}:`, error);
      reject(new Error(`Conversion setup failed: ${toErrorMessage(error)}`));
    }
  });
}

/**
 * Add HEIC files to the conversion queue and wait for job completion.
 *
 * Jobs are queued and awaited one at a time. The `files` array is mutated in
 * place: entries that converted successfully get their `filename`, `mimetype`
 * and `path` rewritten to the JPEG, and entries whose conversion failed are
 * removed so later processing does not see them. Non-HEIC entries are untouched.
 *
 * @param files Uploaded files; modified in place.
 */
export async function convertHeicFiles(files: Express.Multer.File[]): Promise<void> {
  const heicFiles = await filterHeicFiles(files);
  if (heicFiles.length === 0) {
    logger.debug("No HEIC files found to convert.");
    return;
  }

  const jobs = heicFiles.map((file) => ({
    name: file.filename,
    data: {
      convertedFileName: generateUniqueHeicFilename(file),
      fileInfo: {
        path: file.path,
        destination: file.destination,
        originalFilename: file.filename
      }
    }
  }));

  // Add jobs and wait for completion of each before proceeding
  const successfulConversions: Array<{ originalFilename: string; convertedFileName: string }> = [];
  const failedConversions: Array<{ originalFilename: string; error: string }> = [];

  for (const jobData of jobs) {
    const { originalFilename } = jobData.data.fileInfo;
    try {
      const job = await heicQueue.add(jobData.name, jobData.data);
      const result: HeicJobResult | undefined = await job.waitUntilFinished(heicQueueEvents);
      if (result && result.success) {
        logger.debug(`Job ${job.id} finished successfully.`);
        successfulConversions.push({
          originalFilename,
          convertedFileName: jobData.data.convertedFileName
        });
      } else {
        // The worker reports conversion failures as a `success: false` result.
        logger.warning(`Job ${job.id} completed but conversion failed for ${originalFilename}`);
        failedConversions.push({
          originalFilename,
          error: result?.error || "Unknown conversion error"
        });
      }
    } catch (error) {
      logger.warning(`Job for ${originalFilename} failed:`, error);
      failedConversions.push({ originalFilename, error: toErrorMessage(error) });
      // Continue with next job instead of stopping
    }
  }

  // Log summary
  if (failedConversions.length > 0) {
    logger.warning(
      `HEIC conversion summary: ${successfulConversions.length} successful, ${failedConversions.length} failed:`,
      { failed: failedConversions.map((f) => f.originalFilename) }
    );
  } else {
    logger.debug(`HEIC conversion summary: All ${successfulConversions.length} files converted successfully`);
  }

  // Update original files list with new names, mimetype, and path - only for successful conversions
  const filenameToIndex = new Map(files.map((f, i) => [f.filename, i]));
  for (const conversion of successfulConversions) {
    const idx = filenameToIndex.get(conversion.originalFilename);
    if (idx !== undefined) {
      const oldPath = files[idx].path;
      files[idx].filename = conversion.convertedFileName;
      files[idx].mimetype = "image/jpeg";
      files[idx].path = path.join(files[idx].destination, conversion.convertedFileName);
      logger.debug(`Updated file entry: ${conversion.originalFilename} -> ${conversion.convertedFileName}`, {
        oldPath,
        newPath: files[idx].path,
        newMimetype: files[idx].mimetype
      });
    }
  }

  // Remove failed conversions from the files array to prevent further processing.
  // Iterate backwards so splicing does not shift indices still to be visited.
  const failedNames = new Set(failedConversions.map((f) => f.originalFilename));
  for (let i = files.length - 1; i >= 0; i--) {
    if (failedNames.has(files[i].filename)) {
      logger.debug(`Removing failed HEIC file from processing: ${files[i].filename}`);
      files.splice(i, 1);
    }
  }
}

/**
 * Best-effort removal of a JPEG left behind by a conversion that did not finish.
 * Never throws; a failure to remove is logged as a warning.
 */
async function removePartialOutput(outputPath: string): Promise<void> {
  try {
    await fs.remove(outputPath);
  } catch (cleanupError) {
    logger.warning(`Failed to remove partial JPEG ${outputPath}:`, cleanupError);
  }
}

/**
 * Move the original HEIC file into the damaged subfolder of its destination
 * (instead of deleting it), if it still exists. Never throws.
 *
 * @param fileInfo   File to move.
 * @param logContext Text inserted into the log messages to tell the caller's
 *                   situation apart ("" or " after unexpected error").
 */
async function moveOriginalToDamaged(fileInfo: HeicFileInfo, logContext: string): Promise<void> {
  try {
    const damagedDir = path.join(fileInfo.destination, FolderPaths.DamagedSubDir);
    await fs.ensureDir(damagedDir);
    const damagedFilePath = path.join(damagedDir, fileInfo.originalFilename);
    if (await fs.pathExists(fileInfo.path)) {
      await fs.move(fileInfo.path, damagedFilePath, { overwrite: true });
      logger.debug(`Moved damaged HEIC file${logContext} to: ${damagedFilePath}`);
    }
  } catch (cleanupError) {
    logger.warning(`Failed to move damaged file${logContext} for ${fileInfo.originalFilename}:`, cleanupError);
  }
}

// Worker processing HEIC conversion jobs.
// The processor never throws on a failed conversion: it returns a
// `success: false` result, so such jobs complete rather than fail.
const heicWorker = new Worker(
  QUEUE_NAME,
  async (job: Job<HeicJobData>): Promise<HeicJobResult> => {
    const { fileInfo, convertedFileName } = job.data;
    const outputPath = path.join(fileInfo.destination, convertedFileName);
    // Tracks whether the JPEG was fully written, so a later failure
    // (e.g. a progress update) does not delete a good conversion.
    let jpegWritten = false;

    try {
      logger.debug(`Converting ${fileInfo.originalFilename} from HEIC to JPEG.`);
      await job.updateProgress(10);
      await convertToJpeg(fileInfo.path, outputPath);
      jpegWritten = true;
      await job.updateProgress(50);
      await handleOriginalFile(fileInfo);
      logger.debug(`Successfully converted ${fileInfo.originalFilename} to JPEG.`);
      await job.updateProgress(100);
      return { success: true, convertedFileName };
    } catch (error) {
      const errorMessage = toErrorMessage(error);

      // Handle GraphicsMagick and other conversion errors gracefully
      const isConversionError = CONVERSION_ERROR_MARKERS.some((marker) => errorMessage.includes(marker));
      if (isConversionError) {
        logger.warning(
          `[heicWorker] HEIC conversion error for ${fileInfo.originalFilename}, moving to damaged folder:`,
          errorMessage
        );
      } else {
        logger.error(`[heicWorker] Unexpected error converting ${fileInfo.originalFilename}:`, error);
      }

      // Clean up both the original HEIC file and any partially created JPEG
      if (!jpegWritten) {
        await removePartialOutput(outputPath);
      }
      await moveOriginalToDamaged(fileInfo, isConversionError ? "" : " after unexpected error");

      return { success: false, error: errorMessage, originalFilename: fileInfo.originalFilename };
    }
  },
  {
    connection: connectionOpts,
    concurrency: 1
  }
);

// Event listeners for queue and worker
heicQueue.on("waiting", (job) => {
  logger.debug(`[heicQueue] Job waiting in queue: ${job.data.convertedFileName}`);
});

heicQueue.on("error", (error) => {
  logger.error(`[heicQueue] Queue error:`, error);
});

heicWorker.on("ready", () => {
  logger.debug("[heicWorker] Worker ready");
});

heicWorker.on("active", (job, prev) => {
  logger.debug(`[heicWorker] Job ${job.id} active (previous: ${prev})`);
});

heicWorker.on("completed", async (job) => {
  logger.debug(`[heicWorker] Job ${job.id} completed`);
  // The emitter does not await this handler, so a rejection here would be
  // unhandled; catch it locally instead.
  try {
    await job.remove();
    logger.debug(`Job ${job.id} removed from Redis`);
  } catch (removeError) {
    logger.warning(`[heicWorker] Could not remove completed job ${job.id}:`, removeError);
  }
});

// BullMQ passes the Job (possibly undefined) and the Error, not a job id.
heicWorker.on("failed", (job, error) => {
  logger.error(`[heicWorker] Job ${job?.id ?? "unknown"} failed: ${error.message}`);
});

heicWorker.on("error", (error) => {
  logger.error(`[heicWorker] Worker error:`, error);
});

// The stalled event carries the job id, not a Job instance.
heicWorker.on("stalled", (jobId) => {
  logger.error(`[heicWorker] Worker stalled: ${jobId}`);
});

heicWorker.on("ioredis:close", () => {
  logger.error("[heicWorker] Redis connection closed");
});