import { Job, Queue, QueueEvents, Worker } from "bullmq"; import { Request, Response } from "express"; import fs from "fs-extra"; import path from "path"; import { logger } from "../server.js"; import ListableChecker from "../util/listableChecker.js"; import { PathToRoBillsFolder, PathToRoFolder } from "../util/pathGenerators.js"; import { FolderPaths } from "../util/serverInit.js"; import { JobsListMedia } from "./jobsListMedia.js"; const MOVE_QUEUE_NAME = "moveQueue"; const connectionOpts = { host: "localhost", port: 6379, enableReadyCheck: true, reconnectOnError: (err: Error) => err.message.includes("READONLY") }; const moveQueue = new Queue(MOVE_QUEUE_NAME, { connection: connectionOpts, defaultJobOptions: { removeOnComplete: 10, removeOnFail: 5, attempts: 3, backoff: { type: "exponential", delay: 2000 } } }); const moveQueueEvents = new QueueEvents(MOVE_QUEUE_NAME, { connection: connectionOpts }); const moveWorker = new Worker( MOVE_QUEUE_NAME, async (job: Job<{ jobid: string; from_jobid: string; files: string[] }>) => { const { jobid, from_jobid, files } = job.data; logger.debug(`[MoveWorker] Starting move operation from ${from_jobid} to ${jobid} for ${files.length} files`); try { await job.updateProgress(5); const result = await processMoveOperation(jobid, from_jobid, files, job); await job.updateProgress(100); logger.debug(`[MoveWorker] Completed move operation from ${from_jobid} to ${jobid}`); return result; } catch (error) { logger.error(`[MoveWorker] Error moving files from ${from_jobid} to ${jobid}:`, error); throw error; } }, { connection: connectionOpts, concurrency: 2 // Limit concurrent move operations to avoid I/O overwhelm } ); // Worker event listeners for logging moveWorker.on("ready", () => { logger.debug("[MoveWorker] Worker is ready"); }); moveWorker.on("active", (job, prev) => { logger.debug(`[MoveWorker] Job ${job.id} active (previous: ${prev})`); }); moveWorker.on("completed", async (job) => { logger.debug(`[MoveWorker] Job ${job.id} completed`); }); moveWorker.on("failed", (job, err) => { logger.error(`[MoveWorker] Job ${job?.id} failed:`, err); }); moveWorker.on("stalled", (jobId) => { logger.error(`[MoveWorker] Job stalled: ${jobId}`); }); moveWorker.on("error", (err) => { logger.error("[MoveWorker] Worker error:", err); }); // Queue event listeners moveQueue.on("waiting", (job) => { logger.debug(`[MoveQueue] Job waiting in queue: ${job.data.from_jobid} -> ${job.data.jobid}`); }); moveQueue.on("error", (err) => { logger.error("[MoveQueue] Queue error:", err); }); async function processMoveOperation( jobid: string, from_jobid: string, files: string[], job?: Job ): Promise<{ moved: number; failed: number }> { try { const jobFileList: string[] = []; const jobDir = await fs.opendir(PathToRoFolder(from_jobid)); for await (const dirent of jobDir) { if (dirent.isFile() && ListableChecker(dirent)) jobFileList.push(dirent.name); } const billFileList: string[] = []; const billDir = await fs.opendir(PathToRoBillsFolder(from_jobid)); for await (const dirent of billDir) { if (dirent.isFile() && ListableChecker(dirent)) billFileList.push(dirent.name); } await fs.ensureDir(PathToRoFolder(jobid)); logger.debug("Moving job based media.", { jobid, from_jobid, files }); if (job) await job.updateProgress(15); const moveOps: Promise[] = []; let processedFiles = 0; const totalFiles = files.length; for (const file of files) { if (jobFileList.includes(file)) { // Move main file moveOps.push( fs .move(path.join(FolderPaths.Jobs, from_jobid, file), path.join(FolderPaths.Jobs, jobid, file), { overwrite: true }) .then(() => logger.debug(`[MoveWorker] Moved main file: ${file}`)) .catch((err) => logger.warn(`[MoveWorker] Failed to move main file ${file}:`, err)) ); // Move thumbnails const baseThumb = file.replace(/\.[^/.]+$/, ""); for (const ext of [".jpg", ".png"]) { moveOps.push( fs .move( path.join(FolderPaths.Jobs, from_jobid, FolderPaths.ThumbsSubDir, `${baseThumb}${ext}`), path.join(FolderPaths.Jobs, jobid, FolderPaths.ThumbsSubDir, `${baseThumb}${ext}`), { overwrite: true } ) .then(() => logger.debug(`[MoveWorker] Moved thumbnail: ${baseThumb}${ext}`)) .catch(() => {}) // Thumbnails might not exist ); } // Move ConvertedOriginal file if it exists const baseFileName = file.replace(/\.[^/.]+$/, ""); const sourceConvertedDir = path.join(FolderPaths.Jobs, from_jobid, FolderPaths.ConvertedOriginalSubDir); const targetConvertedDir = path.join(FolderPaths.Jobs, jobid, FolderPaths.ConvertedOriginalSubDir); moveOps.push( (async () => { try { if (await fs.pathExists(sourceConvertedDir)) { const convertedOriginalFiles = await fs.readdir(sourceConvertedDir); for (const convertedFile of convertedOriginalFiles) { const convertedFileBaseName = path.basename(convertedFile, path.extname(convertedFile)); if (convertedFileBaseName === baseFileName) { await fs.ensureDir(targetConvertedDir); await fs.move( path.join(sourceConvertedDir, convertedFile), path.join(targetConvertedDir, convertedFile), { overwrite: true } ); logger.debug(`[MoveWorker] Moved ConvertedOriginal: ${convertedFile}`); } } } } catch (error) { logger.warn(`[MoveWorker] Failed to move ConvertedOriginal for ${file}:`, error); } })() ); } if (billFileList.includes(file)) { // Move bill file moveOps.push( fs .move( path.join(FolderPaths.Jobs, from_jobid, FolderPaths.BillsSubDir, file), path.join(FolderPaths.Jobs, jobid, FolderPaths.BillsSubDir, file), { overwrite: true } ) .then(() => logger.debug(`[MoveWorker] Moved bill file: ${file}`)) .catch((err) => logger.warn(`[MoveWorker] Failed to move bill file ${file}:`, err)) ); // Move bill thumbnails const baseThumb = file.replace(/\.[^/.]+$/, ""); for (const ext of [".jpg", ".png"]) { moveOps.push( fs .move( path.join( FolderPaths.Jobs, from_jobid, FolderPaths.BillsSubDir, FolderPaths.ThumbsSubDir, `${baseThumb}${ext}` ), path.join( FolderPaths.Jobs, jobid, FolderPaths.BillsSubDir, FolderPaths.ThumbsSubDir, `${baseThumb}${ext}` ), { overwrite: true } ) .then(() => logger.debug(`[MoveWorker] Moved bill thumbnail: ${baseThumb}${ext}`)) .catch(() => {}) // Thumbnails might not exist ); } // Move bill ConvertedOriginal file if it exists const billBaseFileName = file.replace(/\.[^/.]+$/, ""); const sourceBillConvertedDir = path.join(FolderPaths.Jobs, from_jobid, FolderPaths.BillsSubDir, FolderPaths.ConvertedOriginalSubDir); const targetBillConvertedDir = path.join(FolderPaths.Jobs, jobid, FolderPaths.BillsSubDir, FolderPaths.ConvertedOriginalSubDir); moveOps.push( (async () => { try { if (await fs.pathExists(sourceBillConvertedDir)) { const convertedOriginalFiles = await fs.readdir(sourceBillConvertedDir); for (const convertedFile of convertedOriginalFiles) { const convertedFileBaseName = path.basename(convertedFile, path.extname(convertedFile)); if (convertedFileBaseName === billBaseFileName) { await fs.ensureDir(targetBillConvertedDir); await fs.move( path.join(sourceBillConvertedDir, convertedFile), path.join(targetBillConvertedDir, convertedFile), { overwrite: true } ); logger.debug(`[MoveWorker] Moved bill ConvertedOriginal: ${convertedFile}`); } } } } catch (error) { logger.warn(`[MoveWorker] Failed to move bill ConvertedOriginal for ${file}:`, error); } })() ); } processedFiles++; if (job) { const progress = 15 + Math.round((processedFiles / totalFiles) * 70); await job.updateProgress(progress); } } if (job) await job.updateProgress(90); const results = await Promise.allSettled(moveOps); const failed = results.filter(r => r.status === 'rejected').length; const moved = results.filter(r => r.status === 'fulfilled').length; logger.debug(`[MoveWorker] Move operation completed: ${moved} successful, ${failed} failed`); return { moved, failed }; } catch (error) { logger.error("[MoveWorker] Error in processMoveOperation:", error); throw error; } } export async function JobsMoveMedia(req: Request, res: Response) { const jobid: string = (req.body.jobid || "").trim(); const from_jobid: string = (req.body.from_jobid || "").trim(); const files: string[] = req.body.files || []; try { if (!from_jobid) { res.status(400).json({ error: "from_jobid must be specified." }); return; } if (!files.length) { res.status(400).json({ error: "files must be specified." }); return; } // For small operations (1-3 files), process synchronously for immediate feedback if (files.length <= 3) { logger.debug("Processing small move operation synchronously"); await processMoveOperation(jobid, from_jobid, files); JobsListMedia(req, res); return; } // For larger operations, use BullMQ but still return updated file list logger.debug(`[JobsMoveMedia] Queuing move operation for ${files.length} files`); const job = await moveQueue.add("moveMedia", { jobid, from_jobid, files }); // Return the updated file list immediately (optimistic update) JobsListMedia(req, res); // Process in background - if it fails, files will be back on next refresh job.waitUntilFinished(moveQueueEvents) .then(() => { logger.debug(`[JobsMoveMedia] Background move completed for job ${job.id}`); }) .catch((error) => { logger.error(`[JobsMoveMedia] Background move failed for job ${job.id}:`, error); }); } catch (err) { logger.error("Error moving job media", { from_jobid, jobid, files, err }); res.status(500).json({ error: "Failed to queue move operation", details: err }); } } /** * Get the status of a move operation job */ export async function JobsMoveStatus(req: Request, res: Response) { const { jobId } = req.params; try { const job = await Job.fromId(moveQueue, jobId); if (!job) { res.status(404).json({ error: "Job not found" }); return; } const state = await job.getState(); const progress = job.progress; res.json({ jobId, state, progress, data: job.data, finishedOn: job.finishedOn, processedOn: job.processedOn, failedReason: job.failedReason }); } catch (error) { logger.error("Error getting move job status:", error); res.status(500).json({ error: "Failed to get job status" }); } }