Write progress to file to avoid memory leakes.
This commit is contained in:
@@ -7,6 +7,7 @@ import { logger } from "../server.js";
|
|||||||
import { FolderPaths } from "./serverInit.js";
|
import { FolderPaths } from "./serverInit.js";
|
||||||
import axios from "axios";
|
import axios from "axios";
|
||||||
import { UUID } from "crypto";
|
import { UUID } from "crypto";
|
||||||
|
import fsPromises from "fs/promises";
|
||||||
|
|
||||||
const execAsync = promisify(exec);
|
const execAsync = promisify(exec);
|
||||||
|
|
||||||
@@ -183,8 +184,11 @@ export function createS3SyncFromEnv(): S3Sync | null {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Standalone function to analyze Jobs directory without S3 configuration
|
* Standalone function to analyze Jobs directory without S3 configuration
|
||||||
|
* Uses streaming to a temporary file to avoid memory issues with large datasets
|
||||||
*/
|
*/
|
||||||
export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
|
export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
|
||||||
|
const tempFilePath = path.join(FolderPaths.Jobs, "..", "jobs_analysis_temp.jsonl");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
logger.info("Starting Jobs directory analysis...");
|
logger.info("Starting Jobs directory analysis...");
|
||||||
//Read from the config.json file in the root directory to get the bodyshopid
|
//Read from the config.json file in the root directory to get the bodyshopid
|
||||||
@@ -206,10 +210,14 @@ export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clear/create temporary file for streaming job stats
|
||||||
|
await fsPromises.writeFile(tempFilePath, "", "utf-8");
|
||||||
|
logger.info(`Created temporary file for streaming analysis: ${tempFilePath}`);
|
||||||
|
|
||||||
const jobFolders = await readdir(jobsPath);
|
const jobFolders = await readdir(jobsPath);
|
||||||
const jobStats: JobFolderStats[] = [];
|
|
||||||
let total_documents = 0;
|
let total_documents = 0;
|
||||||
let total_size_bytes = 0;
|
let total_size_bytes = 0;
|
||||||
|
let total_jobs = 0;
|
||||||
const aggregated_file_type_stats: { [extension: string]: number } = {};
|
const aggregated_file_type_stats: { [extension: string]: number } = {};
|
||||||
|
|
||||||
const uuidRegex = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/;
|
const uuidRegex = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/;
|
||||||
@@ -223,25 +231,47 @@ export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
|
|||||||
// Only process directories
|
// Only process directories
|
||||||
if (stat.isDirectory()) {
|
if (stat.isDirectory()) {
|
||||||
const folderStats = await analyzeJobFolder(jobsPath, jobFolder, bodyshopid);
|
const folderStats = await analyzeJobFolder(jobsPath, jobFolder, bodyshopid);
|
||||||
jobStats.push(folderStats);
|
|
||||||
|
// Stream to file immediately (one JSON object per line)
|
||||||
|
await fsPromises.appendFile(tempFilePath, JSON.stringify(folderStats) + "\n", "utf-8");
|
||||||
|
|
||||||
|
total_jobs++;
|
||||||
total_documents += folderStats.document_count;
|
total_documents += folderStats.document_count;
|
||||||
total_size_bytes += folderStats.total_size_bytes;
|
total_size_bytes += folderStats.total_size_bytes;
|
||||||
|
|
||||||
// Aggregate file type stats
|
// Aggregate file type stats
|
||||||
for (const [ext, count] of Object.entries(folderStats.file_type_stats)) {
|
for (const [ext, count] of Object.entries(folderStats.file_type_stats)) {
|
||||||
aggregated_file_type_stats[ext] = (aggregated_file_type_stats[ext] || 0) + count;
|
aggregated_file_type_stats[ext] = (aggregated_file_type_stats[ext] || 0) + count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Log progress every 100 jobs
|
||||||
|
if (total_jobs % 100 === 0) {
|
||||||
|
logger.info(`Progress: Analyzed ${total_jobs} jobs...`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(`Reading back job stats from temporary file...`);
|
||||||
|
|
||||||
|
// Read back the job stats from file
|
||||||
|
const fileContent = await fsPromises.readFile(tempFilePath, "utf-8");
|
||||||
|
const jobStats: JobFolderStats[] = fileContent
|
||||||
|
.trim()
|
||||||
|
.split("\n")
|
||||||
|
.filter((line) => line.length > 0)
|
||||||
|
.map((line) => JSON.parse(line));
|
||||||
|
|
||||||
|
// Sort by jobid
|
||||||
|
jobStats.sort((a, b) => a.jobid?.localeCompare(b.jobid!) || 0);
|
||||||
|
|
||||||
const analysis: JobsDirectoryAnalysis = {
|
const analysis: JobsDirectoryAnalysis = {
|
||||||
bodyshopid, //read from the config.json file in the root directory
|
bodyshopid, //read from the config.json file in the root directory
|
||||||
|
total_jobs,
|
||||||
total_jobs: jobStats.length,
|
|
||||||
total_documents,
|
total_documents,
|
||||||
total_size_bytes,
|
total_size_bytes,
|
||||||
total_size_mb: Math.round((total_size_bytes / (1024 * 1024)) * 100) / 100,
|
total_size_mb: Math.round((total_size_bytes / (1024 * 1024)) * 100) / 100,
|
||||||
file_type_stats: aggregated_file_type_stats,
|
file_type_stats: aggregated_file_type_stats,
|
||||||
media_analytics_details: { data: jobStats.sort((a, b) => a.jobid?.localeCompare(b.jobid!) || 0) }
|
media_analytics_details: { data: jobStats }
|
||||||
};
|
};
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
@@ -249,12 +279,27 @@ export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
|
|||||||
);
|
);
|
||||||
|
|
||||||
//Add an upload to the IO database to categorize all of this.
|
//Add an upload to the IO database to categorize all of this.
|
||||||
const apiURL = "https://api.test.imex.online/analytics/documents"; //TODO: don't hardcode and point to prod.
|
const apiURL = process.env.IS_TEST
|
||||||
|
? "https://api.test.imex.online/analytics/documents"
|
||||||
|
: "https://api.imex.online/analytics/documents";
|
||||||
const result = await axios.post(apiURL, { data: analysis });
|
const result = await axios.post(apiURL, { data: analysis });
|
||||||
|
|
||||||
|
// Clean up temporary file
|
||||||
|
await fsPromises.unlink(tempFilePath);
|
||||||
|
logger.info("Cleaned up temporary analysis file");
|
||||||
|
|
||||||
return analysis;
|
return analysis;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error("Failed to analyze Jobs directory:", JSON.stringify(error, null, 4));
|
logger.error("Failed to analyze Jobs directory:", JSON.stringify(error, null, 4));
|
||||||
|
// Attempt to clean up temp file on error
|
||||||
|
try {
|
||||||
|
if (await fs.pathExists(tempFilePath)) {
|
||||||
|
await fs.remove(tempFilePath);
|
||||||
|
logger.info("Cleaned up temporary file after error");
|
||||||
|
}
|
||||||
|
} catch (cleanupError) {
|
||||||
|
logger.warn("Failed to clean up temporary file:", cleanupError);
|
||||||
|
}
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user