import { exec } from "child_process";
|
|
import { promisify } from "util";
|
|
import * as fs from "fs-extra";
|
|
import { readdir, stat as fsStat } from "fs/promises";
|
|
import * as path from "path";
|
|
import { logger } from "../server.js";
|
|
import { FolderPaths } from "./serverInit.js";
|
|
import axios from "axios";
|
|
import { UUID } from "crypto";
|
|
import fsPromises from "fs/promises";
|
|
import crypto from "crypto";
|
|
import { createReadStream } from "fs";
|
|
|
|
const execAsync = promisify(exec);
|
|
|
|
/**
 * Connection settings for syncing the local Jobs directory to an S3 bucket
 * via the AWS CLI (credentials are passed as environment variables to the
 * spawned `aws` process, never written to disk).
 */
interface S3SyncConfig {
  bucketName: string;
  region: string;
  accessKeyId: string;
  secretAccessKey: string;
  // Key prefix inside the bucket; defaults to "jobs/" when omitted.
  keyPrefix?: string;
}
|
|
|
|
/**
 * Per-job-folder document statistics. During analysis each instance is
 * serialized as one JSON line to a temporary .jsonl file (see analyzeJobsDirectory).
 */
export interface JobFolderStats {
  // Bodyshop the job belongs to (read from config.json).
  bodyshopid: UUID;
  // Folder name when it is a valid UUID; null for the "temporary" folder
  // or any folder name that fails UUID validation.
  jobid: UUID | string | null;
  //relativePath: string;
  // Total files found, duplicates included.
  document_count: number;
  // Files with distinct content (images are deduplicated by SHA-256 hash).
  unique_document_count: number;
  // Image files whose content hash matched a previously seen file.
  duplicate_count: number;
  // Sum of file sizes in bytes (duplicates included).
  total_size_bytes: number;
  // total_size_bytes converted to MB, rounded to 2 decimal places.
  total_size_mb: number;
  // Count of unique documents per lowercase extension ("no-extension" when absent).
  file_type_stats: { [extension: string]: number };
}
|
|
|
|
/**
 * Aggregated statistics for the whole Jobs directory — the totals across every
 * job folder, plus the per-folder breakdown. This is the payload uploaded to
 * the analytics endpoint by analyzeJobsDirectory.
 */
export interface JobsDirectoryAnalysis {
  // Bodyshop id read from config.json.
  bodyshopid: UUID;
  // Number of job folders analyzed.
  total_jobs: number;
  // Sum of document_count across all folders.
  total_documents: number;
  // Sum of unique_document_count across all folders.
  unique_documents: number;
  // Sum of duplicate_count across all folders.
  duplicate_documents: number;
  total_size_bytes: number;
  // total_size_bytes in MB, rounded to 2 decimal places.
  total_size_mb: number;
  // Per-extension unique-document counts, merged across all folders.
  file_type_stats: { [extension: string]: number };
  // Per-folder detail rows, sorted by jobid.
  media_analytics_details: {
    data: JobFolderStats[];
  };
}
|
|
|
|
export class S3Sync {
|
|
private config: S3SyncConfig;
|
|
|
|
constructor(config: S3SyncConfig) {
|
|
this.config = config;
|
|
}
|
|
|
|
/**
|
|
* Sync the Jobs directory to S3 bucket using AWS CLI
|
|
*/
|
|
async syncJobsToS3(): Promise<void> {
|
|
try {
|
|
logger.info("Starting S3 sync for Jobs directory using AWS CLI...");
|
|
|
|
const jobsPath = FolderPaths.Jobs;
|
|
const keyPrefix = this.config.keyPrefix || "jobs/";
|
|
const s3Path = `s3://${this.config.bucketName}/${keyPrefix}`;
|
|
|
|
// Check if Jobs directory exists
|
|
if (!(await fs.pathExists(jobsPath))) {
|
|
logger.error(`Jobs directory does not exist: ${jobsPath}`);
|
|
return;
|
|
}
|
|
|
|
// Check if AWS CLI is available
|
|
await this.checkAwsCli();
|
|
|
|
// Set AWS credentials as environment variables for the command
|
|
const env = {
|
|
...process.env,
|
|
AWS_ACCESS_KEY_ID: this.config.accessKeyId,
|
|
AWS_SECRET_ACCESS_KEY: this.config.secretAccessKey,
|
|
AWS_DEFAULT_REGION: this.config.region
|
|
};
|
|
|
|
// Run AWS S3 sync command
|
|
const syncCommand = `aws s3 sync "${jobsPath}" "${s3Path}"`;
|
|
|
|
logger.info(`Executing AWS S3 sync command`);
|
|
|
|
const { stdout, stderr } = await execAsync(syncCommand, {
|
|
env,
|
|
maxBuffer: 1024 * 1024 * 10 // 10MB buffer for large sync outputs
|
|
});
|
|
|
|
if (stdout) {
|
|
logger.info("S3 sync output:", stdout);
|
|
}
|
|
|
|
if (stderr) {
|
|
logger.warning("S3 sync warnings:", stderr);
|
|
}
|
|
|
|
logger.info("S3 sync completed successfully using AWS CLI");
|
|
} catch (error) {
|
|
logger.error("S3 sync failed:", error);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Check if AWS CLI is available
|
|
*/
|
|
private async checkAwsCli(): Promise<void> {
|
|
try {
|
|
await execAsync("aws --version");
|
|
} catch (error) {
|
|
throw new Error(
|
|
"AWS CLI not found. Please install AWS CLI: https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html"
|
|
);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Test S3 connection using AWS CLI
|
|
*/
|
|
async testConnection(): Promise<boolean> {
|
|
try {
|
|
// Check if AWS CLI is available first
|
|
await this.checkAwsCli();
|
|
|
|
// Set AWS credentials as environment variables
|
|
const env = {
|
|
...process.env,
|
|
AWS_ACCESS_KEY_ID: this.config.accessKeyId,
|
|
AWS_SECRET_ACCESS_KEY: this.config.secretAccessKey,
|
|
AWS_DEFAULT_REGION: this.config.region
|
|
};
|
|
|
|
// Test connection by listing bucket
|
|
const testCommand = `aws s3 ls s3://${this.config.bucketName} --max-items 1`;
|
|
await execAsync(testCommand, { env });
|
|
|
|
logger.info(`S3 connection test successful for bucket: ${this.config.bucketName}`);
|
|
return true;
|
|
} catch (error) {
|
|
logger.error("S3 connection test failed:", error);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get sync statistics using AWS CLI
|
|
*/
|
|
async getSyncStats(): Promise<{ bucketName: string; region: string; keyPrefix: string; available: boolean }> {
|
|
const available = false; //await this.testConnection();
|
|
return {
|
|
bucketName: this.config.bucketName,
|
|
region: this.config.region,
|
|
keyPrefix: this.config.keyPrefix || "jobs/",
|
|
available
|
|
};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Create S3Sync instance from environment variables
|
|
*/
|
|
export function createS3SyncFromEnv(): S3Sync | null {
|
|
const bucketName = process.env.S3_BUCKET_NAME || "test";
|
|
const region = process.env.S3_REGION || "ca-central-1";
|
|
const accessKeyId = process.env.S3_ACCESS_KEY_ID || "key";
|
|
const secretAccessKey = process.env.S3_SECRET_ACCESS_KEY || "secret";
|
|
const keyPrefix = process.env.S3_KEY_PREFIX || "prefix";
|
|
|
|
if (!bucketName || !accessKeyId || !secretAccessKey) {
|
|
logger.warning(
|
|
"S3 configuration incomplete. Required env vars: S3_BUCKET_NAME, S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY"
|
|
);
|
|
return null;
|
|
}
|
|
|
|
return new S3Sync({
|
|
bucketName,
|
|
region,
|
|
accessKeyId,
|
|
secretAccessKey,
|
|
keyPrefix
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Standalone function to analyze Jobs directory without S3 configuration
|
|
* Uses streaming to a temporary file to avoid memory issues with large datasets
|
|
*/
|
|
export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
|
|
const tempFilePath = path.join(FolderPaths.Jobs, "..", "jobs_analysis_temp.jsonl");
|
|
|
|
try {
|
|
logger.info("Starting Jobs directory analysis...");
|
|
//Read from the config.json file in the root directory to get the bodyshopid
|
|
const bodyshopid: UUID = await getBodyshopIdFromConfig();
|
|
|
|
const jobsPath = FolderPaths.Jobs;
|
|
|
|
// Check if Jobs directory exists
|
|
if (!(await fs.pathExists(jobsPath))) {
|
|
logger.warning(`Jobs directory does not exist: ${jobsPath}`);
|
|
return {
|
|
bodyshopid,
|
|
total_jobs: 0,
|
|
total_documents: 0,
|
|
unique_documents: 0,
|
|
duplicate_documents: 0,
|
|
total_size_bytes: 0,
|
|
total_size_mb: 0,
|
|
file_type_stats: {},
|
|
media_analytics_details: { data: [] }
|
|
};
|
|
}
|
|
|
|
// Clear/create temporary file for streaming job stats
|
|
await fsPromises.writeFile(tempFilePath, "", "utf-8");
|
|
logger.info(`Created temporary file for streaming analysis: ${tempFilePath}`);
|
|
|
|
const jobFolders = await readdir(jobsPath);
|
|
let total_documents = 0;
|
|
let total_unique_documents = 0;
|
|
let total_duplicate_documents = 0;
|
|
let total_size_bytes = 0;
|
|
let total_jobs = 0;
|
|
const aggregated_file_type_stats: { [extension: string]: number } = {};
|
|
|
|
const uuidRegex = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/;
|
|
for (const jobFolder of jobFolders) {
|
|
if (jobFolder !== "temporary" && !uuidRegex.test(jobFolder)) {
|
|
logger.info(`Skipping invalid jobid directory: ${jobFolder}`);
|
|
continue;
|
|
}
|
|
const jobFolderPath = path.join(jobsPath, jobFolder);
|
|
const stat = await fsStat(jobFolderPath);
|
|
// Only process directories
|
|
if (stat.isDirectory()) {
|
|
const folderStats = await analyzeJobFolder(jobsPath, jobFolder, bodyshopid);
|
|
|
|
// Stream to file immediately (one JSON object per line)
|
|
await fsPromises.appendFile(tempFilePath, JSON.stringify(folderStats) + "\n", "utf-8");
|
|
|
|
total_jobs++;
|
|
total_documents += folderStats.document_count;
|
|
total_unique_documents += folderStats.unique_document_count;
|
|
total_duplicate_documents += folderStats.duplicate_count;
|
|
total_size_bytes += folderStats.total_size_bytes;
|
|
|
|
// Aggregate file type stats
|
|
for (const [ext, count] of Object.entries(folderStats.file_type_stats)) {
|
|
aggregated_file_type_stats[ext] = (aggregated_file_type_stats[ext] || 0) + count;
|
|
}
|
|
|
|
// Log progress every 100 jobs
|
|
if (total_jobs % 100 === 0) {
|
|
logger.info(`Progress: Analyzed ${total_jobs} jobs...`);
|
|
}
|
|
}
|
|
}
|
|
|
|
logger.info(`Reading back job stats from temporary file...`);
|
|
|
|
// Read back the job stats from file
|
|
const fileContent = await fsPromises.readFile(tempFilePath, "utf-8");
|
|
const jobStats: JobFolderStats[] = fileContent
|
|
.trim()
|
|
.split("\n")
|
|
.filter((line) => line.length > 0)
|
|
.map((line) => JSON.parse(line));
|
|
|
|
// Sort by jobid
|
|
jobStats.sort((a, b) => a.jobid?.localeCompare(b.jobid!) || 0);
|
|
|
|
const analysis: JobsDirectoryAnalysis = {
|
|
bodyshopid, //read from the config.json file in the root directory
|
|
total_jobs,
|
|
total_documents,
|
|
unique_documents: total_unique_documents,
|
|
duplicate_documents: total_duplicate_documents,
|
|
total_size_bytes,
|
|
total_size_mb: Math.round((total_size_bytes / (1024 * 1024)) * 100) / 100,
|
|
file_type_stats: aggregated_file_type_stats,
|
|
media_analytics_details: { data: jobStats }
|
|
};
|
|
|
|
logger.info(
|
|
`Jobs directory analysis complete: ${analysis.total_jobs} jobs, ${analysis.total_documents} documents (${analysis.unique_documents} unique, ${analysis.duplicate_documents} duplicates), ${analysis.total_size_mb} MB`
|
|
);
|
|
|
|
//Add an upload to the IO database to categorize all of this.
|
|
const apiURL = process.env.IS_TEST
|
|
? "https://api.test.imex.online/analytics/documents"
|
|
: "https://api.imex.online/analytics/documents";
|
|
const result = await axios.post(apiURL, { data: analysis });
|
|
|
|
logger.info(`Uploaded analysis results, response status: ${result.status}`);
|
|
// Clean up temporary file
|
|
await fsPromises.unlink(tempFilePath);
|
|
logger.info("Cleaned up temporary analysis file");
|
|
|
|
return analysis;
|
|
} catch (error) {
|
|
logger.error("Failed to analyze Jobs directory:", (error as Error).message, (error as Error).stack);
|
|
// Attempt to clean up temp file on error
|
|
try {
|
|
if (await fs.pathExists(tempFilePath)) {
|
|
await fs.remove(tempFilePath);
|
|
logger.info("Cleaned up temporary file after error");
|
|
}
|
|
} catch (cleanupError) {
|
|
logger.warning("Failed to clean up temporary file:", cleanupError);
|
|
}
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Analyze a single job folder (helper function)
|
|
*/
|
|
async function analyzeJobFolder(jobsPath: string, jobid: string, bodyshopid: UUID): Promise<JobFolderStats> {
|
|
const jobFolderPath = path.join(jobsPath, jobid);
|
|
|
|
const { document_count, unique_document_count, duplicate_count, total_size_bytes, file_type_stats } =
|
|
await getDirectoryStats(jobFolderPath);
|
|
|
|
const uuidRegex = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/;
|
|
let validJobid: UUID | string | null = null;
|
|
if (jobid === "temporary") {
|
|
validJobid = null;
|
|
} else if (uuidRegex.test(jobid)) {
|
|
validJobid = jobid as UUID;
|
|
} else {
|
|
logger.warning(`Invalid jobid encountered in analyzeJobFolder: ${jobid}`);
|
|
validJobid = null;
|
|
}
|
|
return {
|
|
bodyshopid,
|
|
jobid: validJobid,
|
|
document_count,
|
|
unique_document_count,
|
|
duplicate_count,
|
|
total_size_bytes,
|
|
total_size_mb: Math.round((total_size_bytes / (1024 * 1024)) * 100) / 100,
|
|
file_type_stats
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Calculate SHA256 hash of a file's content
|
|
*/
|
|
async function calculateFileHash(filePath: string): Promise<string> {
|
|
return new Promise((resolve, reject) => {
|
|
const hash = crypto.createHash("sha256");
|
|
const stream = createReadStream(filePath);
|
|
|
|
stream.on("data", (data: string | Buffer) => hash.update(data));
|
|
stream.on("end", () => resolve(hash.digest("hex")));
|
|
stream.on("error", (error: Error) => {
|
|
console.error("Hashing error:", error);
|
|
reject(error);
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Recursively get document count and total size for a directory (helper function)
|
|
* Now with duplicate detection using content-based hashing
|
|
*/
|
|
/**
 * Recursively get document count and total size for a directory (helper function)
 * Now with duplicate detection using content-based hashing
 *
 * Image files are hashed with SHA-256; a file whose hash was already seen in
 * this subtree counts as a duplicate and is excluded from file_type_stats.
 * Non-image files are always treated as unique. Read errors are logged and
 * swallowed, so an unreadable subdirectory yields partial stats, not a throw.
 */
async function getDirectoryStats(dirPath: string): Promise<{
  document_count: number; // all files, duplicates included
  unique_document_count: number; // files with distinct content
  duplicate_count: number; // image files whose hash was already seen
  total_size_bytes: number; // byte total, duplicates included
  file_type_stats: { [extension: string]: number }; // unique files per extension
  contentHashes: Set<string>; // hashes seen in this subtree, merged upward by the caller
}> {
  let document_count = 0;
  let unique_document_count = 0;
  let duplicate_count = 0;
  let total_size_bytes = 0;
  const file_type_stats: { [extension: string]: number } = {};
  const contentHashes = new Set<string>();

  try {
    const items = await readdir(dirPath);

    for (const item of items) {
      const itemPath = path.join(dirPath, item);
      const stat = await fsStat(itemPath);

      if (stat.isDirectory()) {
        // Skip thumbs and ConvertedOriginals folders (case-insensitive)
        // NOTE(review): the comment above says "ConvertedOriginals" but the
        // check below is for "convertedoriginal" (no trailing "s") — confirm
        // the real folder name on disk.
        const itemLower = item.toLowerCase();
        if (itemLower === "thumbs" || itemLower === "convertedoriginal" || itemLower === "bills") {
          continue;
        }

        // Recursively analyze subdirectories
        const subStats = await getDirectoryStats(itemPath);
        document_count += subStats.document_count;
        unique_document_count += subStats.unique_document_count;
        duplicate_count += subStats.duplicate_count;
        total_size_bytes += subStats.total_size_bytes;

        // Merge file type stats
        for (const [ext, count] of Object.entries(subStats.file_type_stats)) {
          file_type_stats[ext] = (file_type_stats[ext] || 0) + count;
        }

        // Merge content hashes to detect duplicates across subdirectories
        // (a file seen in one subtree is a duplicate if it reappears in a sibling).
        for (const hash of subStats.contentHashes) {
          contentHashes.add(hash);
        }
      } else {
        // Count files as documents
        document_count++;
        // Size is accumulated for every file, duplicates included.
        total_size_bytes += stat.size;

        // Track file extension
        const ext = path.extname(item).toLowerCase() || "no-extension";

        // Calculate content hash for image files to detect duplicates
        const isImageFile = [
          ".jpg",
          ".jpeg",
          ".png",
          ".gif",
          ".bmp",
          ".webp",
          ".heic",
          ".heif",
          ".tiff",
          ".tif"
        ].includes(ext);

        if (isImageFile) {
          try {
            const fileHash = await calculateFileHash(itemPath);

            if (contentHashes.has(fileHash)) {
              // This is a duplicate - don't count in file_type_stats
              duplicate_count++;
            } else {
              // This is unique
              contentHashes.add(fileHash);
              unique_document_count++;
              // Only count unique files in file_type_stats
              file_type_stats[ext] = (file_type_stats[ext] || 0) + 1;
            }
          } catch (hashError) {
            logger.warning(`Failed to hash file ${itemPath}:`, hashError);
            // If hashing fails, count as unique to avoid losing data
            unique_document_count++;
            file_type_stats[ext] = (file_type_stats[ext] || 0) + 1;
          }
        } else {
          // Non-image files are counted as unique (not checking for duplicates)
          unique_document_count++;
          file_type_stats[ext] = (file_type_stats[ext] || 0) + 1;
        }
      }
    }
  } catch (error) {
    logger.error(`Error analyzing directory ${dirPath}:`, error);
  }

  return { document_count, unique_document_count, duplicate_count, total_size_bytes, file_type_stats, contentHashes };
}
|
|
|
|
let bodyshopid: UUID;
|
|
const getBodyshopIdFromConfig = async (): Promise<UUID> => {
|
|
if (bodyshopid) return bodyshopid;
|
|
try {
|
|
const fs = await import("fs/promises"); //Required as fs-extra fails.
|
|
const configData = await fs.readFile(FolderPaths.Config, "utf-8");
|
|
const config = JSON.parse(configData);
|
|
bodyshopid = config.bodyshopIds[0] as UUID;
|
|
return bodyshopid;
|
|
} catch (error) {
|
|
logger.error("Failed to read bodyshopid from config.json:", error, (error as Error).stack);
|
|
throw new Error("Could not read bodyshopid from config.json");
|
|
}
|
|
};
|