209 lines
6.3 KiB
TypeScript
209 lines
6.3 KiB
TypeScript
import { Job, Queue, Worker } from "bullmq";
|
|
import dotenv from "dotenv";
|
|
import { fileTypeFromFile } from "file-type";
|
|
import { FileTypeResult } from "file-type/core";
|
|
import fs from "fs-extra";
|
|
import gm from "gm";
|
|
import path from "path";
|
|
import { fileURLToPath } from "url";
|
|
import { logger } from "../server.js";
|
|
import { generateUniqueHeicFilename } from "./generateUniqueFilename.js";
|
|
import { FolderPaths } from "./serverInit.js";
|
|
|
|
const __dirname = path.dirname(fileURLToPath(import.meta.url));
|
|
|
|
const HeicQueue = new Queue("HEIC Queue", {
|
|
connection: {
|
|
host: "localhost",
|
|
port: 6379,
|
|
maxRetriesPerRequest: 3,
|
|
enableReadyCheck: true,
|
|
reconnectOnError: function (err) {
|
|
const targetError = "READONLY";
|
|
return err.message.includes(targetError);
|
|
}
|
|
},
|
|
defaultJobOptions: {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
attempts: 3,
|
|
backoff: {
|
|
type: "exponential",
|
|
delay: 1000
|
|
}
|
|
}
|
|
});
|
|
const cleanupINTERVAL = 1000 * 60 * 10;
|
|
setInterval(cleanupQueue, cleanupINTERVAL);
|
|
|
|
dotenv.config({
|
|
path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`)
|
|
});
|
|
|
|
const imageMagick = gm.subClass({ imageMagick: true });
|
|
|
|
async function cleanupQueue() {
|
|
const ONE_HOUR = 1000 * 60 * 60;
|
|
const SIX_HOURS = ONE_HOUR * 6;
|
|
try {
|
|
// Clean completed jobs older than 1 hour
|
|
await HeicQueue.clean(ONE_HOUR, 500, "completed");
|
|
|
|
// Clean failed jobs older than 24 hours
|
|
await HeicQueue.clean(SIX_HOURS, 500, "failed");
|
|
|
|
// Get queue health
|
|
const jobCounts = await HeicQueue.getJobCounts();
|
|
logger.log("debug", `Queue status: ${JSON.stringify(jobCounts)}`);
|
|
} catch (error) {
|
|
logger.log("error", `Queue cleanup error: ${error}`);
|
|
}
|
|
}
|
|
|
|
export async function ConvertHeicFiles(files: Express.Multer.File[]) {
|
|
const validFiles = await filterValidHeicFiles(files);
|
|
|
|
const jobs = validFiles.map((file) => ({
|
|
name: file.filename,
|
|
data: {
|
|
convertedFileName: generateUniqueHeicFilename(file),
|
|
fileInfo: {
|
|
path: file.path,
|
|
destination: file.destination,
|
|
originalFilename: file.filename
|
|
}
|
|
}
|
|
}));
|
|
|
|
await HeicQueue.addBulk(jobs);
|
|
|
|
const fileMap = new Map(files.map((file, index) => [file.filename, index]));
|
|
jobs.forEach((job) => {
|
|
const fileIndex = fileMap.get(job.data.fileInfo.originalFilename);
|
|
if (fileIndex !== undefined) {
|
|
files[fileIndex].filename = job.data.convertedFileName;
|
|
files[fileIndex].mimetype = "image/jpeg";
|
|
}
|
|
});
|
|
}
|
|
|
|
async function filterValidHeicFiles(files: Express.Multer.File[]) {
|
|
const validFiles = [];
|
|
for (const file of files) {
|
|
const type: FileTypeResult | undefined = await fileTypeFromFile(file.path);
|
|
if (type?.mime === "image/heic") {
|
|
validFiles.push(file);
|
|
}
|
|
}
|
|
return validFiles;
|
|
}
|
|
|
|
async function handleOriginalFile(fileInfo: { path: string; destination: string; originalFilename: string }) {
|
|
try {
|
|
if (process.env.KEEP_CONVERTED_ORIGINALS) {
|
|
await fs.ensureDir(path.join(fileInfo.destination, FolderPaths.ConvertedOriginalSubDir));
|
|
await fs.move(
|
|
fileInfo.path,
|
|
`${path.join(fileInfo.destination, FolderPaths.ConvertedOriginalSubDir)}/${fileInfo.originalFilename}`
|
|
);
|
|
} else {
|
|
await fs.unlink(fileInfo.path);
|
|
}
|
|
} catch (error) {
|
|
logger.log("error", `Error handling original file: ${error}`);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
async function ConvertToJpeg(file: string, newPath: string) {
|
|
// const fileOnDisk: Buffer = await fs.readFile(file);
|
|
|
|
// return new Promise<string>((resolve, reject) => {
|
|
// imageMagick(fileOnDisk)
|
|
// .setFormat("jpg")
|
|
// .write(newPath, (error) => {
|
|
// if (error) reject(error.message);
|
|
// resolve(newPath);
|
|
// });
|
|
// });
|
|
return new Promise<string>((resolve, reject) => {
|
|
const readStream = fs.createReadStream(file);
|
|
const writeStream = fs.createWriteStream(newPath);
|
|
|
|
imageMagick(readStream)
|
|
.setFormat("jpg")
|
|
.stream()
|
|
.pipe(writeStream)
|
|
.on("finish", () => resolve(newPath))
|
|
.on("error", (error) => reject(error.message));
|
|
});
|
|
}
|
|
|
|
const HeicWorker = new Worker(
|
|
"HEIC Queue",
|
|
async (job: Job) => {
|
|
const { fileInfo, convertedFileName } = job.data;
|
|
try {
|
|
logger.log("debug", `Attempting to Convert ${fileInfo.originalFilename} image to JPEG from HEIC.`);
|
|
await job.updateProgress(10);
|
|
await ConvertToJpeg(fileInfo.path, `${fileInfo.destination}/${convertedFileName}`);
|
|
await job.updateProgress(50);
|
|
await handleOriginalFile(fileInfo);
|
|
logger.log("debug", `Converted ${fileInfo.originalFilename} image to JPEG from HEIC.`);
|
|
await job.updateProgress(100);
|
|
|
|
return true;
|
|
} catch (error) {
|
|
logger.log(
|
|
"error",
|
|
`QUEUE ERROR: Error converting ${fileInfo.originalFilename} image to JPEG from HEIC. ${JSON.stringify(error)}`
|
|
);
|
|
throw error;
|
|
}
|
|
},
|
|
{
|
|
connection: {
|
|
host: "localhost",
|
|
port: 6379,
|
|
maxRetriesPerRequest: 3,
|
|
enableReadyCheck: true,
|
|
reconnectOnError: function (err) {
|
|
const targetError = "READONLY";
|
|
return err.message.includes(targetError);
|
|
}
|
|
},
|
|
concurrency: 1
|
|
}
|
|
);
|
|
|
|
HeicQueue.on("waiting", (job) => {
|
|
logger.log("debug", `[BULLMQ] Job is waiting in queue! ${job.data.convertedFileName}`);
|
|
});
|
|
HeicQueue.on("error", (error) => {
|
|
logger.log("error", `[BULLMQ] Queue Error! ${error}`);
|
|
});
|
|
|
|
HeicWorker.on("ready", () => {
|
|
logger.log("debug", `[BULLMQ] Worker Ready`);
|
|
});
|
|
HeicWorker.on("active", (job, prev) => {
|
|
logger.log("debug", `[BULLMQ] Job ${job.id} is now active; previous status was ${prev}`);
|
|
});
|
|
HeicWorker.on("completed", async (job, returnvalue) => {
|
|
logger.log("debug", `[BULLMQ] ${job.id} has completed and returned ${returnvalue}`);
|
|
await job.remove();
|
|
logger.log("debug", `Job ${job.id} removed from Redis`);
|
|
});
|
|
HeicWorker.on("failed", (jobId, failedReason) => {
|
|
logger.log("error", `[BULLMQ] ${jobId} has failed with reason ${failedReason}`);
|
|
});
|
|
HeicWorker.on("error", (error) => {
|
|
logger.log("error", `[BULLMQ] There was a queue error! ${error}`);
|
|
});
|
|
HeicWorker.on("stalled", (error) => {
|
|
logger.log("error", `[BULLMQ] There was a worker stall! ${error}`);
|
|
});
|
|
HeicWorker.on("ioredis:close", () => {
|
|
logger.log("error", `[BULLMQ] Redis connection closed!`);
|
|
});
|