import { exec } from "child_process"; import { promisify } from "util"; import * as fs from "fs-extra"; import { readdir, stat as fsStat } from "fs/promises"; import * as path from "path"; import { logger } from "../server.js"; import { FolderPaths } from "./serverInit.js"; import axios from "axios"; import { UUID } from "crypto"; import fsPromises from "fs/promises"; import crypto from "crypto"; import { createReadStream } from "fs"; const execAsync = promisify(exec); interface S3SyncConfig { bucketName: string; region: string; accessKeyId: string; secretAccessKey: string; keyPrefix?: string; } export interface JobFolderStats { bodyshopid: UUID; jobid: UUID | string | null; //relativePath: string; document_count: number; unique_document_count: number; duplicate_count: number; total_size_bytes: number; total_size_mb: number; file_type_stats: { [extension: string]: number }; } export interface JobsDirectoryAnalysis { bodyshopid: UUID; total_jobs: number; total_documents: number; unique_documents: number; duplicate_documents: number; total_size_bytes: number; total_size_mb: number; file_type_stats: { [extension: string]: number }; media_analytics_details: { data: JobFolderStats[]; }; } export class S3Sync { private config: S3SyncConfig; constructor(config: S3SyncConfig) { this.config = config; } /** * Sync the Jobs directory to S3 bucket using AWS CLI */ async syncJobsToS3(): Promise { try { logger.info("Starting S3 sync for Jobs directory using AWS CLI..."); const jobsPath = FolderPaths.Jobs; const keyPrefix = this.config.keyPrefix || "jobs/"; const s3Path = `s3://${this.config.bucketName}/${keyPrefix}`; // Check if Jobs directory exists if (!(await fs.pathExists(jobsPath))) { logger.error(`Jobs directory does not exist: ${jobsPath}`); return; } // Check if AWS CLI is available await this.checkAwsCli(); // Set AWS credentials as environment variables for the command const env = { ...process.env, AWS_ACCESS_KEY_ID: this.config.accessKeyId, AWS_SECRET_ACCESS_KEY: 
this.config.secretAccessKey, AWS_DEFAULT_REGION: this.config.region }; // Run AWS S3 sync command const syncCommand = `aws s3 sync "${jobsPath}" "${s3Path}"`; logger.info(`Executing AWS S3 sync command`); const { stdout, stderr } = await execAsync(syncCommand, { env, maxBuffer: 1024 * 1024 * 10 // 10MB buffer for large sync outputs }); if (stdout) { logger.info("S3 sync output:", stdout); } if (stderr) { logger.warning("S3 sync warnings:", stderr); } logger.info("S3 sync completed successfully using AWS CLI"); } catch (error) { logger.error("S3 sync failed:", error); throw error; } } /** * Check if AWS CLI is available */ private async checkAwsCli(): Promise { try { await execAsync("aws --version"); } catch (error) { throw new Error( "AWS CLI not found. Please install AWS CLI: https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html" ); } } /** * Test S3 connection using AWS CLI */ async testConnection(): Promise { try { // Check if AWS CLI is available first await this.checkAwsCli(); // Set AWS credentials as environment variables const env = { ...process.env, AWS_ACCESS_KEY_ID: this.config.accessKeyId, AWS_SECRET_ACCESS_KEY: this.config.secretAccessKey, AWS_DEFAULT_REGION: this.config.region }; // Test connection by listing bucket const testCommand = `aws s3 ls s3://${this.config.bucketName} --max-items 1`; await execAsync(testCommand, { env }); logger.info(`S3 connection test successful for bucket: ${this.config.bucketName}`); return true; } catch (error) { logger.error("S3 connection test failed:", error); return false; } } /** * Get sync statistics using AWS CLI */ async getSyncStats(): Promise<{ bucketName: string; region: string; keyPrefix: string; available: boolean }> { const available = false; //await this.testConnection(); return { bucketName: this.config.bucketName, region: this.config.region, keyPrefix: this.config.keyPrefix || "jobs/", available }; } } /** * Create S3Sync instance from environment variables */ export function 
createS3SyncFromEnv(): S3Sync | null {
  const bucketName = process.env.S3_BUCKET_NAME;
  const region = process.env.S3_REGION || "ca-central-1";
  const accessKeyId = process.env.S3_ACCESS_KEY_ID;
  const secretAccessKey = process.env.S3_SECRET_ACCESS_KEY;
  const keyPrefix = process.env.S3_KEY_PREFIX;

  // The required values previously fell back to placeholders
  // ("test"/"key"/"secret"), which made this guard unreachable and returned a
  // client with dummy credentials. Require the real values instead.
  if (!bucketName || !accessKeyId || !secretAccessKey) {
    logger.warning(
      "S3 configuration incomplete. Required env vars: S3_BUCKET_NAME, S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY"
    );
    return null;
  }

  return new S3Sync({ bucketName, region, accessKeyId, secretAccessKey, keyPrefix });
}

/**
 * Standalone function to analyze the Jobs directory without S3 configuration.
 * Streams per-job stats to a temporary JSONL file (one JSON object per line)
 * to bound memory while walking many jobs, then aggregates the totals,
 * uploads the result to the analytics API and returns it.
 * @throws when config.json cannot be read, a filesystem walk fails, or the
 *         analytics upload fails; the temp file is cleaned up on error.
 */
export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
  const tempFilePath = path.join(FolderPaths.Jobs, "..", "jobs_analysis_temp.jsonl");

  try {
    logger.info("Starting Jobs directory analysis...");

    // The bodyshop id comes from config.json in the root directory.
    const bodyshopid: UUID = await getBodyshopIdFromConfig();
    const jobsPath = FolderPaths.Jobs;

    // With no Jobs directory there is nothing to analyze — return all zeros.
    if (!(await fs.pathExists(jobsPath))) {
      logger.warning(`Jobs directory does not exist: ${jobsPath}`);
      return {
        bodyshopid,
        total_jobs: 0,
        total_documents: 0,
        unique_documents: 0,
        duplicate_documents: 0,
        total_size_bytes: 0,
        total_size_mb: 0,
        file_type_stats: {},
        media_analytics_details: { data: [] }
      };
    }

    // Clear/create the temporary file used to stream job stats.
    await fsPromises.writeFile(tempFilePath, "", "utf-8");
    logger.info(`Created temporary file for streaming analysis: ${tempFilePath}`);

    const jobFolders = await readdir(jobsPath);
    let total_documents = 0;
    let total_unique_documents = 0;
    let total_duplicate_documents = 0;
    let total_size_bytes = 0;
    let total_jobs = 0;
    const aggregated_file_type_stats: { [extension: string]: number } = {};
    const uuidRegex = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/;

    for (const jobFolder of jobFolders) {
      // Only the special "temporary" folder and UUID-named folders are jobs.
      if (jobFolder !== "temporary" && !uuidRegex.test(jobFolder)) {
        logger.info(`Skipping invalid jobid directory: ${jobFolder}`);
        continue;
      }

      const jobFolderPath = path.join(jobsPath, jobFolder);
      const stat = await fsStat(jobFolderPath);

      // Only process directories.
      if (!stat.isDirectory()) {
        continue;
      }

      const folderStats = await analyzeJobFolder(jobsPath, jobFolder, bodyshopid);

      // Stream to file immediately (one JSON object per line).
      await fsPromises.appendFile(tempFilePath, JSON.stringify(folderStats) + "\n", "utf-8");

      total_jobs++;
      total_documents += folderStats.document_count;
      total_unique_documents += folderStats.unique_document_count;
      total_duplicate_documents += folderStats.duplicate_count;
      total_size_bytes += folderStats.total_size_bytes;

      // Aggregate file type stats across jobs.
      for (const [ext, count] of Object.entries(folderStats.file_type_stats)) {
        aggregated_file_type_stats[ext] = (aggregated_file_type_stats[ext] || 0) + count;
      }

      // Log progress every 100 jobs.
      if (total_jobs % 100 === 0) {
        logger.info(`Progress: Analyzed ${total_jobs} jobs...`);
      }
    }

    logger.info(`Reading back job stats from temporary file...`);

    // Read back the per-job stats from the JSONL file.
    const fileContent = await fsPromises.readFile(tempFilePath, "utf-8");
    const jobStats: JobFolderStats[] = fileContent
      .trim()
      .split("\n")
      .filter((line) => line.length > 0)
      .map((line) => JSON.parse(line));

    // Sort by jobid with a well-defined ordering for null jobids. The previous
    // comparator (`a.jobid?.localeCompare(b.jobid!) || 0`) could pass null to
    // localeCompare (coerced to the string "null") and treated any null jobid
    // as equal to everything.
    jobStats.sort((a, b) => {
      if (a.jobid === b.jobid) return 0;
      if (a.jobid === null) return -1; // null jobids ("temporary") sort first
      if (b.jobid === null) return 1;
      return a.jobid.localeCompare(b.jobid);
    });

    const analysis: JobsDirectoryAnalysis = {
      bodyshopid, //read from the config.json file in the root directory
      total_jobs,
      total_documents,
      unique_documents: total_unique_documents,
      duplicate_documents: total_duplicate_documents,
      total_size_bytes,
      total_size_mb: Math.round((total_size_bytes / (1024 * 1024)) * 100) / 100,
      file_type_stats: aggregated_file_type_stats,
      media_analytics_details: { data: jobStats }
    };

    logger.info(
      `Jobs directory analysis complete: ${analysis.total_jobs} jobs, ${analysis.total_documents} documents (${analysis.unique_documents} unique, ${analysis.duplicate_documents} duplicates), ${analysis.total_size_mb} MB`
    );

    // Upload the analysis to the IO database so it can be categorized.
    const apiURL = process.env.IS_TEST
      ? "https://api.test.imex.online/analytics/documents"
      : "https://api.imex.online/analytics/documents";
    const result = await axios.post(apiURL, { data: analysis });
    logger.info(`Uploaded analysis results, response status: ${result.status}`);

    // Clean up temporary file.
    await fsPromises.unlink(tempFilePath);
    logger.info("Cleaned up temporary analysis file");

    return analysis;
  } catch (error) {
    logger.error("Failed to analyze Jobs directory:", (error as Error).message, (error as Error).stack);

    // Best-effort removal of the temp file on failure.
    try {
      if (await fs.pathExists(tempFilePath)) {
        await fs.remove(tempFilePath);
        logger.info("Cleaned up temporary file after error");
      }
    } catch (cleanupError) {
      logger.warning("Failed to clean up temporary file:", cleanupError);
    }

    throw error;
  }
}

/**
 * Analyze a single job folder and package its stats with a validated jobid.
 */
async function analyzeJobFolder(jobsPath: string, jobid: string, bodyshopid: UUID): Promise<JobFolderStats> {
  const jobFolderPath = path.join(jobsPath, jobid);
  const { document_count, unique_document_count, duplicate_count, total_size_bytes, file_type_stats } =
    await getDirectoryStats(jobFolderPath);

  const uuidRegex = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/;
  // "temporary" maps to a null jobid; invalid names are logged and also null.
  let validJobid: UUID | string
| null = null;
  if (jobid === "temporary") {
    validJobid = null;
  } else if (uuidRegex.test(jobid)) {
    validJobid = jobid as UUID;
  } else {
    logger.warning(`Invalid jobid encountered in analyzeJobFolder: ${jobid}`);
    validJobid = null;
  }

  return {
    bodyshopid,
    jobid: validJobid,
    document_count,
    unique_document_count,
    duplicate_count,
    total_size_bytes,
    total_size_mb: Math.round((total_size_bytes / (1024 * 1024)) * 100) / 100,
    file_type_stats
  };
}

/**
 * Calculate the SHA256 hash (hex digest) of a file's content via a read
 * stream, so large files are never held in memory at once.
 */
async function calculateFileHash(filePath: string): Promise<string> {
  return new Promise((resolve, reject) => {
    const hash = crypto.createHash("sha256");
    const stream = createReadStream(filePath);
    stream.on("data", (data: string | Buffer) => hash.update(data));
    stream.on("end", () => resolve(hash.digest("hex")));
    stream.on("error", (error: Error) => {
      console.error("Hashing error:", error);
      reject(error);
    });
  });
}

/**
 * Recursively collect document count, total size and file-type stats for a
 * directory. Image files (by extension) are deduplicated by SHA256 content
 * hash; all other files are counted as unique without hashing. Subfolders
 * named "thumbs", "convertedoriginal" or "bills" are skipped
 * (case-insensitive). Errors while reading a directory are logged and that
 * directory contributes zero stats.
 *
 * NOTE(review): each recursive call starts with a fresh hash set and a
 * subdirectory's hashes are merged into the parent only AFTER its own files
 * were counted, so duplicates between sibling subdirectories are not
 * detected — confirm this is the intended scope of deduplication.
 */
async function getDirectoryStats(dirPath: string): Promise<{
  document_count: number;
  unique_document_count: number;
  duplicate_count: number;
  total_size_bytes: number;
  file_type_stats: { [extension: string]: number };
  contentHashes: Set<string>;
}> {
  let document_count = 0;
  let unique_document_count = 0;
  let duplicate_count = 0;
  let total_size_bytes = 0;
  const file_type_stats: { [extension: string]: number } = {};
  const contentHashes = new Set<string>();

  try {
    const items = await readdir(dirPath);

    for (const item of items) {
      const itemPath = path.join(dirPath, item);
      const stat = await fsStat(itemPath);

      if (stat.isDirectory()) {
        // Skip thumbs, convertedoriginal and bills folders (case-insensitive).
        const itemLower = item.toLowerCase();
        if (itemLower === "thumbs" || itemLower === "convertedoriginal" || itemLower === "bills") {
          continue;
        }

        // Recursively analyze subdirectories and fold their stats in.
        const subStats = await getDirectoryStats(itemPath);
        document_count += subStats.document_count;
        unique_document_count += subStats.unique_document_count;
        duplicate_count += subStats.duplicate_count;
        total_size_bytes += subStats.total_size_bytes;

        // Merge file type stats.
        for (const [ext, count] of Object.entries(subStats.file_type_stats)) {
          file_type_stats[ext] = (file_type_stats[ext] || 0) + count;
        }

        // Merge content hashes so later files in THIS directory can be
        // matched against the subdirectory's content.
        for (const hash of subStats.contentHashes) {
          contentHashes.add(hash);
        }
      } else {
        // Every file counts as a document.
        document_count++;
        total_size_bytes += stat.size;

        // Track file extension (lowercased; "no-extension" when absent).
        const ext = path.extname(item).toLowerCase() || "no-extension";

        // Only image files are content-hashed for duplicate detection.
        const isImageFile = [
          ".jpg",
          ".jpeg",
          ".png",
          ".gif",
          ".bmp",
          ".webp",
          ".heic",
          ".heif",
          ".tiff",
          ".tif"
        ].includes(ext);

        if (isImageFile) {
          try {
            const fileHash = await calculateFileHash(itemPath);
            if (contentHashes.has(fileHash)) {
              // Duplicate content — excluded from file_type_stats.
              duplicate_count++;
            } else {
              // First occurrence of this content.
              contentHashes.add(fileHash);
              unique_document_count++;
              // Only unique files contribute to file_type_stats.
              file_type_stats[ext] = (file_type_stats[ext] || 0) + 1;
            }
          } catch (hashError) {
            logger.warning(`Failed to hash file ${itemPath}:`, hashError);
            // If hashing fails, count as unique so no data is lost.
            unique_document_count++;
            file_type_stats[ext] = (file_type_stats[ext] || 0) + 1;
          }
        } else {
          // Non-image files are always counted as unique (no dedup check).
          unique_document_count++;
          file_type_stats[ext] = (file_type_stats[ext] || 0) + 1;
        }
      }
    }
  } catch (error) {
    logger.error(`Error analyzing directory ${dirPath}:`, error);
  }

  return {
    document_count,
    unique_document_count,
    duplicate_count,
    total_size_bytes,
    file_type_stats,
    contentHashes
  };
}

// Module-level cache so config.json is read at most once per process.
let bodyshopid: UUID;

/**
 * Read the first bodyshop id from config.json (cached after the first read).
 * @throws Error when the config file is missing or malformed.
 */
const getBodyshopIdFromConfig = async (): Promise<UUID> => {
  if (bodyshopid) return bodyshopid;
  try {
    const fs = await import("fs/promises"); //Required as fs-extra fails.
    const configData = await fs.readFile(FolderPaths.Config, "utf-8");
    const config = JSON.parse(configData);
    bodyshopid = config.bodyshopIds[0] as UUID;
    return bodyshopid;
  } catch (error) {
    logger.error("Failed to read bodyshopid from config.json:", error, (error as Error).stack);
    throw new Error("Could not read bodyshopid from config.json");
  }
};