345 lines
12 KiB
TypeScript
345 lines
12 KiB
TypeScript
import { Job, Queue, QueueEvents, Worker } from "bullmq";
|
|
import { Request, Response } from "express";
|
|
import fs from "fs-extra";
|
|
import path from "path";
|
|
import { logger } from "../server.js";
|
|
import ListableChecker from "../util/listableChecker.js";
|
|
import { PathToRoBillsFolder, PathToRoFolder } from "../util/pathGenerators.js";
|
|
import { FolderPaths } from "../util/serverInit.js";
|
|
import { JobsListMedia } from "./jobsListMedia.js";
|
|
|
|
const MOVE_QUEUE_NAME = "moveQueue";
|
|
|
|
const connectionOpts = {
|
|
host: "localhost",
|
|
port: 6379,
|
|
enableReadyCheck: true,
|
|
reconnectOnError: (err: Error) => err.message.includes("READONLY")
|
|
};
|
|
|
|
const moveQueue = new Queue(MOVE_QUEUE_NAME, {
|
|
connection: connectionOpts,
|
|
defaultJobOptions: {
|
|
removeOnComplete: 10,
|
|
removeOnFail: 5,
|
|
attempts: 3,
|
|
backoff: { type: "exponential", delay: 2000 }
|
|
}
|
|
});
|
|
|
|
const moveQueueEvents = new QueueEvents(MOVE_QUEUE_NAME, {
|
|
connection: connectionOpts
|
|
});
|
|
|
|
const moveWorker = new Worker(
|
|
MOVE_QUEUE_NAME,
|
|
async (job: Job<{ jobid: string; from_jobid: string; files: string[] }>) => {
|
|
const { jobid, from_jobid, files } = job.data;
|
|
logger.debug(`[MoveWorker] Starting move operation from ${from_jobid} to ${jobid} for ${files.length} files`);
|
|
|
|
try {
|
|
await job.updateProgress(5);
|
|
|
|
const result = await processMoveOperation(jobid, from_jobid, files, job);
|
|
|
|
await job.updateProgress(100);
|
|
logger.debug(`[MoveWorker] Completed move operation from ${from_jobid} to ${jobid}`);
|
|
return result;
|
|
} catch (error) {
|
|
logger.error(`[MoveWorker] Error moving files from ${from_jobid} to ${jobid}:`, error);
|
|
throw error;
|
|
}
|
|
},
|
|
{
|
|
connection: connectionOpts,
|
|
concurrency: 2 // Limit concurrent move operations to avoid I/O overwhelm
|
|
}
|
|
);
|
|
|
|
// Worker event listeners for logging
|
|
moveWorker.on("ready", () => {
|
|
logger.debug("[MoveWorker] Worker is ready");
|
|
});
|
|
moveWorker.on("active", (job, prev) => {
|
|
logger.debug(`[MoveWorker] Job ${job.id} active (previous: ${prev})`);
|
|
});
|
|
moveWorker.on("completed", async (job) => {
|
|
logger.debug(`[MoveWorker] Job ${job.id} completed`);
|
|
});
|
|
moveWorker.on("failed", (job, err) => {
|
|
logger.error(`[MoveWorker] Job ${job?.id} failed:`, err);
|
|
});
|
|
moveWorker.on("stalled", (jobId) => {
|
|
logger.error(`[MoveWorker] Job stalled: ${jobId}`);
|
|
});
|
|
moveWorker.on("error", (err) => {
|
|
logger.error("[MoveWorker] Worker error:", err);
|
|
});
|
|
|
|
// Queue event listeners
|
|
moveQueue.on("waiting", (job) => {
|
|
logger.debug(`[MoveQueue] Job waiting in queue: ${job.data.from_jobid} -> ${job.data.jobid}`);
|
|
});
|
|
moveQueue.on("error", (err) => {
|
|
logger.error("[MoveQueue] Queue error:", err);
|
|
});
|
|
|
|
async function processMoveOperation(
|
|
jobid: string,
|
|
from_jobid: string,
|
|
files: string[],
|
|
job?: Job
|
|
): Promise<{ moved: number; failed: number }> {
|
|
try {
|
|
const jobFileList: string[] = [];
|
|
const jobDir = await fs.opendir(PathToRoFolder(from_jobid));
|
|
for await (const dirent of jobDir) {
|
|
if (dirent.isFile() && ListableChecker(dirent)) jobFileList.push(dirent.name);
|
|
}
|
|
|
|
const billFileList: string[] = [];
|
|
const billDir = await fs.opendir(PathToRoBillsFolder(from_jobid));
|
|
for await (const dirent of billDir) {
|
|
if (dirent.isFile() && ListableChecker(dirent)) billFileList.push(dirent.name);
|
|
}
|
|
|
|
await fs.ensureDir(PathToRoFolder(jobid));
|
|
logger.debug("Moving job based media.", { jobid, from_jobid, files });
|
|
|
|
if (job) await job.updateProgress(15);
|
|
|
|
const moveOps: Promise<any>[] = [];
|
|
let processedFiles = 0;
|
|
const totalFiles = files.length;
|
|
|
|
for (const file of files) {
|
|
if (jobFileList.includes(file)) {
|
|
// Move main file
|
|
moveOps.push(
|
|
fs
|
|
.move(path.join(FolderPaths.Jobs, from_jobid, file), path.join(FolderPaths.Jobs, jobid, file), {
|
|
overwrite: true
|
|
})
|
|
.then(() => logger.debug(`[MoveWorker] Moved main file: ${file}`))
|
|
.catch((err) => logger.warning(`[MoveWorker] Failed to move main file ${file}:`, err))
|
|
);
|
|
|
|
// Move thumbnails
|
|
const baseThumb = file.replace(/\.[^/.]+$/, "");
|
|
for (const ext of [".jpg", ".png"]) {
|
|
moveOps.push(
|
|
fs
|
|
.move(
|
|
path.join(FolderPaths.Jobs, from_jobid, FolderPaths.ThumbsSubDir, `${baseThumb}${ext}`),
|
|
path.join(FolderPaths.Jobs, jobid, FolderPaths.ThumbsSubDir, `${baseThumb}${ext}`),
|
|
{ overwrite: true }
|
|
)
|
|
.then(() => logger.debug(`[MoveWorker] Moved thumbnail: ${baseThumb}${ext}`))
|
|
.catch(() => {}) // Thumbnails might not exist
|
|
);
|
|
}
|
|
|
|
// Move ConvertedOriginal file if it exists
|
|
const baseFileName = file.replace(/\.[^/.]+$/, "");
|
|
const sourceConvertedDir = path.join(FolderPaths.Jobs, from_jobid, FolderPaths.ConvertedOriginalSubDir);
|
|
const targetConvertedDir = path.join(FolderPaths.Jobs, jobid, FolderPaths.ConvertedOriginalSubDir);
|
|
|
|
moveOps.push(
|
|
(async () => {
|
|
try {
|
|
if (await fs.pathExists(sourceConvertedDir)) {
|
|
const convertedOriginalFiles = await fs.readdir(sourceConvertedDir);
|
|
for (const convertedFile of convertedOriginalFiles) {
|
|
const convertedFileBaseName = path.basename(convertedFile, path.extname(convertedFile));
|
|
if (convertedFileBaseName === baseFileName) {
|
|
await fs.ensureDir(targetConvertedDir);
|
|
await fs.move(
|
|
path.join(sourceConvertedDir, convertedFile),
|
|
path.join(targetConvertedDir, convertedFile),
|
|
{ overwrite: true }
|
|
);
|
|
logger.debug(`[MoveWorker] Moved ConvertedOriginal: ${convertedFile}`);
|
|
}
|
|
}
|
|
}
|
|
} catch (error) {
|
|
logger.warning(`[MoveWorker] Failed to move ConvertedOriginal for ${file}:`, error);
|
|
}
|
|
})()
|
|
);
|
|
}
|
|
|
|
if (billFileList.includes(file)) {
|
|
// Move bill file
|
|
moveOps.push(
|
|
fs
|
|
.move(
|
|
path.join(FolderPaths.Jobs, from_jobid, FolderPaths.BillsSubDir, file),
|
|
path.join(FolderPaths.Jobs, jobid, FolderPaths.BillsSubDir, file),
|
|
{ overwrite: true }
|
|
)
|
|
.then(() => logger.debug(`[MoveWorker] Moved bill file: ${file}`))
|
|
.catch((err) => logger.warning(`[MoveWorker] Failed to move bill file ${file}:`, err))
|
|
);
|
|
|
|
// Move bill thumbnails
|
|
const baseThumb = file.replace(/\.[^/.]+$/, "");
|
|
for (const ext of [".jpg", ".png"]) {
|
|
moveOps.push(
|
|
fs
|
|
.move(
|
|
path.join(
|
|
FolderPaths.Jobs,
|
|
from_jobid,
|
|
FolderPaths.BillsSubDir,
|
|
FolderPaths.ThumbsSubDir,
|
|
`${baseThumb}${ext}`
|
|
),
|
|
path.join(
|
|
FolderPaths.Jobs,
|
|
jobid,
|
|
FolderPaths.BillsSubDir,
|
|
FolderPaths.ThumbsSubDir,
|
|
`${baseThumb}${ext}`
|
|
),
|
|
{ overwrite: true }
|
|
)
|
|
.then(() => logger.debug(`[MoveWorker] Moved bill thumbnail: ${baseThumb}${ext}`))
|
|
.catch(() => {}) // Thumbnails might not exist
|
|
);
|
|
}
|
|
|
|
// Move bill ConvertedOriginal file if it exists
|
|
const billBaseFileName = file.replace(/\.[^/.]+$/, "");
|
|
const sourceBillConvertedDir = path.join(FolderPaths.Jobs, from_jobid, FolderPaths.BillsSubDir, FolderPaths.ConvertedOriginalSubDir);
|
|
const targetBillConvertedDir = path.join(FolderPaths.Jobs, jobid, FolderPaths.BillsSubDir, FolderPaths.ConvertedOriginalSubDir);
|
|
|
|
moveOps.push(
|
|
(async () => {
|
|
try {
|
|
if (await fs.pathExists(sourceBillConvertedDir)) {
|
|
const convertedOriginalFiles = await fs.readdir(sourceBillConvertedDir);
|
|
for (const convertedFile of convertedOriginalFiles) {
|
|
const convertedFileBaseName = path.basename(convertedFile, path.extname(convertedFile));
|
|
if (convertedFileBaseName === billBaseFileName) {
|
|
await fs.ensureDir(targetBillConvertedDir);
|
|
await fs.move(
|
|
path.join(sourceBillConvertedDir, convertedFile),
|
|
path.join(targetBillConvertedDir, convertedFile),
|
|
{ overwrite: true }
|
|
);
|
|
logger.debug(`[MoveWorker] Moved bill ConvertedOriginal: ${convertedFile}`);
|
|
}
|
|
}
|
|
}
|
|
} catch (error) {
|
|
logger.warning(`[MoveWorker] Failed to move bill ConvertedOriginal for ${file}:`, error);
|
|
}
|
|
})()
|
|
);
|
|
}
|
|
|
|
processedFiles++;
|
|
if (job) {
|
|
const progress = 15 + Math.round((processedFiles / totalFiles) * 70);
|
|
await job.updateProgress(progress);
|
|
}
|
|
}
|
|
|
|
if (job) await job.updateProgress(90);
|
|
|
|
const results = await Promise.allSettled(moveOps);
|
|
const failed = results.filter(r => r.status === 'rejected').length;
|
|
const moved = results.filter(r => r.status === 'fulfilled').length;
|
|
|
|
logger.debug(`[MoveWorker] Move operation completed: ${moved} successful, ${failed} failed`);
|
|
|
|
return { moved, failed };
|
|
} catch (error) {
|
|
logger.error("[MoveWorker] Error in processMoveOperation:", error);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
export async function JobsMoveMedia(req: Request, res: Response) {
|
|
const jobid: string = (req.body.jobid || "").trim();
|
|
const from_jobid: string = (req.body.from_jobid || "").trim();
|
|
const files: string[] = req.body.files || [];
|
|
|
|
try {
|
|
if (!from_jobid) {
|
|
res.status(400).json({ error: "from_jobid must be specified." });
|
|
return;
|
|
}
|
|
if (!files.length) {
|
|
res.status(400).json({ error: "files must be specified." });
|
|
return;
|
|
}
|
|
|
|
// For small operations (1-3 files), process synchronously for immediate feedback
|
|
if (files.length <= 3) {
|
|
logger.debug("Processing small move operation synchronously");
|
|
await processMoveOperation(jobid, from_jobid, files);
|
|
JobsListMedia(req, res);
|
|
return;
|
|
}
|
|
|
|
// For larger operations, use BullMQ but still return updated file list
|
|
logger.debug(`[JobsMoveMedia] Queuing move operation for ${files.length} files`);
|
|
const job = await moveQueue.add("moveMedia", { jobid, from_jobid, files });
|
|
|
|
// Return the updated file list immediately (optimistic update)
|
|
JobsListMedia(req, res);
|
|
|
|
// Process in background - if it fails, files will be back on next refresh
|
|
job.waitUntilFinished(moveQueueEvents)
|
|
.then(() => {
|
|
logger.debug(`[JobsMoveMedia] Background move completed for job ${job.id}`);
|
|
})
|
|
.catch((error) => {
|
|
logger.error(`[JobsMoveMedia] Background move failed for job ${job.id}:`, error);
|
|
});
|
|
|
|
} catch (err) {
|
|
logger.error("Error moving job media", {
|
|
from_jobid,
|
|
jobid,
|
|
files,
|
|
err
|
|
});
|
|
res.status(500).json({ error: "Failed to queue move operation", details: err });
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get the status of a move operation job
|
|
*/
|
|
export async function JobsMoveStatus(req: Request, res: Response) {
|
|
const { jobId } = req.params;
|
|
|
|
try {
|
|
const job = await Job.fromId(moveQueue, jobId);
|
|
|
|
if (!job) {
|
|
res.status(404).json({ error: "Job not found" });
|
|
return;
|
|
}
|
|
|
|
const state = await job.getState();
|
|
const progress = job.progress;
|
|
|
|
res.json({
|
|
jobId,
|
|
state,
|
|
progress,
|
|
data: job.data,
|
|
finishedOn: job.finishedOn,
|
|
processedOn: job.processedOn,
|
|
failedReason: job.failedReason
|
|
});
|
|
} catch (error) {
|
|
logger.error("Error getting move job status:", error);
|
|
res.status(500).json({ error: "Failed to get job status" });
|
|
}
|
|
}
|