This commit is contained in:
Allan Carr
2025-07-13 00:17:08 -07:00
parent 231130267f
commit 7f782d5a64
19 changed files with 2564 additions and 1416 deletions

View File

@@ -1,4 +1,4 @@
import { Job, Queue, Worker } from "bullmq";
import { Job, Queue, QueueEvents, Worker } from "bullmq";
import dotenv from "dotenv";
import { fileTypeFromFile } from "file-type";
import { FileTypeResult } from "file-type/core";
@@ -10,60 +10,112 @@ import { logger } from "../server.js";
import { generateUniqueHeicFilename } from "./generateUniqueFilename.js";
import { FolderPaths } from "./serverInit.js";
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const HeicQueue = new Queue("HEIC Queue", {
connection: {
host: "localhost",
port: 6379,
maxRetriesPerRequest: 3,
enableReadyCheck: true,
reconnectOnError: function (err) {
const targetError = "READONLY";
return err.message.includes(targetError);
}
},
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
attempts: 3,
backoff: {
type: "exponential",
delay: 1000
}
}
});
const cleanupINTERVAL = 1000 * 60 * 10;
setInterval(cleanupQueue, cleanupINTERVAL);
dotenv.config({
path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`)
});
const imageMagick = gm.subClass({ imageMagick: true });
const QUEUE_NAME = "heicQueue";
const connectionOpts = {
host: "localhost",
port: 6379,
maxRetriesPerRequest: 3,
enableReadyCheck: true,
reconnectOnError: (err: Error) => err.message.includes("READONLY")
};
const heicQueue = new Queue(QUEUE_NAME, {
connection: connectionOpts,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
attempts: 3,
backoff: { type: "exponential", delay: 1000 }
}
});
// Re-added QueueEvents for waiting on job completion
const heicQueueEvents = new QueueEvents(QUEUE_NAME, {
connection: connectionOpts
});
const CLEANUP_INTERVAL_MS = 10 * 60 * 1000; // 10 minutes
setInterval(cleanupQueue, CLEANUP_INTERVAL_MS);
async function cleanupQueue() {
const ONE_HOUR = 1000 * 60 * 60;
const SIX_HOURS = ONE_HOUR * 6;
try {
// Clean completed jobs older than 1 hour
await HeicQueue.clean(ONE_HOUR, 500, "completed");
const ONE_HOUR = 60 * 60 * 1000;
const SIX_HOURS = 6 * ONE_HOUR;
// Clean failed jobs older than 24 hours
await HeicQueue.clean(SIX_HOURS, 500, "failed");
await heicQueue.clean(ONE_HOUR, 500, "completed");
await heicQueue.clean(SIX_HOURS, 500, "failed");
// Get queue health
const jobCounts = await HeicQueue.getJobCounts();
logger.log("debug", `Queue status: ${JSON.stringify(jobCounts)}`);
const counts = await heicQueue.getJobCounts();
logger.debug(`HEIC Queue status: ${JSON.stringify(counts)}`);
} catch (error) {
logger.log("error", `Queue cleanup error: ${error}`);
logger.error("HEIC Queue cleanup error:", error);
}
}
export async function ConvertHeicFiles(files: Express.Multer.File[]) {
const validFiles = await filterValidHeicFiles(files);
/**
* Filter files to include only valid HEIC images.
*/
async function filterHeicFiles(files: Express.Multer.File[]) {
const valid: Express.Multer.File[] = [];
for (const file of files) {
const type: FileTypeResult | undefined = await fileTypeFromFile(file.path);
if (type?.mime === "image/heic") valid.push(file);
}
return valid;
}
const jobs = validFiles.map((file) => ({
/**
* Handle original file based on environment variable.
*/
async function handleOriginalFile(fileInfo: { path: string; destination: string; originalFilename: string }) {
try {
if (process.env.KEEP_CONVERTED_ORIGINALS) {
const destDir = path.join(fileInfo.destination, FolderPaths.ConvertedOriginalSubDir);
await fs.ensureDir(destDir);
await fs.move(fileInfo.path, path.join(destDir, fileInfo.originalFilename), { overwrite: true });
} else {
await fs.unlink(fileInfo.path);
}
} catch (error) {
logger.error("Error handling original file:", error);
throw error;
}
}
/**
* Convert HEIC to JPEG using GraphicsMagick stream.
*/
async function convertToJpeg(inputPath: string, outputPath: string): Promise<string> {
return new Promise((resolve, reject) => {
const readStream = fs.createReadStream(inputPath);
const writeStream = fs.createWriteStream(outputPath);
gm(readStream)
.setFormat("jpg")
.stream()
.pipe(writeStream)
.on("finish", () => resolve(outputPath))
.on("error", reject);
});
}
/**
* Add HEIC files to the conversion queue and wait for job completion.
*/
export async function convertHeicFiles(files: Express.Multer.File[]) {
const heicFiles = await filterHeicFiles(files);
if (heicFiles.length === 0) {
logger.debug("No HEIC files found to convert.");
return;
}
const jobs = heicFiles.map((file) => ({
name: file.filename,
data: {
convertedFileName: generateUniqueHeicFilename(file),
@@ -75,134 +127,99 @@ export async function ConvertHeicFiles(files: Express.Multer.File[]) {
}
}));
await HeicQueue.addBulk(jobs);
const fileMap = new Map(files.map((file, index) => [file.filename, index]));
jobs.forEach((job) => {
const fileIndex = fileMap.get(job.data.fileInfo.originalFilename);
if (fileIndex !== undefined) {
files[fileIndex].filename = job.data.convertedFileName;
files[fileIndex].mimetype = "image/jpeg";
}
});
}
async function filterValidHeicFiles(files: Express.Multer.File[]) {
const validFiles = [];
for (const file of files) {
const type: FileTypeResult | undefined = await fileTypeFromFile(file.path);
if (type?.mime === "image/heic") {
validFiles.push(file);
// Add jobs and wait for completion of each before proceeding
for (const jobData of jobs) {
try {
const job = await heicQueue.add(jobData.name, jobData.data);
await job.waitUntilFinished(heicQueueEvents);
logger.debug(`Job ${job.id} finished successfully.`);
} catch (error) {
logger.error(`Job for ${jobData.data.fileInfo.originalFilename} failed:`, error);
// Depending on your error handling strategy you might rethrow or continue
}
}
return validFiles;
}
async function handleOriginalFile(fileInfo: { path: string; destination: string; originalFilename: string }) {
try {
if (process.env.KEEP_CONVERTED_ORIGINALS) {
await fs.ensureDir(path.join(fileInfo.destination, FolderPaths.ConvertedOriginalSubDir));
await fs.move(
fileInfo.path,
`${path.join(fileInfo.destination, FolderPaths.ConvertedOriginalSubDir)}/${fileInfo.originalFilename}`
);
} else {
await fs.unlink(fileInfo.path);
// Update original files list with new names, mimetype, and path
const filenameToIndex = new Map(files.map((f, i) => [f.filename, i]));
for (const { data } of jobs) {
const idx = filenameToIndex.get(data.fileInfo.originalFilename);
if (idx !== undefined) {
const oldPath = files[idx].path;
files[idx].filename = data.convertedFileName;
files[idx].mimetype = "image/jpeg";
files[idx].path = path.join(data.fileInfo.destination, data.convertedFileName);
logger.debug(`Updated file entry: ${data.fileInfo.originalFilename} -> ${data.convertedFileName}`, {
oldPath,
newPath: files[idx].path,
newMimetype: files[idx].mimetype
});
}
} catch (error) {
logger.log("error", `Error handling original file: ${error}`);
throw error;
}
}
async function ConvertToJpeg(file: string, newPath: string) {
// const fileOnDisk: Buffer = await fs.readFile(file);
// return new Promise<string>((resolve, reject) => {
// imageMagick(fileOnDisk)
// .setFormat("jpg")
// .write(newPath, (error) => {
// if (error) reject(error.message);
// resolve(newPath);
// });
// });
return new Promise<string>((resolve, reject) => {
const readStream = fs.createReadStream(file);
const writeStream = fs.createWriteStream(newPath);
imageMagick(readStream)
.setFormat("jpg")
.stream()
.pipe(writeStream)
.on("finish", () => resolve(newPath))
.on("error", (error) => reject(error.message));
});
}
const HeicWorker = new Worker(
"HEIC Queue",
async (job: Job) => {
// Worker processing HEIC conversion jobs
const heicWorker = new Worker(
QUEUE_NAME,
async (
job: Job<{
fileInfo: { path: string; destination: string; originalFilename: string };
convertedFileName: string;
}>
) => {
const { fileInfo, convertedFileName } = job.data;
try {
logger.log("debug", `Attempting to Convert ${fileInfo.originalFilename} image to JPEG from HEIC.`);
logger.debug(`Converting ${fileInfo.originalFilename} from HEIC to JPEG.`);
await job.updateProgress(10);
await ConvertToJpeg(fileInfo.path, `${fileInfo.destination}/${convertedFileName}`);
const outputPath = path.join(fileInfo.destination, convertedFileName);
await convertToJpeg(fileInfo.path, outputPath);
await job.updateProgress(50);
await handleOriginalFile(fileInfo);
logger.log("debug", `Converted ${fileInfo.originalFilename} image to JPEG from HEIC.`);
logger.debug(`Successfully converted ${fileInfo.originalFilename} to JPEG.`);
await job.updateProgress(100);
return true;
} catch (error) {
logger.log(
"error",
`QUEUE ERROR: Error converting ${fileInfo.originalFilename} image to JPEG from HEIC. ${JSON.stringify(error)}`
);
logger.error(`Error converting ${fileInfo.originalFilename}:`, error);
throw error;
}
},
{
connection: {
host: "localhost",
port: 6379,
maxRetriesPerRequest: 3,
enableReadyCheck: true,
reconnectOnError: function (err) {
const targetError = "READONLY";
return err.message.includes(targetError);
}
},
connection: connectionOpts,
concurrency: 1
}
);
HeicQueue.on("waiting", (job) => {
logger.log("debug", `[BULLMQ] Job is waiting in queue! ${job.data.convertedFileName}`);
// Event listeners for queue and worker
heicQueue.on("waiting", (job) => {
logger.debug(`[heicQueue] Job waiting in queue: ${job.data.convertedFileName}`);
});
HeicQueue.on("error", (error) => {
logger.log("error", `[BULLMQ] Queue Error! ${error}`);
heicQueue.on("error", (error) => {
logger.error(`[heicQueue] Queue error:`, error);
});
HeicWorker.on("ready", () => {
logger.log("debug", `[BULLMQ] Worker Ready`);
heicWorker.on("ready", () => {
logger.debug("[heicWorker] Worker ready");
});
HeicWorker.on("active", (job, prev) => {
logger.log("debug", `[BULLMQ] Job ${job.id} is now active; previous status was ${prev}`);
heicWorker.on("active", (job, prev) => {
logger.debug(`[heicWorker] Job ${job.id} active (previous: ${prev})`);
});
HeicWorker.on("completed", async (job, returnvalue) => {
logger.log("debug", `[BULLMQ] ${job.id} has completed and returned ${returnvalue}`);
heicWorker.on("completed", async (job) => {
logger.debug(`[heicWorker] Job ${job.id} completed`);
await job.remove();
logger.log("debug", `Job ${job.id} removed from Redis`);
logger.debug(`Job ${job.id} removed from Redis`);
});
HeicWorker.on("failed", (jobId, failedReason) => {
logger.log("error", `[BULLMQ] ${jobId} has failed with reason ${failedReason}`);
heicWorker.on("failed", (jobId, reason) => {
logger.error(`[heicWorker] Job ${jobId} failed: ${reason}`);
});
HeicWorker.on("error", (error) => {
logger.log("error", `[BULLMQ] There was a queue error! ${error}`);
heicWorker.on("error", (error) => {
logger.error(`[heicWorker] Worker error:`, error);
});
HeicWorker.on("stalled", (error) => {
logger.log("error", `[BULLMQ] There was a worker stall! ${error}`);
heicWorker.on("stalled", (job) => {
logger.error(`[heicWorker] Worker stalled: ${job}`);
});
HeicWorker.on("ioredis:close", () => {
logger.log("error", `[BULLMQ] Redis connection closed!`);
heicWorker.on("ioredis:close", () => {
logger.error("[heicWorker] Redis connection closed");
});