diff --git a/util/s3Sync.ts b/util/s3Sync.ts
index 881b0f3..9cede37 100644
--- a/util/s3Sync.ts
+++ b/util/s3Sync.ts
@@ -7,6 +7,7 @@ import { logger } from "../server.js";
 import { FolderPaths } from "./serverInit.js";
 import axios from "axios";
 import { UUID } from "crypto";
+import fsPromises from "fs/promises";
 
 const execAsync = promisify(exec);
 
@@ -183,8 +184,11 @@ export function createS3SyncFromEnv(): S3Sync | null {
 
 /**
  * Standalone function to analyze Jobs directory without S3 configuration
+ * Uses streaming to a temporary file to avoid memory issues with large datasets
  */
 export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
+	const tempFilePath = path.join(FolderPaths.Jobs, "..", "jobs_analysis_temp.jsonl");
+
 	try {
 		logger.info("Starting Jobs directory analysis...");
 		//Read from the config.json file in the root directory to get the bodyshopid
@@ -206,10 +210,14 @@ export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
 			};
 		}
 
+		// Clear/create temporary file for streaming job stats
+		await fsPromises.writeFile(tempFilePath, "", "utf-8");
+		logger.info(`Created temporary file for streaming analysis: ${tempFilePath}`);
+
 		const jobFolders = await readdir(jobsPath);
-		const jobStats: JobFolderStats[] = [];
 		let total_documents = 0;
 		let total_size_bytes = 0;
+		let total_jobs = 0;
 		const aggregated_file_type_stats: { [extension: string]: number } = {};
 
 		const uuidRegex = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/;
@@ -223,25 +231,47 @@ export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
 			// Only process directories
 			if (stat.isDirectory()) {
 				const folderStats = await analyzeJobFolder(jobsPath, jobFolder, bodyshopid);
-				jobStats.push(folderStats);
+
+				// Stream to file immediately (one JSON object per line)
+				await fsPromises.appendFile(tempFilePath, JSON.stringify(folderStats) + "\n", "utf-8");
+
+				total_jobs++;
 				total_documents += folderStats.document_count;
 				total_size_bytes += folderStats.total_size_bytes;
 
+				// Aggregate file type stats
 				for (const [ext, count] of Object.entries(folderStats.file_type_stats)) {
 					aggregated_file_type_stats[ext] = (aggregated_file_type_stats[ext] || 0) + count;
 				}
+
+				// Log progress every 100 jobs
+				if (total_jobs % 100 === 0) {
+					logger.info(`Progress: Analyzed ${total_jobs} jobs...`);
+				}
 			}
 		}
 
+		logger.info(`Reading back job stats from temporary file...`);
+
+		// Read back the job stats from file
+		const fileContent = await fsPromises.readFile(tempFilePath, "utf-8");
+		const jobStats: JobFolderStats[] = fileContent
+			.trim()
+			.split("\n")
+			.filter((line) => line.length > 0)
+			.map((line) => JSON.parse(line));
+
+		// Sort by jobid
+		jobStats.sort((a, b) => a.jobid?.localeCompare(b.jobid!) || 0);
+
 		const analysis: JobsDirectoryAnalysis = {
 			bodyshopid, //read from the config.json file in the root directory
-
-			total_jobs: jobStats.length,
+			total_jobs,
 			total_documents,
 			total_size_bytes,
 			total_size_mb: Math.round((total_size_bytes / (1024 * 1024)) * 100) / 100,
 			file_type_stats: aggregated_file_type_stats,
-			media_analytics_details: { data: jobStats.sort((a, b) => a.jobid?.localeCompare(b.jobid!) || 0) }
+			media_analytics_details: { data: jobStats }
 		};
 
 		logger.info(
@@ -249,12 +279,27 @@ export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
 		);
 
 		//Add an upload to the IO database to categorize all of this.
-		const apiURL = "https://api.test.imex.online/analytics/documents"; //TODO: don't hardcode and point to prod.
+		const apiURL = process.env.IS_TEST
+			? "https://api.test.imex.online/analytics/documents"
+			: "https://api.imex.online/analytics/documents";
 		const result = await axios.post(apiURL, { data: analysis });
 
+		// Clean up temporary file
+		await fsPromises.unlink(tempFilePath);
+		logger.info("Cleaned up temporary analysis file");
+
 		return analysis;
 	} catch (error) {
 		logger.error("Failed to analyze Jobs directory:", JSON.stringify(error, null, 4));
+		// Attempt to clean up temp file on error
+		try {
+			if (await fs.pathExists(tempFilePath)) {
+				await fs.remove(tempFilePath);
+				logger.info("Cleaned up temporary file after error");
+			}
+		} catch (cleanupError) {
+			logger.warn("Failed to clean up temporary file:", cleanupError);
+		}
 		throw error;
 	}
 }