WIP s3 sync and directory stats.
This commit is contained in:
154
util/dailyS3Scheduler.ts
Normal file
154
util/dailyS3Scheduler.ts
Normal file
@@ -0,0 +1,154 @@
|
||||
import * as cron from "node-cron";
|
||||
import { logger } from "../server.js";
|
||||
import { S3Sync, createS3SyncFromEnv } from "./s3Sync.js";
|
||||
|
||||
export class DailyS3Scheduler {
|
||||
private s3Sync: S3Sync | null = null;
|
||||
private cronJob: cron.ScheduledTask | null = null;
|
||||
|
||||
constructor() {
|
||||
this.s3Sync = createS3SyncFromEnv();
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the daily S3 sync scheduler
|
||||
* Runs at midnight PST (00:00 PST = 08:00 UTC during standard time, 07:00 UTC during daylight time)
|
||||
*/
|
||||
async start(): Promise<void> {
|
||||
if (!this.s3Sync) {
|
||||
logger.warn("S3 sync not configured. Skipping scheduler setup.");
|
||||
return;
|
||||
}
|
||||
|
||||
// Test S3 connection before starting scheduler
|
||||
const connectionTest = await this.s3Sync.testConnection();
|
||||
if (!connectionTest) {
|
||||
logger.error("S3 connection test failed. S3 sync scheduler will not be started.");
|
||||
return;
|
||||
}
|
||||
|
||||
// Cron expression for midnight PST
|
||||
// Note: This uses PST timezone. During PDT (daylight time), it will still run at midnight local time
|
||||
const cronExpression = "0 0 * * *"; // Every day at midnight
|
||||
const timezone = "America/Los_Angeles"; // PST/PDT timezone
|
||||
|
||||
this.cronJob = cron.schedule(
|
||||
cronExpression,
|
||||
async () => {
|
||||
await this.performDailySync();
|
||||
},
|
||||
{
|
||||
timezone: timezone,
|
||||
}
|
||||
);
|
||||
|
||||
logger.info(`Daily S3 sync scheduler started. Will run at midnight PST/PDT.`);
|
||||
logger.info(`Next sync scheduled for: ${this.getNextRunTime()}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the scheduler
|
||||
*/
|
||||
stop(): void {
|
||||
if (this.cronJob) {
|
||||
this.cronJob.stop();
|
||||
this.cronJob = null;
|
||||
logger.info("Daily S3 sync scheduler stopped.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform the daily sync operation
|
||||
*/
|
||||
private async performDailySync(): Promise<void> {
|
||||
if (!this.s3Sync) {
|
||||
logger.error("S3 sync not available for daily sync");
|
||||
return;
|
||||
}
|
||||
|
||||
const startTime = new Date();
|
||||
logger.info(`Starting daily S3 sync at ${startTime.toISOString()}`);
|
||||
|
||||
try {
|
||||
await this.s3Sync.syncJobsToS3();
|
||||
const endTime = new Date();
|
||||
const duration = endTime.getTime() - startTime.getTime();
|
||||
logger.info(`Daily S3 sync completed successfully in ${duration}ms`);
|
||||
} catch (error) {
|
||||
logger.error("Daily S3 sync failed:", error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Manually trigger a sync (useful for testing)
|
||||
*/
|
||||
async triggerManualSync(): Promise<void> {
|
||||
if (!this.s3Sync) {
|
||||
logger.error("S3 sync not configured");
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info("Triggering manual S3 sync...");
|
||||
await this.performDailySync();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next scheduled run time
|
||||
*/
|
||||
private getNextRunTime(): string {
|
||||
if (!this.cronJob) {
|
||||
return "Not scheduled";
|
||||
}
|
||||
|
||||
// Create a date object for midnight PST today
|
||||
const now = new Date();
|
||||
const pstNow = new Date(now.toLocaleString("en-US", { timeZone: "America/Los_Angeles" }));
|
||||
|
||||
// If it's past midnight today, next run is tomorrow at midnight
|
||||
const nextRun = new Date(pstNow);
|
||||
if (pstNow.getHours() > 0 || pstNow.getMinutes() > 0 || pstNow.getSeconds() > 0) {
|
||||
nextRun.setDate(nextRun.getDate() + 1);
|
||||
}
|
||||
nextRun.setHours(0, 0, 0, 0);
|
||||
|
||||
return nextRun.toLocaleString("en-US", {
|
||||
timeZone: "America/Los_Angeles",
|
||||
weekday: "long",
|
||||
year: "numeric",
|
||||
month: "long",
|
||||
day: "numeric",
|
||||
hour: "2-digit",
|
||||
minute: "2-digit",
|
||||
timeZoneName: "short"
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get scheduler status
|
||||
*/
|
||||
async getStatus(): Promise<{
|
||||
isConfigured: boolean;
|
||||
isRunning: boolean;
|
||||
nextRun: string;
|
||||
syncStats?: { bucketName: string; region: string; keyPrefix: string; available: boolean };
|
||||
}> {
|
||||
let syncStats;
|
||||
if (this.s3Sync) {
|
||||
try {
|
||||
syncStats = await this.s3Sync.getSyncStats();
|
||||
} catch (error) {
|
||||
logger.error("Failed to get sync stats:", error);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
isConfigured: this.s3Sync !== null,
|
||||
isRunning: this.cronJob !== null,
|
||||
nextRun: this.getNextRunTime(),
|
||||
syncStats,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Export a singleton instance. Constructing it reads the S3_* environment
// variables (via createS3SyncFromEnv) at module load time, so the env must be
// populated before this module is first imported.
export const dailyS3Scheduler = new DailyS3Scheduler();
|
||||
388
util/s3Sync.ts
Normal file
388
util/s3Sync.ts
Normal file
@@ -0,0 +1,388 @@
|
||||
import { exec } from "child_process";
|
||||
import { promisify } from "util";
|
||||
import * as fs from "fs-extra";
|
||||
import * as path from "path";
|
||||
import { logger } from "../server.js";
|
||||
import { FolderPaths } from "./serverInit.js";
|
||||
|
||||
// Promise-returning wrapper around child_process.exec, used to shell out to the AWS CLI.
const execAsync = promisify(exec);
|
||||
|
||||
/**
 * Credentials and target bucket for AWS CLI based syncing.
 * Values are exported to the spawned CLI process as AWS_* environment variables.
 */
interface S3SyncConfig {
  bucketName: string;
  region: string;
  accessKeyId: string;
  secretAccessKey: string;
  // Object-key prefix inside the bucket; defaults to "jobs/" when omitted.
  keyPrefix?: string;
}
|
||||
|
||||
/**
 * Per-job-folder statistics produced by the Jobs directory analysis.
 */
export interface JobFolderStats {
  // Folder name under the Jobs directory, used as the job identifier.
  jobId: string;
  // Path of the job folder relative to FolderPaths.Root.
  relativePath: string;
  // Number of files (counted recursively) in the folder.
  documentCount: number;
  totalSizeBytes: number;
  // totalSizeBytes expressed in MB, rounded to 2 decimal places.
  totalSizeMB: number;
}
|
||||
|
||||
/**
 * Aggregate result of scanning the Jobs directory: totals across all job
 * folders plus the per-folder breakdown (sorted by jobId).
 */
export interface JobsDirectoryAnalysis {
  totalJobs: number;
  totalDocuments: number;
  totalSizeBytes: number;
  // totalSizeBytes expressed in MB, rounded to 2 decimal places.
  totalSizeMB: number;
  jobs: JobFolderStats[];
}
|
||||
|
||||
export class S3Sync {
|
||||
private config: S3SyncConfig;
|
||||
|
||||
constructor(config: S3SyncConfig) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync the Jobs directory to S3 bucket using AWS CLI
|
||||
*/
|
||||
async syncJobsToS3(): Promise<void> {
|
||||
try {
|
||||
logger.info("Starting S3 sync for Jobs directory using AWS CLI...");
|
||||
|
||||
const jobsPath = FolderPaths.Jobs;
|
||||
const keyPrefix = this.config.keyPrefix || "jobs/";
|
||||
const s3Path = `s3://${this.config.bucketName}/${keyPrefix}`;
|
||||
|
||||
// Check if Jobs directory exists
|
||||
if (!(await fs.pathExists(jobsPath))) {
|
||||
logger.warn(`Jobs directory does not exist: ${jobsPath}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if AWS CLI is available
|
||||
await this.checkAwsCli();
|
||||
|
||||
// Set AWS credentials as environment variables for the command
|
||||
const env = {
|
||||
...process.env,
|
||||
AWS_ACCESS_KEY_ID: this.config.accessKeyId,
|
||||
AWS_SECRET_ACCESS_KEY: this.config.secretAccessKey,
|
||||
AWS_DEFAULT_REGION: this.config.region
|
||||
};
|
||||
|
||||
// Run AWS S3 sync command
|
||||
const syncCommand = `aws s3 sync "${jobsPath}" "${s3Path}"`;
|
||||
|
||||
logger.info(`Executing AWS S3 sync command`);
|
||||
|
||||
const { stdout, stderr } = await execAsync(syncCommand, {
|
||||
env,
|
||||
maxBuffer: 1024 * 1024 * 10 // 10MB buffer for large sync outputs
|
||||
});
|
||||
|
||||
if (stdout) {
|
||||
logger.info("S3 sync output:", stdout);
|
||||
}
|
||||
|
||||
if (stderr) {
|
||||
logger.warn("S3 sync warnings:", stderr);
|
||||
}
|
||||
|
||||
logger.info("S3 sync completed successfully using AWS CLI");
|
||||
} catch (error) {
|
||||
logger.error("S3 sync failed:", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if AWS CLI is available
|
||||
*/
|
||||
private async checkAwsCli(): Promise<void> {
|
||||
try {
|
||||
await execAsync("aws --version");
|
||||
} catch (error) {
|
||||
throw new Error(
|
||||
"AWS CLI not found. Please install AWS CLI: https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test S3 connection using AWS CLI
|
||||
*/
|
||||
async testConnection(): Promise<boolean> {
|
||||
try {
|
||||
// Check if AWS CLI is available first
|
||||
await this.checkAwsCli();
|
||||
|
||||
// Set AWS credentials as environment variables
|
||||
const env = {
|
||||
...process.env,
|
||||
AWS_ACCESS_KEY_ID: this.config.accessKeyId,
|
||||
AWS_SECRET_ACCESS_KEY: this.config.secretAccessKey,
|
||||
AWS_DEFAULT_REGION: this.config.region
|
||||
};
|
||||
|
||||
// Test connection by listing bucket
|
||||
const testCommand = `aws s3 ls s3://${this.config.bucketName} --max-items 1`;
|
||||
await execAsync(testCommand, { env });
|
||||
|
||||
logger.info(`S3 connection test successful for bucket: ${this.config.bucketName}`);
|
||||
return true;
|
||||
} catch (error) {
|
||||
logger.error("S3 connection test failed:", error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get sync statistics using AWS CLI
|
||||
*/
|
||||
async getSyncStats(): Promise<{ bucketName: string; region: string; keyPrefix: string; available: boolean }> {
|
||||
const available = await this.testConnection();
|
||||
return {
|
||||
bucketName: this.config.bucketName,
|
||||
region: this.config.region,
|
||||
keyPrefix: this.config.keyPrefix || "jobs/",
|
||||
available
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Analyze all job folders in the Jobs directory
|
||||
* Returns detailed statistics for each job folder
|
||||
*/
|
||||
async analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
|
||||
try {
|
||||
logger.info("Starting Jobs directory analysis...");
|
||||
|
||||
const jobsPath = FolderPaths.Jobs;
|
||||
|
||||
// Check if Jobs directory exists
|
||||
if (!(await fs.pathExists(jobsPath))) {
|
||||
logger.warn(`Jobs directory does not exist: ${jobsPath}`);
|
||||
return {
|
||||
totalJobs: 0,
|
||||
totalDocuments: 0,
|
||||
totalSizeBytes: 0,
|
||||
totalSizeMB: 0,
|
||||
jobs: []
|
||||
};
|
||||
}
|
||||
|
||||
const jobFolders = await fs.readdir(jobsPath);
|
||||
const jobStats: JobFolderStats[] = [];
|
||||
let totalDocuments = 0;
|
||||
let totalSizeBytes = 0;
|
||||
|
||||
for (const jobFolder of jobFolders) {
|
||||
const jobFolderPath = path.join(jobsPath, jobFolder);
|
||||
const stat = await fs.stat(jobFolderPath);
|
||||
|
||||
// Only process directories
|
||||
if (stat.isDirectory()) {
|
||||
const folderStats = await this.analyzeJobFolder(jobsPath, jobFolder);
|
||||
jobStats.push(folderStats);
|
||||
totalDocuments += folderStats.documentCount;
|
||||
totalSizeBytes += folderStats.totalSizeBytes;
|
||||
}
|
||||
}
|
||||
|
||||
const analysis: JobsDirectoryAnalysis = {
|
||||
totalJobs: jobStats.length,
|
||||
totalDocuments,
|
||||
totalSizeBytes,
|
||||
totalSizeMB: Math.round((totalSizeBytes / (1024 * 1024)) * 100) / 100,
|
||||
jobs: jobStats.sort((a, b) => a.jobId.localeCompare(b.jobId))
|
||||
};
|
||||
|
||||
logger.info(
|
||||
`Jobs directory analysis complete: ${analysis.totalJobs} jobs, ${analysis.totalDocuments} documents, ${analysis.totalSizeMB} MB`
|
||||
);
|
||||
return analysis;
|
||||
} catch (error) {
|
||||
logger.error("Failed to analyze Jobs directory:", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Analyze a single job folder
|
||||
*/
|
||||
private async analyzeJobFolder(jobsPath: string, jobId: string): Promise<JobFolderStats> {
|
||||
const jobFolderPath = path.join(jobsPath, jobId);
|
||||
const relativePath = path.relative(FolderPaths.Root, jobFolderPath);
|
||||
|
||||
const { documentCount, totalSizeBytes } = await this.getDirectoryStats(jobFolderPath);
|
||||
|
||||
return {
|
||||
jobId,
|
||||
relativePath,
|
||||
documentCount,
|
||||
totalSizeBytes,
|
||||
totalSizeMB: Math.round((totalSizeBytes / (1024 * 1024)) * 100) / 100
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively get document count and total size for a directory
|
||||
*/
|
||||
private async getDirectoryStats(dirPath: string): Promise<{ documentCount: number; totalSizeBytes: number }> {
|
||||
let documentCount = 0;
|
||||
let totalSizeBytes = 0;
|
||||
|
||||
try {
|
||||
const items = await fs.readdir(dirPath);
|
||||
|
||||
for (const item of items) {
|
||||
const itemPath = path.join(dirPath, item);
|
||||
const stat = await fs.stat(itemPath);
|
||||
|
||||
if (stat.isDirectory()) {
|
||||
// Recursively analyze subdirectories
|
||||
const subStats = await this.getDirectoryStats(itemPath);
|
||||
documentCount += subStats.documentCount;
|
||||
totalSizeBytes += subStats.totalSizeBytes;
|
||||
} else {
|
||||
// Count files as documents
|
||||
documentCount++;
|
||||
totalSizeBytes += stat.size;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`Error analyzing directory ${dirPath}:`, error);
|
||||
}
|
||||
|
||||
return { documentCount, totalSizeBytes };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create S3Sync instance from environment variables
|
||||
*/
|
||||
export function createS3SyncFromEnv(): S3Sync | null {
|
||||
const bucketName = process.env.S3_BUCKET_NAME || "test";
|
||||
const region = process.env.S3_REGION || "ca-central-1";
|
||||
const accessKeyId = process.env.S3_ACCESS_KEY_ID || "key";
|
||||
const secretAccessKey = process.env.S3_SECRET_ACCESS_KEY || "secret";
|
||||
const keyPrefix = process.env.S3_KEY_PREFIX || "prefix";
|
||||
|
||||
if (!bucketName || !accessKeyId || !secretAccessKey) {
|
||||
logger.warn(
|
||||
"S3 configuration incomplete. Required env vars: S3_BUCKET_NAME, S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY"
|
||||
);
|
||||
return null;
|
||||
}
|
||||
|
||||
return new S3Sync({
|
||||
bucketName,
|
||||
region,
|
||||
accessKeyId,
|
||||
secretAccessKey,
|
||||
keyPrefix
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Standalone function to analyze Jobs directory without S3 configuration
|
||||
*/
|
||||
export async function analyzeJobsDirectory(): Promise<JobsDirectoryAnalysis> {
|
||||
try {
|
||||
logger.info("Starting Jobs directory analysis...");
|
||||
|
||||
const jobsPath = FolderPaths.Jobs;
|
||||
|
||||
// Check if Jobs directory exists
|
||||
if (!(await fs.pathExists(jobsPath))) {
|
||||
logger.warn(`Jobs directory does not exist: ${jobsPath}`);
|
||||
return {
|
||||
totalJobs: 0,
|
||||
totalDocuments: 0,
|
||||
totalSizeBytes: 0,
|
||||
totalSizeMB: 0,
|
||||
jobs: []
|
||||
};
|
||||
}
|
||||
|
||||
const jobFolders = await fs.readdir(jobsPath);
|
||||
const jobStats: JobFolderStats[] = [];
|
||||
let totalDocuments = 0;
|
||||
let totalSizeBytes = 0;
|
||||
|
||||
for (const jobFolder of jobFolders) {
|
||||
const jobFolderPath = path.join(jobsPath, jobFolder);
|
||||
const stat = await fs.stat(jobFolderPath);
|
||||
|
||||
// Only process directories
|
||||
if (stat.isDirectory()) {
|
||||
const folderStats = await analyzeJobFolder(jobsPath, jobFolder);
|
||||
jobStats.push(folderStats);
|
||||
totalDocuments += folderStats.documentCount;
|
||||
totalSizeBytes += folderStats.totalSizeBytes;
|
||||
}
|
||||
}
|
||||
|
||||
const analysis: JobsDirectoryAnalysis = {
|
||||
totalJobs: jobStats.length,
|
||||
totalDocuments,
|
||||
totalSizeBytes,
|
||||
totalSizeMB: Math.round((totalSizeBytes / (1024 * 1024)) * 100) / 100,
|
||||
jobs: jobStats.sort((a, b) => a.jobId.localeCompare(b.jobId))
|
||||
};
|
||||
|
||||
logger.info(
|
||||
`Jobs directory analysis complete: ${analysis.totalJobs} jobs, ${analysis.totalDocuments} documents, ${analysis.totalSizeMB} MB`
|
||||
);
|
||||
return analysis;
|
||||
} catch (error) {
|
||||
logger.error("Failed to analyze Jobs directory:", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Analyze a single job folder (standalone helper function)
|
||||
*/
|
||||
async function analyzeJobFolder(jobsPath: string, jobId: string): Promise<JobFolderStats> {
|
||||
const jobFolderPath = path.join(jobsPath, jobId);
|
||||
const relativePath = path.relative(FolderPaths.Root, jobFolderPath);
|
||||
|
||||
const { documentCount, totalSizeBytes } = await getDirectoryStats(jobFolderPath);
|
||||
|
||||
return {
|
||||
jobId,
|
||||
relativePath,
|
||||
documentCount,
|
||||
totalSizeBytes,
|
||||
totalSizeMB: Math.round((totalSizeBytes / (1024 * 1024)) * 100) / 100
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively get document count and total size for a directory (standalone helper function)
|
||||
*/
|
||||
async function getDirectoryStats(dirPath: string): Promise<{ documentCount: number; totalSizeBytes: number }> {
|
||||
let documentCount = 0;
|
||||
let totalSizeBytes = 0;
|
||||
|
||||
try {
|
||||
const items = await fs.readdir(dirPath);
|
||||
|
||||
for (const item of items) {
|
||||
const itemPath = path.join(dirPath, item);
|
||||
const stat = await fs.stat(itemPath);
|
||||
|
||||
if (stat.isDirectory()) {
|
||||
// Recursively analyze subdirectories
|
||||
const subStats = await getDirectoryStats(itemPath);
|
||||
documentCount += subStats.documentCount;
|
||||
totalSizeBytes += subStats.totalSizeBytes;
|
||||
} else {
|
||||
// Count files as documents
|
||||
documentCount++;
|
||||
totalSizeBytes += stat.size;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`Error analyzing directory ${dirPath}:`, error);
|
||||
}
|
||||
|
||||
return { documentCount, totalSizeBytes };
|
||||
}
|
||||
Reference in New Issue
Block a user