// Job media upload handling: multer disk storage plus a BullMQ queue/worker
// that performs HEIC conversion and thumbnail generation for uploaded files.
import { Job, Queue, QueueEvents, Worker } from "bullmq";
|
|
import dotenv from "dotenv";
|
|
import { Request, Response } from "express";
|
|
import fs from "fs-extra";
|
|
import multer from "multer";
|
|
import path from "path";
|
|
import { logger } from "../server.js";
|
|
import GenerateThumbnail from "../util/generateThumbnail.js";
|
|
import generateUniqueFilename from "../util/generateUniqueFilename.js";
|
|
import { convertHeicFiles } from "../util/heicConverter.js";
|
|
import { PathToRoFolder } from "../util/pathGenerators.js";
|
|
import { JobsListMedia } from "./jobsListMedia.js";
|
|
|
|
dotenv.config({
|
|
path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`)
|
|
});
|
|
|
|
const UPLOAD_QUEUE_NAME = "uploadProcessingQueue";
|
|
|
|
const connectionOpts = {
|
|
host: "localhost",
|
|
port: 6379,
|
|
enableReadyCheck: true,
|
|
reconnectOnError: (err: Error) => err.message.includes("READONLY")
|
|
};
|
|
|
|
const uploadProcessingQueue = new Queue(UPLOAD_QUEUE_NAME, {
|
|
connection: connectionOpts,
|
|
defaultJobOptions: {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
attempts: 3,
|
|
backoff: { type: "exponential", delay: 1000 }
|
|
}
|
|
});
|
|
|
|
const uploadQueueEvents = new QueueEvents(UPLOAD_QUEUE_NAME, {
|
|
connection: connectionOpts
|
|
});
|
|
|
|
// Upload processing worker
|
|
const uploadWorker = new Worker(
|
|
UPLOAD_QUEUE_NAME,
|
|
async (
|
|
job: Job<{
|
|
files: Express.Multer.File[];
|
|
jobid: string;
|
|
skipThumbnails: boolean;
|
|
}>
|
|
) => {
|
|
const { files, jobid, skipThumbnails } = job.data;
|
|
try {
|
|
logger.debug(`Processing upload for job ${jobid} with ${files.length} files, skipThumbnails: ${skipThumbnails}`);
|
|
await job.updateProgress(10);
|
|
|
|
// Convert HEIC files if any
|
|
await convertHeicFiles(files);
|
|
await job.updateProgress(40);
|
|
|
|
if (!skipThumbnails) {
|
|
// Log file states after conversion for debugging
|
|
logger.debug("File states after HEIC conversion:", files.map(f => ({
|
|
filename: f.filename,
|
|
mimetype: f.mimetype,
|
|
path: f.path
|
|
})));
|
|
|
|
// Check if converted files actually exist on disk
|
|
for (const file of files) {
|
|
const exists = await fs.pathExists(file.path);
|
|
logger.debug(`File existence check: ${file.filename} at ${file.path} - exists: ${exists}`);
|
|
}
|
|
|
|
// After HEIC conversion, all files should be ready for thumbnail generation
|
|
// Only exclude files that are still HEIC (which shouldn't happen after conversion)
|
|
const filesToThumbnail = [];
|
|
for (const file of files) {
|
|
const isStillHeic =
|
|
file.mimetype === "image/heic" ||
|
|
file.mimetype === "image/heif" ||
|
|
/\.heic$/i.test(file.filename) ||
|
|
/\.heif$/i.test(file.filename);
|
|
|
|
logger.debug(`File ${file.filename} - mimetype: ${file.mimetype}, isStillHeic: ${isStillHeic}, including: ${!isStillHeic}`);
|
|
|
|
if (!isStillHeic) {
|
|
filesToThumbnail.push(file);
|
|
}
|
|
}
|
|
|
|
logger.debug(
|
|
`Generating thumbnails for ${filesToThumbnail.length} files (excluding HEIC)`,
|
|
filesToThumbnail.map((f) => f.filename)
|
|
);
|
|
|
|
// Generate thumbnails
|
|
logger.debug(`About to generate thumbnails for ${filesToThumbnail.length} files`);
|
|
const thumbnailPromises = filesToThumbnail.map((file, index) => {
|
|
logger.debug(`Starting thumbnail generation for file ${index + 1}/${filesToThumbnail.length}: ${file.filename} at ${file.path}`);
|
|
return GenerateThumbnail(file.path);
|
|
});
|
|
const thumbnailResults = await Promise.all(thumbnailPromises);
|
|
logger.debug(`Thumbnail generation completed. Results:`, thumbnailResults);
|
|
await job.updateProgress(90);
|
|
}
|
|
|
|
logger.debug(`Upload processing completed for job ${jobid}`);
|
|
await job.updateProgress(100);
|
|
|
|
return { success: true, filesProcessed: files.length };
|
|
} catch (error) {
|
|
logger.error(`Error processing upload for job ${jobid}:`, error);
|
|
throw error;
|
|
}
|
|
},
|
|
{
|
|
connection: connectionOpts,
|
|
concurrency: 2 // Allow 2 upload processing jobs to run concurrently
|
|
}
|
|
);
|
|
|
|
// Event listeners for upload queue and worker
|
|
uploadProcessingQueue.on("waiting", (job) => {
|
|
logger.debug(`[UploadQueue] Job waiting: ${job.data.jobid}`);
|
|
});
|
|
uploadProcessingQueue.on("error", (error) => {
|
|
logger.error(`[UploadQueue] Queue error:`, error);
|
|
});
|
|
|
|
uploadWorker.on("ready", () => {
|
|
logger.debug("[UploadWorker] Worker ready");
|
|
});
|
|
uploadWorker.on("active", (job, prev) => {
|
|
logger.debug(`[UploadWorker] Job ${job.id} active (previous: ${prev})`);
|
|
});
|
|
uploadWorker.on("completed", async (job) => {
|
|
logger.debug(`[UploadWorker] Job ${job.id} completed`);
|
|
await job.remove();
|
|
logger.debug(`[UploadWorker] Job ${job.id} removed from Redis`);
|
|
});
|
|
uploadWorker.on("failed", (jobId, reason) => {
|
|
logger.error(`[UploadWorker] Job ${jobId} failed: ${reason}`);
|
|
});
|
|
uploadWorker.on("error", (error) => {
|
|
logger.error(`[UploadWorker] Worker error:`, error);
|
|
});
|
|
uploadWorker.on("stalled", (job) => {
|
|
logger.error(`[UploadWorker] Worker stalled: ${job}`);
|
|
});
|
|
uploadWorker.on("ioredis:close", () => {
|
|
logger.error("[UploadWorker] Redis connection closed");
|
|
});
|
|
|
|
export const JobMediaUploadMulter = multer({
|
|
storage: multer.diskStorage({
|
|
destination: function (req, file, cb) {
|
|
const jobid: string = (req.body.jobid || "").trim();
|
|
const DestinationFolder: string = PathToRoFolder(jobid);
|
|
fs.ensureDirSync(DestinationFolder);
|
|
cb(jobid === "" || jobid === null ? new Error("Job ID not specified.") : null, DestinationFolder);
|
|
},
|
|
filename: function (req, file, cb) {
|
|
logger.debug("Uploading file: ", {
|
|
file: path.basename(file.originalname)
|
|
});
|
|
cb(null, generateUniqueFilename(file));
|
|
}
|
|
})
|
|
});
|
|
|
|
export async function jobsUploadMedia(req: Request, res: Response) {
|
|
const jobid: string = (req.body.jobid || "").trim();
|
|
|
|
try {
|
|
const files = req.files as Express.Multer.File[] | undefined;
|
|
if (!files || files.length === 0) {
|
|
logger.warning("Upload contained no files.");
|
|
res.status(400).send({
|
|
status: false,
|
|
message: "No file uploaded"
|
|
});
|
|
return;
|
|
}
|
|
|
|
// Check if client wants to skip waiting for response but still do processing
|
|
if (req.body.skip_thumbnail) {
|
|
logger.debug("Client requested skip_thumbnail - responding immediately but processing in background");
|
|
// Set header to indicate we're skipping the response wait
|
|
req.headers.skipResponse = "true";
|
|
res.sendStatus(200);
|
|
|
|
// Continue with background processing (conversion + thumbnails)
|
|
const job = await uploadProcessingQueue.add("processUpload", {
|
|
files,
|
|
jobid,
|
|
skipThumbnails: false // Always do thumbnails, just don't wait for completion
|
|
});
|
|
return;
|
|
}
|
|
|
|
// Add upload processing job to queue and wait for completion
|
|
logger.debug(`Adding upload processing job for ${files.length} files with thumbnails enabled`);
|
|
const job = await uploadProcessingQueue.add("processUpload", {
|
|
files,
|
|
jobid,
|
|
skipThumbnails: false
|
|
});
|
|
|
|
// Wait for the job to complete
|
|
await job.waitUntilFinished(uploadQueueEvents);
|
|
logger.debug(`Upload processing job completed for job ${jobid}`);
|
|
|
|
// Return the updated media list
|
|
JobsListMedia(req, res);
|
|
} catch (error) {
|
|
logger.error("Error uploading job media.", {
|
|
jobid,
|
|
error: (error as Error).message
|
|
});
|
|
res.status(500).json((error as Error).message);
|
|
}
|
|
}
|