import { exec } from "child_process";
import { promisify } from "util";
import * as fs from "fs-extra";
import { readdir, stat as fsStat } from "fs/promises";
import * as path from "path";
import { logger } from "../server.js";
import { FolderPaths } from "./serverInit.js";

const execAsync = promisify(exec);

/** Per-extension file counts, keyed by lower-cased extension (e.g. ".pdf") or "no-extension". */
type FileTypeStats = { [extension: string]: number };

/** Aggregated counters produced by a recursive directory walk. */
interface DirectoryStats {
  documentCount: number;
  totalSizeBytes: number;
  fileTypeStats: FileTypeStats;
}

interface S3SyncConfig {
  bucketName: string;
  region: string;
  accessKeyId: string;
  secretAccessKey: string;
  keyPrefix?: string;
}

export interface JobFolderStats {
  jobId: string;
  relativePath: string;
  documentCount: number;
  totalSizeBytes: number;
  totalSizeMB: number;
  fileTypeStats: { [extension: string]: number };
}

export interface JobsDirectoryAnalysis {
  totalJobs: number;
  totalDocuments: number;
  totalSizeBytes: number;
  totalSizeMB: number;
  fileTypeStats: { [extension: string]: number };
  jobs: JobFolderStats[];
}

/**
 * Convert a byte count to megabytes (1 MB = 1024 * 1024 bytes), rounded to two decimals.
 */
function bytesToMB(bytes: number): number {
  return Math.round((bytes / (1024 * 1024)) * 100) / 100;
}

/**
 * Add every per-extension count from `source` into `target` (mutates `target`).
 */
function mergeFileTypeStats(target: FileTypeStats, source: FileTypeStats): void {
  for (const [ext, count] of Object.entries(source)) {
    target[ext] = (target[ext] || 0) + count;
  }
}

/**
 * Thin wrapper around the AWS CLI for mirroring the local Jobs directory to an S3 bucket.
 *
 * All S3 operations shell out to the `aws` executable; the credentials from the
 * supplied config are passed to the child process through environment variables.
 */
export class S3Sync {
  private config: S3SyncConfig;

  constructor(config: S3SyncConfig) {
    this.config = config;
  }

  /**
   * Build the environment for AWS CLI child processes: the current process
   * environment plus the credentials and region from this instance's config.
   */
  private buildAwsEnv(): NodeJS.ProcessEnv {
    return {
      ...process.env,
      AWS_ACCESS_KEY_ID: this.config.accessKeyId,
      AWS_SECRET_ACCESS_KEY: this.config.secretAccessKey,
      AWS_DEFAULT_REGION: this.config.region
    };
  }

  /**
   * Sync the Jobs directory to S3 bucket using AWS CLI.
   *
   * Returns without doing anything (after logging a warning) when the Jobs
   * directory does not exist.
   *
   * @throws Rethrows any failure from the AWS CLI check or the sync command after logging it.
   */
  async syncJobsToS3(): Promise<void> {
    try {
      logger.info("Starting S3 sync for Jobs directory using AWS CLI...");

      const jobsPath = FolderPaths.Jobs;
      const keyPrefix = this.config.keyPrefix || "jobs/";
      const s3Path = `s3://${this.config.bucketName}/${keyPrefix}`;

      // Check if Jobs directory exists
      if (!(await fs.pathExists(jobsPath))) {
        logger.warn(`Jobs directory does not exist: ${jobsPath}`);
        return;
      }

      // Check if AWS CLI is available
      await this.checkAwsCli();

      // NOTE(review): the command is interpreted by a shell; jobsPath and s3Path are only
      // wrapped in double quotes, so values containing `"`, `$` or backticks would break it.
      const syncCommand = `aws s3 sync "${jobsPath}" "${s3Path}"`;

      logger.info("Executing AWS S3 sync command");

      const { stdout, stderr } = await execAsync(syncCommand, {
        env: this.buildAwsEnv(),
        maxBuffer: 1024 * 1024 * 10 // 10MB buffer for large sync outputs
      });

      if (stdout) {
        logger.info("S3 sync output:", stdout);
      }

      if (stderr) {
        logger.warn("S3 sync warnings:", stderr);
      }

      logger.info("S3 sync completed successfully using AWS CLI");
    } catch (error) {
      logger.error("S3 sync failed:", error);
      throw error;
    }
  }

  /**
   * Check if AWS CLI is available by running `aws --version`.
   *
   * @throws Error with installation instructions when the command fails.
   */
  private async checkAwsCli(): Promise<void> {
    try {
      await execAsync("aws --version");
    } catch {
      throw new Error(
        "AWS CLI not found. Please install AWS CLI: https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html"
      );
    }
  }

  /**
   * Test S3 connection using AWS CLI.
   *
   * @returns `true` when the bucket could be listed with the configured credentials,
   *          `false` on any failure (the error is logged, never thrown).
   */
  async testConnection(): Promise<boolean> {
    try {
      // Check if AWS CLI is available first
      await this.checkAwsCli();

      // `--max-items` is a pagination option of the low-level `s3api` commands; the
      // high-level `aws s3 ls` does not list it among its options, so the listing is
      // done through `s3api list-objects-v2` instead.
      const testCommand = `aws s3api list-objects-v2 --bucket "${this.config.bucketName}" --max-items 1`;

      await execAsync(testCommand, { env: this.buildAwsEnv() });

      logger.info(`S3 connection test successful for bucket: ${this.config.bucketName}`);
      return true;
    } catch (error) {
      logger.error("S3 connection test failed:", error);
      return false;
    }
  }

  /**
   * Get sync statistics using AWS CLI.
   *
   * @returns The configured bucket, region and effective key prefix, plus whether
   *          a connection test against the bucket currently succeeds.
   */
  async getSyncStats(): Promise<{ bucketName: string; region: string; keyPrefix: string; available: boolean }> {
    const available = await this.testConnection();
    return {
      bucketName: this.config.bucketName,
      region: this.config.region,
      keyPrefix: this.config.keyPrefix || "jobs/",
      available
    };
  }

  /**
   * Analyze all job folders in the Jobs directory.
   * Returns detailed statistics for each job folder.
   *
   * Delegates to the module-level `analyzeJobsDirectory`, which needs no S3 configuration.
   */
  async analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
    return analyzeJobsDirectory();
  }
}

/**
 * Create S3Sync instance from environment variables.
 *
 * Required: S3_BUCKET_NAME, S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY.
 * Optional: S3_REGION (defaults to "ca-central-1"), S3_KEY_PREFIX (defaults to "prefix").
 *
 * @returns A configured instance, or `null` (after logging a warning) when any
 *          required variable is missing or empty.
 */
export function createS3SyncFromEnv(): S3Sync | null {
  // No placeholder fallbacks for the required values: a fallback would make the
  // completeness check below unreachable and create a client with bogus credentials.
  const bucketName = process.env.S3_BUCKET_NAME;
  const accessKeyId = process.env.S3_ACCESS_KEY_ID;
  const secretAccessKey = process.env.S3_SECRET_ACCESS_KEY;
  const region = process.env.S3_REGION || "ca-central-1";
  // NOTE(review): this default differs from the "jobs/" default used inside S3Sync;
  // it is kept as-is so existing deployments keep syncing to the same location.
  const keyPrefix = process.env.S3_KEY_PREFIX || "prefix";

  if (!bucketName || !accessKeyId || !secretAccessKey) {
    logger.warn(
      "S3 configuration incomplete. Required env vars: S3_BUCKET_NAME, S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY"
    );
    return null;
  }

  return new S3Sync({
    bucketName,
    region,
    accessKeyId,
    secretAccessKey,
    keyPrefix
  });
}

/**
 * Standalone function to analyze Jobs directory without S3 configuration.
 *
 * Every directory directly under the Jobs directory is treated as one job; plain
 * files at that level are ignored. Jobs are returned sorted by `jobId`.
 *
 * @returns Per-job statistics plus totals; an all-zero analysis when the Jobs
 *          directory does not exist.
 * @throws Rethrows (after logging) any error from listing the Jobs directory or
 *         stat-ing one of its top-level entries.
 */
export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
  try {
    logger.info("Starting Jobs directory analysis...");

    const jobsPath = FolderPaths.Jobs;

    // Check if Jobs directory exists
    if (!(await fs.pathExists(jobsPath))) {
      logger.warn(`Jobs directory does not exist: ${jobsPath}`);
      return {
        totalJobs: 0,
        totalDocuments: 0,
        totalSizeBytes: 0,
        totalSizeMB: 0,
        fileTypeStats: {},
        jobs: []
      };
    }

    const jobFolders = await readdir(jobsPath);
    const jobStats: JobFolderStats[] = [];
    let totalDocuments = 0;
    let totalSizeBytes = 0;
    const aggregatedFileTypeStats: FileTypeStats = {};

    for (const jobFolder of jobFolders) {
      const jobFolderPath = path.join(jobsPath, jobFolder);
      const stat = await fsStat(jobFolderPath);

      // Only process directories
      if (!stat.isDirectory()) {
        continue;
      }

      const folderStats = await analyzeJobFolder(jobsPath, jobFolder);
      jobStats.push(folderStats);
      totalDocuments += folderStats.documentCount;
      totalSizeBytes += folderStats.totalSizeBytes;
      mergeFileTypeStats(aggregatedFileTypeStats, folderStats.fileTypeStats);
    }

    const analysis: JobsDirectoryAnalysis = {
      totalJobs: jobStats.length,
      totalDocuments,
      totalSizeBytes,
      totalSizeMB: bytesToMB(totalSizeBytes),
      fileTypeStats: aggregatedFileTypeStats,
      jobs: jobStats.sort((a, b) => a.jobId.localeCompare(b.jobId))
    };

    logger.info(
      `Jobs directory analysis complete: ${analysis.totalJobs} jobs, ${analysis.totalDocuments} documents, ${analysis.totalSizeMB} MB`
    );

    // TODO: Add an upload to the IO database to categorize all of this.
    return analysis;
  } catch (error) {
    logger.error("Failed to analyze Jobs directory:", error);
    throw error;
  }
}

/**
 * Analyze a single job folder (standalone helper function).
 *
 * @param jobsPath Directory that contains the job folder.
 * @param jobId    Name of the job folder; also used as the job's identifier.
 * @returns Statistics for the folder, with `relativePath` expressed relative to `FolderPaths.Root`.
 */
async function analyzeJobFolder(jobsPath: string, jobId: string): Promise<JobFolderStats> {
  const jobFolderPath = path.join(jobsPath, jobId);
  const relativePath = path.relative(FolderPaths.Root, jobFolderPath);

  const { documentCount, totalSizeBytes, fileTypeStats } = await getDirectoryStats(jobFolderPath);

  return {
    jobId,
    relativePath,
    documentCount,
    totalSizeBytes,
    totalSizeMB: bytesToMB(totalSizeBytes),
    fileTypeStats
  };
}

/**
 * Recursively get document count and total size for a directory (standalone helper function).
 *
 * Every non-directory entry counts as one document. Errors are logged and never
 * thrown: an unreadable directory contributes nothing, and an entry that cannot
 * be inspected is skipped without discarding the remaining entries.
 *
 * @param dirPath Directory to walk.
 */
async function getDirectoryStats(dirPath: string): Promise<DirectoryStats> {
  let documentCount = 0;
  let totalSizeBytes = 0;
  const fileTypeStats: FileTypeStats = {};

  let items: string[];
  try {
    items = await readdir(dirPath);
  } catch (error) {
    logger.error(`Error analyzing directory ${dirPath}:`, error);
    return { documentCount, totalSizeBytes, fileTypeStats };
  }

  for (const item of items) {
    const itemPath = path.join(dirPath, item);

    try {
      const stat = await fsStat(itemPath);

      if (stat.isDirectory()) {
        // Recursively analyze subdirectories
        const subStats = await getDirectoryStats(itemPath);
        documentCount += subStats.documentCount;
        totalSizeBytes += subStats.totalSizeBytes;
        mergeFileTypeStats(fileTypeStats, subStats.fileTypeStats);
      } else {
        // Count files as documents
        documentCount++;
        totalSizeBytes += stat.size;

        // Track file extension
        const ext = path.extname(item).toLowerCase() || "no-extension";
        fileTypeStats[ext] = (fileTypeStats[ext] || 0) + 1;
      }
    } catch (error) {
      // Handled per entry so one broken symlink or vanished file does not
      // drop the rest of this directory from the totals.
      logger.error(`Error analyzing entry ${itemPath}:`, error);
    }
  }

  return { documentCount, totalSizeBytes, fileTypeStats };
}