Files
bodyshop-media-server/jobs/jobsMoveMedia.ts
2025-07-23 17:59:13 -07:00

345 lines
12 KiB
TypeScript

import { Job, Queue, QueueEvents, Worker } from "bullmq";
import { Request, Response } from "express";
import fs from "fs-extra";
import path from "path";
import { logger } from "../server.js";
import ListableChecker from "../util/listableChecker.js";
import { PathToRoBillsFolder, PathToRoFolder } from "../util/pathGenerators.js";
import { FolderPaths } from "../util/serverInit.js";
import { JobsListMedia } from "./jobsListMedia.js";
const MOVE_QUEUE_NAME = "moveQueue";
const connectionOpts = {
host: "localhost",
port: 6379,
enableReadyCheck: true,
reconnectOnError: (err: Error) => err.message.includes("READONLY")
};
const moveQueue = new Queue(MOVE_QUEUE_NAME, {
connection: connectionOpts,
defaultJobOptions: {
removeOnComplete: 10,
removeOnFail: 5,
attempts: 3,
backoff: { type: "exponential", delay: 2000 }
}
});
const moveQueueEvents = new QueueEvents(MOVE_QUEUE_NAME, {
connection: connectionOpts
});
const moveWorker = new Worker(
MOVE_QUEUE_NAME,
async (job: Job<{ jobid: string; from_jobid: string; files: string[] }>) => {
const { jobid, from_jobid, files } = job.data;
logger.debug(`[MoveWorker] Starting move operation from ${from_jobid} to ${jobid} for ${files.length} files`);
try {
await job.updateProgress(5);
const result = await processMoveOperation(jobid, from_jobid, files, job);
await job.updateProgress(100);
logger.debug(`[MoveWorker] Completed move operation from ${from_jobid} to ${jobid}`);
return result;
} catch (error) {
logger.error(`[MoveWorker] Error moving files from ${from_jobid} to ${jobid}:`, error);
throw error;
}
},
{
connection: connectionOpts,
concurrency: 2 // Limit concurrent move operations to avoid I/O overwhelm
}
);
// Worker event listeners for logging
moveWorker.on("ready", () => {
logger.debug("[MoveWorker] Worker is ready");
});
moveWorker.on("active", (job, prev) => {
logger.debug(`[MoveWorker] Job ${job.id} active (previous: ${prev})`);
});
moveWorker.on("completed", async (job) => {
logger.debug(`[MoveWorker] Job ${job.id} completed`);
});
moveWorker.on("failed", (job, err) => {
logger.error(`[MoveWorker] Job ${job?.id} failed:`, err);
});
moveWorker.on("stalled", (jobId) => {
logger.error(`[MoveWorker] Job stalled: ${jobId}`);
});
moveWorker.on("error", (err) => {
logger.error("[MoveWorker] Worker error:", err);
});
// Queue event listeners
moveQueue.on("waiting", (job) => {
logger.debug(`[MoveQueue] Job waiting in queue: ${job.data.from_jobid} -> ${job.data.jobid}`);
});
moveQueue.on("error", (err) => {
logger.error("[MoveQueue] Queue error:", err);
});
async function processMoveOperation(
jobid: string,
from_jobid: string,
files: string[],
job?: Job
): Promise<{ moved: number; failed: number }> {
try {
const jobFileList: string[] = [];
const jobDir = await fs.opendir(PathToRoFolder(from_jobid));
for await (const dirent of jobDir) {
if (dirent.isFile() && ListableChecker(dirent)) jobFileList.push(dirent.name);
}
const billFileList: string[] = [];
const billDir = await fs.opendir(PathToRoBillsFolder(from_jobid));
for await (const dirent of billDir) {
if (dirent.isFile() && ListableChecker(dirent)) billFileList.push(dirent.name);
}
await fs.ensureDir(PathToRoFolder(jobid));
logger.debug("Moving job based media.", { jobid, from_jobid, files });
if (job) await job.updateProgress(15);
const moveOps: Promise<any>[] = [];
let processedFiles = 0;
const totalFiles = files.length;
for (const file of files) {
if (jobFileList.includes(file)) {
// Move main file
moveOps.push(
fs
.move(path.join(FolderPaths.Jobs, from_jobid, file), path.join(FolderPaths.Jobs, jobid, file), {
overwrite: true
})
.then(() => logger.debug(`[MoveWorker] Moved main file: ${file}`))
.catch((err) => logger.warning(`[MoveWorker] Failed to move main file ${file}:`, err))
);
// Move thumbnails
const baseThumb = file.replace(/\.[^/.]+$/, "");
for (const ext of [".jpg", ".png"]) {
moveOps.push(
fs
.move(
path.join(FolderPaths.Jobs, from_jobid, FolderPaths.ThumbsSubDir, `${baseThumb}${ext}`),
path.join(FolderPaths.Jobs, jobid, FolderPaths.ThumbsSubDir, `${baseThumb}${ext}`),
{ overwrite: true }
)
.then(() => logger.debug(`[MoveWorker] Moved thumbnail: ${baseThumb}${ext}`))
.catch(() => {}) // Thumbnails might not exist
);
}
// Move ConvertedOriginal file if it exists
const baseFileName = file.replace(/\.[^/.]+$/, "");
const sourceConvertedDir = path.join(FolderPaths.Jobs, from_jobid, FolderPaths.ConvertedOriginalSubDir);
const targetConvertedDir = path.join(FolderPaths.Jobs, jobid, FolderPaths.ConvertedOriginalSubDir);
moveOps.push(
(async () => {
try {
if (await fs.pathExists(sourceConvertedDir)) {
const convertedOriginalFiles = await fs.readdir(sourceConvertedDir);
for (const convertedFile of convertedOriginalFiles) {
const convertedFileBaseName = path.basename(convertedFile, path.extname(convertedFile));
if (convertedFileBaseName === baseFileName) {
await fs.ensureDir(targetConvertedDir);
await fs.move(
path.join(sourceConvertedDir, convertedFile),
path.join(targetConvertedDir, convertedFile),
{ overwrite: true }
);
logger.debug(`[MoveWorker] Moved ConvertedOriginal: ${convertedFile}`);
}
}
}
} catch (error) {
logger.warning(`[MoveWorker] Failed to move ConvertedOriginal for ${file}:`, error);
}
})()
);
}
if (billFileList.includes(file)) {
// Move bill file
moveOps.push(
fs
.move(
path.join(FolderPaths.Jobs, from_jobid, FolderPaths.BillsSubDir, file),
path.join(FolderPaths.Jobs, jobid, FolderPaths.BillsSubDir, file),
{ overwrite: true }
)
.then(() => logger.debug(`[MoveWorker] Moved bill file: ${file}`))
.catch((err) => logger.warning(`[MoveWorker] Failed to move bill file ${file}:`, err))
);
// Move bill thumbnails
const baseThumb = file.replace(/\.[^/.]+$/, "");
for (const ext of [".jpg", ".png"]) {
moveOps.push(
fs
.move(
path.join(
FolderPaths.Jobs,
from_jobid,
FolderPaths.BillsSubDir,
FolderPaths.ThumbsSubDir,
`${baseThumb}${ext}`
),
path.join(
FolderPaths.Jobs,
jobid,
FolderPaths.BillsSubDir,
FolderPaths.ThumbsSubDir,
`${baseThumb}${ext}`
),
{ overwrite: true }
)
.then(() => logger.debug(`[MoveWorker] Moved bill thumbnail: ${baseThumb}${ext}`))
.catch(() => {}) // Thumbnails might not exist
);
}
// Move bill ConvertedOriginal file if it exists
const billBaseFileName = file.replace(/\.[^/.]+$/, "");
const sourceBillConvertedDir = path.join(FolderPaths.Jobs, from_jobid, FolderPaths.BillsSubDir, FolderPaths.ConvertedOriginalSubDir);
const targetBillConvertedDir = path.join(FolderPaths.Jobs, jobid, FolderPaths.BillsSubDir, FolderPaths.ConvertedOriginalSubDir);
moveOps.push(
(async () => {
try {
if (await fs.pathExists(sourceBillConvertedDir)) {
const convertedOriginalFiles = await fs.readdir(sourceBillConvertedDir);
for (const convertedFile of convertedOriginalFiles) {
const convertedFileBaseName = path.basename(convertedFile, path.extname(convertedFile));
if (convertedFileBaseName === billBaseFileName) {
await fs.ensureDir(targetBillConvertedDir);
await fs.move(
path.join(sourceBillConvertedDir, convertedFile),
path.join(targetBillConvertedDir, convertedFile),
{ overwrite: true }
);
logger.debug(`[MoveWorker] Moved bill ConvertedOriginal: ${convertedFile}`);
}
}
}
} catch (error) {
logger.warning(`[MoveWorker] Failed to move bill ConvertedOriginal for ${file}:`, error);
}
})()
);
}
processedFiles++;
if (job) {
const progress = 15 + Math.round((processedFiles / totalFiles) * 70);
await job.updateProgress(progress);
}
}
if (job) await job.updateProgress(90);
const results = await Promise.allSettled(moveOps);
const failed = results.filter(r => r.status === 'rejected').length;
const moved = results.filter(r => r.status === 'fulfilled').length;
logger.debug(`[MoveWorker] Move operation completed: ${moved} successful, ${failed} failed`);
return { moved, failed };
} catch (error) {
logger.error("[MoveWorker] Error in processMoveOperation:", error);
throw error;
}
}
export async function JobsMoveMedia(req: Request, res: Response) {
const jobid: string = (req.body.jobid || "").trim();
const from_jobid: string = (req.body.from_jobid || "").trim();
const files: string[] = req.body.files || [];
try {
if (!from_jobid) {
res.status(400).json({ error: "from_jobid must be specified." });
return;
}
if (!files.length) {
res.status(400).json({ error: "files must be specified." });
return;
}
// For small operations (1-3 files), process synchronously for immediate feedback
if (files.length <= 3) {
logger.debug("Processing small move operation synchronously");
await processMoveOperation(jobid, from_jobid, files);
JobsListMedia(req, res);
return;
}
// For larger operations, use BullMQ but still return updated file list
logger.debug(`[JobsMoveMedia] Queuing move operation for ${files.length} files`);
const job = await moveQueue.add("moveMedia", { jobid, from_jobid, files });
// Return the updated file list immediately (optimistic update)
JobsListMedia(req, res);
// Process in background - if it fails, files will be back on next refresh
job.waitUntilFinished(moveQueueEvents)
.then(() => {
logger.debug(`[JobsMoveMedia] Background move completed for job ${job.id}`);
})
.catch((error) => {
logger.error(`[JobsMoveMedia] Background move failed for job ${job.id}:`, error);
});
} catch (err) {
logger.error("Error moving job media", {
from_jobid,
jobid,
files,
err
});
res.status(500).json({ error: "Failed to queue move operation", details: err });
}
}
/**
* Get the status of a move operation job
*/
export async function JobsMoveStatus(req: Request, res: Response) {
const { jobId } = req.params;
try {
const job = await Job.fromId(moveQueue, jobId);
if (!job) {
res.status(404).json({ error: "Job not found" });
return;
}
const state = await job.getState();
const progress = job.progress;
res.json({
jobId,
state,
progress,
data: job.data,
finishedOn: job.finishedOn,
processedOn: job.processedOn,
failedReason: job.failedReason
});
} catch (error) {
logger.error("Error getting move job status:", error);
res.status(500).json({ error: "Failed to get job status" });
}
}