Files
bodyshop-media-server/jobs/jobsUploadMedia.ts
2025-07-22 20:25:33 -07:00

223 lines
7.5 KiB
TypeScript

import { Job, Queue, QueueEvents, Worker } from "bullmq";
import dotenv from "dotenv";
import { Request, Response } from "express";
import fs from "fs-extra";
import multer from "multer";
import path from "path";
import { logger } from "../server.js";
import GenerateThumbnail from "../util/generateThumbnail.js";
import generateUniqueFilename from "../util/generateUniqueFilename.js";
import { convertHeicFiles } from "../util/heicConverter.js";
import { PathToRoFolder } from "../util/pathGenerators.js";
import { JobsListMedia } from "./jobsListMedia.js";
dotenv.config({
path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`)
});
const UPLOAD_QUEUE_NAME = "uploadProcessingQueue";
const connectionOpts = {
host: "localhost",
port: 6379,
enableReadyCheck: true,
reconnectOnError: (err: Error) => err.message.includes("READONLY")
};
const uploadProcessingQueue = new Queue(UPLOAD_QUEUE_NAME, {
connection: connectionOpts,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
attempts: 3,
backoff: { type: "exponential", delay: 1000 }
}
});
const uploadQueueEvents = new QueueEvents(UPLOAD_QUEUE_NAME, {
connection: connectionOpts
});
// Upload processing worker
const uploadWorker = new Worker(
UPLOAD_QUEUE_NAME,
async (
job: Job<{
files: Express.Multer.File[];
jobid: string;
skipThumbnails: boolean;
}>
) => {
const { files, jobid, skipThumbnails } = job.data;
try {
logger.debug(`Processing upload for job ${jobid} with ${files.length} files, skipThumbnails: ${skipThumbnails}`);
await job.updateProgress(10);
// Convert HEIC files if any
await convertHeicFiles(files);
await job.updateProgress(40);
if (!skipThumbnails) {
// Log file states after conversion for debugging
logger.debug("File states after HEIC conversion:", files.map(f => ({
filename: f.filename,
mimetype: f.mimetype,
path: f.path
})));
// Check if converted files actually exist on disk
for (const file of files) {
const exists = await fs.pathExists(file.path);
logger.debug(`File existence check: ${file.filename} at ${file.path} - exists: ${exists}`);
}
// After HEIC conversion, all files should be ready for thumbnail generation
// Only exclude files that are still HEIC (which shouldn't happen after conversion)
const filesToThumbnail = [];
for (const file of files) {
const isStillHeic =
file.mimetype === "image/heic" ||
file.mimetype === "image/heif" ||
/\.heic$/i.test(file.filename) ||
/\.heif$/i.test(file.filename);
logger.debug(`File ${file.filename} - mimetype: ${file.mimetype}, isStillHeic: ${isStillHeic}, including: ${!isStillHeic}`);
if (!isStillHeic) {
filesToThumbnail.push(file);
}
}
logger.debug(
`Generating thumbnails for ${filesToThumbnail.length} files (excluding HEIC)`,
filesToThumbnail.map((f) => f.filename)
);
// Generate thumbnails
logger.debug(`About to generate thumbnails for ${filesToThumbnail.length} files`);
const thumbnailPromises = filesToThumbnail.map((file, index) => {
logger.debug(`Starting thumbnail generation for file ${index + 1}/${filesToThumbnail.length}: ${file.filename} at ${file.path}`);
return GenerateThumbnail(file.path);
});
const thumbnailResults = await Promise.all(thumbnailPromises);
logger.debug(`Thumbnail generation completed. Results:`, thumbnailResults);
await job.updateProgress(90);
}
logger.debug(`Upload processing completed for job ${jobid}`);
await job.updateProgress(100);
return { success: true, filesProcessed: files.length };
} catch (error) {
logger.error(`Error processing upload for job ${jobid}:`, error);
throw error;
}
},
{
connection: connectionOpts,
concurrency: 2 // Allow 2 upload processing jobs to run concurrently
}
);
// Event listeners for upload queue and worker
uploadProcessingQueue.on("waiting", (job) => {
logger.debug(`[UploadQueue] Job waiting: ${job.data.jobid}`);
});
uploadProcessingQueue.on("error", (error) => {
logger.error(`[UploadQueue] Queue error:`, error);
});
uploadWorker.on("ready", () => {
logger.debug("[UploadWorker] Worker ready");
});
uploadWorker.on("active", (job, prev) => {
logger.debug(`[UploadWorker] Job ${job.id} active (previous: ${prev})`);
});
uploadWorker.on("completed", async (job) => {
logger.debug(`[UploadWorker] Job ${job.id} completed`);
await job.remove();
logger.debug(`[UploadWorker] Job ${job.id} removed from Redis`);
});
uploadWorker.on("failed", (jobId, reason) => {
logger.error(`[UploadWorker] Job ${jobId} failed: ${reason}`);
});
uploadWorker.on("error", (error) => {
logger.error(`[UploadWorker] Worker error:`, error);
});
uploadWorker.on("stalled", (job) => {
logger.error(`[UploadWorker] Worker stalled: ${job}`);
});
uploadWorker.on("ioredis:close", () => {
logger.error("[UploadWorker] Redis connection closed");
});
export const JobMediaUploadMulter = multer({
storage: multer.diskStorage({
destination: function (req, file, cb) {
const jobid: string = (req.body.jobid || "").trim();
const DestinationFolder: string = PathToRoFolder(jobid);
fs.ensureDirSync(DestinationFolder);
cb(jobid === "" || jobid === null ? new Error("Job ID not specified.") : null, DestinationFolder);
},
filename: function (req, file, cb) {
logger.debug("Uploading file: ", {
file: path.basename(file.originalname)
});
cb(null, generateUniqueFilename(file));
}
})
});
export async function jobsUploadMedia(req: Request, res: Response) {
const jobid: string = (req.body.jobid || "").trim();
try {
const files = req.files as Express.Multer.File[] | undefined;
if (!files || files.length === 0) {
logger.warning("Upload contained no files.");
res.status(400).send({
status: false,
message: "No file uploaded"
});
return;
}
// Check if client wants to skip waiting for response but still do processing
if (req.body.skip_thumbnail) {
logger.debug("Client requested skip_thumbnail - responding immediately but processing in background");
// Set header to indicate we're skipping the response wait
req.headers.skipResponse = "true";
res.sendStatus(200);
// Continue with background processing (conversion + thumbnails)
const job = await uploadProcessingQueue.add("processUpload", {
files,
jobid,
skipThumbnails: false // Always do thumbnails, just don't wait for completion
});
return;
}
// Add upload processing job to queue and wait for completion
logger.debug(`Adding upload processing job for ${files.length} files with thumbnails enabled`);
const job = await uploadProcessingQueue.add("processUpload", {
files,
jobid,
skipThumbnails: false
});
// Wait for the job to complete
await job.waitUntilFinished(uploadQueueEvents);
logger.debug(`Upload processing job completed for job ${jobid}`);
// Return the updated media list
JobsListMedia(req, res);
} catch (error) {
logger.error("Error uploading job media.", {
jobid,
error: (error as Error).message
});
res.status(500).json((error as Error).message);
}
}