const { TextractClient, StartExpenseAnalysisCommand, GetExpenseAnalysisCommand } = require("@aws-sdk/client-textract"); const { S3Client, PutObjectCommand } = require("@aws-sdk/client-s3"); const { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs"); const { v4: uuidv4 } = require('uuid'); // Initialize AWS clients const awsConfig = { region: process.env.AWS_AI_REGION || "ca-central-1", credentials: { accessKeyId: process.env.AWS_AI_ACCESS_KEY_ID, secretAccessKey: process.env.AWS_AI_SECRET_ACCESS_KEY, } }; const textractClient = new TextractClient(awsConfig); const s3Client = new S3Client(awsConfig); const sqsClient = new SQSClient(awsConfig); // In-memory job storage (consider using Redis or a database for production) const jobStore = new Map(); async function handleBillOcr(request, response) { // Check if file was uploaded if (!request.file) { response.status(400).send({ error: 'No file uploaded' }); return; } // The uploaded file is available in request.file const uploadedFile = request.file; try { // Start the Textract job (non-blocking) const jobInfo = await startTextractJob(uploadedFile.buffer); response.status(202).send({ success: true, jobId: jobInfo.jobId, message: 'Invoice processing started', statusUrl: `/api/bill-ocr/status/${jobInfo.jobId}` }); } catch (error) { console.error('Error starting invoice processing:', error); response.status(500).send({ error: 'Failed to start invoice processing', message: error.message }); } } async function handleBillOcrStatus(request, response) { console.log('handleBillOcrStatus called'); console.log('request.params:', request.params); console.log('request.query:', request.query); const { jobId } = request.params; if (!jobId) { console.log('No jobId found in params'); response.status(400).send({ error: 'Job ID is required' }); return; } console.log('Looking for job:', jobId); const jobStatus = jobStore.get(jobId); console.log('Job status:', jobStatus); if (!jobStatus) { response.status(404).send({ error: 'Job not found' }); return; } if (jobStatus.status === 'COMPLETED') { response.status(200).send({ status: 'COMPLETED', data: jobStatus.data }); } else if (jobStatus.status === 'FAILED') { response.status(500).send({ status: 'FAILED', error: jobStatus.error }); } else { response.status(200).send({ status: jobStatus.status }); } } async function startTextractJob(pdfBuffer) { // Upload PDF to S3 temporarily for Textract async processing const s3Bucket = process.env.AWS_AI_BUCKET; const snsTopicArn = process.env.AWS_TEXTRACT_SNS_TOPIC_ARN; const snsRoleArn = process.env.AWS_TEXTRACT_SNS_ROLE_ARN; if (!s3Bucket) { throw new Error('AWS_AI_BUCKET environment variable is required'); } if (!snsTopicArn) { throw new Error('AWS_TEXTRACT_SNS_TOPIC_ARN environment variable is required'); } if (!snsRoleArn) { throw new Error('AWS_TEXTRACT_SNS_ROLE_ARN environment variable is required'); } const jobId = uuidv4(); const s3Key = `textract-temp/${jobId}.pdf`; // Upload to S3 const uploadCommand = new PutObjectCommand({ Bucket: s3Bucket, Key: s3Key, Body: pdfBuffer, ContentType: 'application/pdf' }); await s3Client.send(uploadCommand); // Start async Textract expense analysis with SNS notification const startCommand = new StartExpenseAnalysisCommand({ DocumentLocation: { S3Object: { Bucket: s3Bucket, Name: s3Key } }, OutputConfig: { S3Bucket: s3Bucket, S3Prefix: `textract-output/${jobId}/` }, NotificationChannel: { SNSTopicArn: snsTopicArn, RoleArn: snsRoleArn }, ClientRequestToken: jobId }); const startResult = await textractClient.send(startCommand); // Store job info jobStore.set(jobId, { status: 'IN_PROGRESS', textractJobId: startResult.JobId, s3Key: s3Key, startedAt: new Date().toISOString() }); return { jobId: jobId, textractJobId: startResult.JobId }; } // Process SQS messages from Textract completion notifications async function processSQSMessages() { const queueUrl = process.env.AWS_TEXTRACT_SQS_QUEUE_URL; if (!queueUrl) { console.error('AWS_TEXTRACT_SQS_QUEUE_URL not configured'); return; } try { console.log('Polling SQS queue:', queueUrl); const receiveCommand = new ReceiveMessageCommand({ QueueUrl: queueUrl, MaxNumberOfMessages: 10, WaitTimeSeconds: 20, MessageAttributeNames: ['All'] }); const result = await sqsClient.send(receiveCommand); console.log('SQS poll result:', result.Messages ? `${result.Messages.length} messages` : 'no messages'); if (result.Messages && result.Messages.length > 0) { console.log('Processing', result.Messages.length, 'messages from SQS'); for (const message of result.Messages) { try { console.log("Processing message:", message); await handleTextractNotification(message); // Delete message after successful processing const deleteCommand = new DeleteMessageCommand({ QueueUrl: queueUrl, ReceiptHandle: message.ReceiptHandle }); await sqsClient.send(deleteCommand); } catch (error) { console.error('Error processing message:', error); } } } } catch (error) { console.error('Error receiving SQS messages:', error); } } async function handleTextractNotification(message) { const body = JSON.parse(message.Body); const snsMessage = JSON.parse(body.Message); const textractJobId = snsMessage.JobId; const status = snsMessage.Status; // Find our job by Textract job ID let ourJobId = null; for (const [key, value] of jobStore.entries()) { if (value.textractJobId === textractJobId) { ourJobId = key; break; } } if (!ourJobId) { console.warn(`Job not found for Textract job ID: ${textractJobId}`); return; } const jobInfo = jobStore.get(ourJobId); if (status === 'SUCCEEDED') { // Retrieve the results const invoiceData = await retrieveTextractResults(textractJobId); const processedData = processScanData(invoiceData); jobStore.set(ourJobId, { ...jobInfo, status: 'COMPLETED', data: processedData, completedAt: new Date().toISOString() }); } else if (status === 'FAILED') { jobStore.set(ourJobId, { ...jobInfo, status: 'FAILED', error: snsMessage.StatusMessage || 'Textract job failed', completedAt: new Date().toISOString() }); } } async function retrieveTextractResults(textractJobId) { // Handle pagination if there are multiple pages of results let allExpenseDocuments = []; let nextToken = null; do { const getCommand = new GetExpenseAnalysisCommand({ JobId: textractJobId, NextToken: nextToken }); const result = await textractClient.send(getCommand); if (result.ExpenseDocuments) { allExpenseDocuments = allExpenseDocuments.concat(result.ExpenseDocuments); } nextToken = result.NextToken; } while (nextToken); // Extract invoice data from Textract response return extractInvoiceData({ ExpenseDocuments: allExpenseDocuments }); } // Start SQS polling (call this when server starts) function startSQSPolling() { const pollInterval = setInterval(() => { processSQSMessages().catch(error => { console.error('SQS polling error:', error); }); }, 5000); // Poll every 5 seconds return pollInterval; } function extractInvoiceData(textractResponse) { const invoiceData = { summary: {}, lineItems: [] }; if (!textractResponse.ExpenseDocuments || textractResponse.ExpenseDocuments.length === 0) { return invoiceData; } // Process each page of the invoice textractResponse.ExpenseDocuments.forEach(expenseDoc => { // Extract summary fields (vendor, invoice number, date, total, etc.) if (expenseDoc.SummaryFields) { expenseDoc.SummaryFields.forEach(field => { const fieldType = field.Type?.Text || ''; const fieldValue = field.ValueDetection?.Text || ''; const confidence = field.ValueDetection?.Confidence || 0; // Map common invoice fields if (fieldType && fieldValue) { invoiceData.summary[fieldType] = { value: fieldValue, confidence: confidence }; } }); } // Extract line items if (expenseDoc.LineItemGroups) { expenseDoc.LineItemGroups.forEach(lineItemGroup => { if (lineItemGroup.LineItems) { lineItemGroup.LineItems.forEach(lineItem => { const item = {}; if (lineItem.LineItemExpenseFields) { lineItem.LineItemExpenseFields.forEach(field => { const fieldType = field.Type?.Text || ''; const fieldValue = field.ValueDetection?.Text || ''; const confidence = field.ValueDetection?.Confidence || 0; if (fieldType && fieldValue) { // Normalize field names const normalizedField = normalizeFieldName(fieldType); item[normalizedField] = { value: fieldValue, confidence: confidence }; } }); } if (Object.keys(item).length > 0) { invoiceData.lineItems.push(item); } }); } }); } }); return invoiceData; } function normalizeFieldName(fieldType) { // Convert Textract field types to more readable names const fieldMap = { 'ITEM': 'description', 'QUANTITY': 'quantity', 'UNIT_PRICE': 'unitPrice', 'PRICE': 'price', 'PRODUCT_CODE': 'productCode', 'EXPENSE_ROW': 'row' }; return fieldMap[fieldType] || fieldType.toLowerCase().replace(/_/g, ''); } function processScanData(invoiceData) { // Process and clean the extracted data const processed = { summary: {}, lineItems: [] }; // Clean summary fields for (const [key, value] of Object.entries(invoiceData.summary)) { if (value.confidence > 50) { // Only include fields with > 50% confidence processed.summary[key] = value.value; } } // Process line items processed.lineItems = invoiceData.lineItems .map(item => { const processedItem = {}; for (const [key, value] of Object.entries(item)) { if (value.confidence > 50) { // Only include fields with > 50% confidence let cleanValue = value.value; // Parse numbers for quantity and price fields if (key === 'quantity') { cleanValue = parseFloat(cleanValue) || 0; } else if (key === 'unitPrice' || key === 'price') { // Remove currency symbols and parse cleanValue = parseFloat(cleanValue.replace(/[^0-9.-]/g, '')) || 0; } processedItem[key] = cleanValue; } } return processedItem; }) .filter(item => { // Filter out items with no description or with quantity <= 0 return item.description && (!item.quantity || item.quantity > 0); }); return processed; } module.exports = { handleBillOcr, handleBillOcrStatus, startSQSPolling };