Refactor queue. Still only processing 1 job.

This commit is contained in:
Patrick Fic
2024-08-30 13:01:42 -07:00
parent b6066f9149
commit e7226b36d7
4 changed files with 127 additions and 73 deletions

View File

@@ -12,7 +12,7 @@ import { FolderPaths } from "./serverInit.js";
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const HeicQueue = new Queue("HEIC Queue", { connection: { host: "localhost", port: 6379 } });
const HeicQueue = new Queue("HEIC Queue", { connection: { host: "localhost", port: 6379 , } });
dotenv.config({
path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`)
@@ -23,23 +23,25 @@ const imageMagick = gm.subClass({ imageMagick: true });
export async function ConvertHeicFiles(files: Express.Multer.File[]) {
const validFiles = await filterValidHeicFiles(files);
await Promise.all(
validFiles.map(async (file) => {
const convertedFileName = generateUniqueHeicFilename(file);
await HeicQueue.add(convertedFileName, { convertedFileName, file });
const jobs = await HeicQueue.addBulk(validFiles.map(file => ({name: file.filename, data: {convertedFileName:generateUniqueHeicFilename(file) , file} })))
logger.log("debug", `The Jobs Object ${(JSON.stringify(jobs,null,2))}`)
// await Promise.all(
// validFiles.map(async (file) => {
// const convertedFileName = generateUniqueHeicFilename(file);
// await HeicQueue.add(convertedFileName, { convertedFileName, file },{removeOnComplete: true,});
// try {
// await ConvertToJpeg(file.path, `${file.destination}/${convertedFileName}`);
// logger.log("debug", `Converted ${file.filename} image to JPEG from HEIC.`);
// await handleOriginalFile(file, convertedFileName);
// file.filename = convertedFileName;
// file.mimetype = "image/jpeg";
// file.path = `${file.destination}/${convertedFileName}`;
// } catch (error) {
// logger.log("error", `Error converting ${file.filename} image to JPEG from HEIC. ${JSON.stringify(error)}`);
// }
})
);
// // try {
// // await ConvertToJpeg(file.path, `${file.destination}/${convertedFileName}`);
// // logger.log("debug", `Converted ${file.filename} image to JPEG from HEIC.`);
// // await handleOriginalFile(file, convertedFileName);
// // file.filename = convertedFileName;
// // file.mimetype = "image/jpeg";
// // file.path = `${file.destination}/${convertedFileName}`;
// // } catch (error) {
// // logger.log("error", `Error converting ${file.filename} image to JPEG from HEIC. ${JSON.stringify(error)}`);
// // }
// })
// );
}
async function filterValidHeicFiles(files: Express.Multer.File[]) {
@@ -85,30 +87,63 @@ const HeicWorker = new Worker(
async (job: Job) => {
const { file, convertedFileName } = job.data;
try {
logger.log("debug", `Attempting to Convert ${file.filename} image to JPEG from HEIC.`);
await ConvertToJpeg(file.path, `${file.destination}/${convertedFileName}`);
logger.log("debug", `Converted ${file.filename} image to JPEG from HEIC.`);
await handleOriginalFile(file, convertedFileName);
file.filename = convertedFileName;
file.mimetype = "image/jpeg";
file.path = `${file.destination}/${convertedFileName}`;
return true;
} catch (error) {
logger.log("error", `Error converting ${file.filename} image to JPEG from HEIC. ${JSON.stringify(error)}`);
logger.log("error", `QUEUE ERROR: Error converting ${file.filename} image to JPEG from HEIC. ${JSON.stringify(error)}`);
return false;
}
},
{
connection: { host: "localhost", port: 6379 }
connection: { host: "localhost", port: 6379 },
}
);
HeicQueue.on('waiting', job => {
logger.log("debug", `Job is waiting in queue! ${job.data.convertedFileName}`);
})
HeicQueue.on('error', error => {
logger.log("error", `Queue Error! ${error}`);
})
HeicWorker.on("ready", () => {
console.log(`Worker Ready`);
logger.log('debug',`Worker Ready`);
});
HeicWorker.on("active", (job, prev) => {
console.log(`Job ${job} is now active; previous status was ${prev}`);
logger.log('debug',`Job ${job.id} is now active; previous status was ${prev}`);
});
HeicWorker.on("completed", (jobId, returnvalue) => {
console.log(`${jobId} has completed and returned ${returnvalue}`);
logger.log('debug',`${jobId.id} has completed and returned ${returnvalue}`);
});
HeicWorker.on("failed", (jobId, failedReason) => {
console.log(`${jobId} has failed with reason ${failedReason}`);
logger.log('error',`${jobId} has failed with reason ${failedReason}`);
});
HeicWorker.on('error', error => {
logger.log('error', `There was a queue error! ${error}`)
})
HeicWorker.on('stalled', error => {
logger.log('error', `There was a worker stall! ${error}`)
})
HeicWorker.on('ioredis:close', () => {
logger.log('error', `Redis connection closed!`)
})
// const queueEvents = new QueueEvents( "HEIC Queue");
// queueEvents.on('completed', ( jobId, returnvalue ) => {
// // Called every time a job is completed by any worker.
// });
// queueEvents.on('failed', (jobId, failedReason ) => {
// // Called whenever a job is moved to failed by any worker.
// });
// queueEvents.on('progress', (jobId, data) => {
// // jobId received a progress event
// });