const { TextractClient, StartExpenseAnalysisCommand, GetExpenseAnalysisCommand } = require("@aws-sdk/client-textract"); const { S3Client, PutObjectCommand } = require("@aws-sdk/client-s3"); const { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs"); const { v4: uuidv4 } = require('uuid'); // Initialize AWS clients const awsConfig = { region: process.env.AWS_AI_REGION || "ca-central-1", credentials: { accessKeyId: process.env.AWS_AI_ACCESS_KEY_ID, secretAccessKey: process.env.AWS_AI_SECRET_ACCESS_KEY, } }; const textractClient = new TextractClient(awsConfig); const s3Client = new S3Client(awsConfig); const sqsClient = new SQSClient(awsConfig); let redisPubClient = null; const TEXTRACT_JOB_TTL = 3600; /** * Initialize the bill-ocr module with Redis client * @param {Object} pubClient - Redis cluster client */ function initializeBillOcr(pubClient) { redisPubClient = pubClient; } /** * Generate Redis key for Textract job using textract job ID * @param {string} textractJobId * @returns {string} */ function getTextractJobKey(textractJobId) { return `textract:job:${textractJobId}`; } /** * Store Textract job data in Redis * @param {string} textractJobId * @param {Object} jobData */ async function setTextractJob(textractJobId, jobData) { if (!redisPubClient) { throw new Error('Redis client not initialized. Call initializeBillOcr first.'); } const key = getTextractJobKey(textractJobId); await redisPubClient.set(key, JSON.stringify(jobData)); await redisPubClient.expire(key, TEXTRACT_JOB_TTL); } /** * Retrieve Textract job data from Redis * @param {string} textractJobId * @returns {Promise} */ async function getTextractJob(textractJobId) { if (!redisPubClient) { throw new Error('Redis client not initialized. Call initializeBillOcr first.'); } const key = getTextractJobKey(textractJobId); const data = await redisPubClient.get(key); return data ? JSON.parse(data) : null; } /** * Check if job exists by Textract job ID * @param {string} textractJobId * @returns {Promise} */ async function jobExists(textractJobId) { if (!redisPubClient) { throw new Error('Redis client not initialized. Call initializeBillOcr first.'); } console.log('Checking if job exists for Textract job ID:', textractJobId); const key = getTextractJobKey(textractJobId); const exists = await redisPubClient.exists(key); if (exists) { console.log(`Job found: ${textractJobId}`); return true; } console.log('No matching job found in Redis'); return false; } async function handleBillOcr(request, response) { // Check if file was uploaded if (!request.file) { response.status(400).send({ error: 'No file uploaded' }); return; } // The uploaded file is available in request.file const uploadedFile = request.file; try { // Start the Textract job (non-blocking) const jobInfo = await startTextractJob(uploadedFile.buffer); response.status(202).send({ success: true, jobId: jobInfo.jobId, message: 'Invoice processing started', statusUrl: `/ai/bill-ocr/status/${jobInfo.jobId}` }); } catch (error) { console.error('Error starting invoice processing:', error); response.status(500).send({ error: 'Failed to start invoice processing', message: error.message }); } } async function handleBillOcrStatus(request, response) { console.log('handleBillOcrStatus called'); console.log('request.params:', request.params); console.log('request.query:', request.query); const { jobId: textractJobId } = request.params; if (!textractJobId) { console.log('No textractJobId found in params'); response.status(400).send({ error: 'Job ID is required' }); return; } console.log('Looking for job:', textractJobId); const jobStatus = await getTextractJob(textractJobId); console.log('Job status:', jobStatus); if (!jobStatus) { response.status(404).send({ error: 'Job not found' }); return; } if (jobStatus.status === 'COMPLETED') { response.status(200).send({ status: 'COMPLETED', data: jobStatus.data }); } else if (jobStatus.status === 'FAILED') { response.status(500).send({ status: 'FAILED', error: jobStatus.error }); } else { response.status(200).send({ status: jobStatus.status }); } } async function startTextractJob(pdfBuffer) { // Upload PDF to S3 temporarily for Textract async processing const s3Bucket = process.env.AWS_AI_BUCKET; const snsTopicArn = process.env.AWS_TEXTRACT_SNS_TOPIC_ARN; const snsRoleArn = process.env.AWS_TEXTRACT_SNS_ROLE_ARN; if (!s3Bucket) { throw new Error('AWS_AI_BUCKET environment variable is required'); } if (!snsTopicArn) { throw new Error('AWS_TEXTRACT_SNS_TOPIC_ARN environment variable is required'); } if (!snsRoleArn) { throw new Error('AWS_TEXTRACT_SNS_ROLE_ARN environment variable is required'); } const uploadId = uuidv4(); const s3Key = `textract-temp/${uploadId}.pdf`; // Upload to S3 const uploadCommand = new PutObjectCommand({ Bucket: s3Bucket, Key: s3Key, Body: pdfBuffer, ContentType: 'application/pdf' }); await s3Client.send(uploadCommand); // Start async Textract expense analysis with SNS notification const startCommand = new StartExpenseAnalysisCommand({ DocumentLocation: { S3Object: { Bucket: s3Bucket, Name: s3Key } }, NotificationChannel: { SNSTopicArn: snsTopicArn, RoleArn: snsRoleArn }, ClientRequestToken: uploadId }); const startResult = await textractClient.send(startCommand); const textractJobId = startResult.JobId; // Store job info in Redis using textractJobId as the key await setTextractJob(textractJobId, { status: 'IN_PROGRESS', s3Key: s3Key, uploadId: uploadId, startedAt: new Date().toISOString() }); return { jobId: textractJobId }; } // Process SQS messages from Textract completion notifications async function processSQSMessages() { const queueUrl = process.env.AWS_TEXTRACT_SQS_QUEUE_URL; if (!queueUrl) { console.error('AWS_TEXTRACT_SQS_QUEUE_URL not configured'); return; } try { console.log('Polling SQS queue:', queueUrl); const receiveCommand = new ReceiveMessageCommand({ QueueUrl: queueUrl, MaxNumberOfMessages: 10, WaitTimeSeconds: 20, MessageAttributeNames: ['All'] }); const result = await sqsClient.send(receiveCommand); console.log('SQS poll result:', result.Messages ? `${result.Messages.length} messages` : 'no messages'); if (result.Messages && result.Messages.length > 0) { console.log('Processing', result.Messages.length, 'messages from SQS'); for (const message of result.Messages) { try { console.log("Processing message:", message); await handleTextractNotification(message); // Delete message after successful processing const deleteCommand = new DeleteMessageCommand({ QueueUrl: queueUrl, ReceiptHandle: message.ReceiptHandle }); await sqsClient.send(deleteCommand); } catch (error) { console.error('Error processing message:', error); } } } } catch (error) { console.error('Error receiving SQS messages:', error); } } async function handleTextractNotification(message) { const body = JSON.parse(message.Body); let snsMessage try { snsMessage = JSON.parse(body.Message); } catch (error) { //Delete the message so it doesn't clog the queue const deleteCommand = new DeleteMessageCommand({ QueueUrl: process.env.AWS_TEXTRACT_SQS_QUEUE_URL, ReceiptHandle: message.ReceiptHandle }); await sqsClient.send(deleteCommand); console.error('Error parsing SNS message:', error); console.log('Message Deleted:', body); return; } const textractJobId = snsMessage.JobId; const status = snsMessage.Status; // Check if job exists in Redis const exists = await jobExists(textractJobId); if (!exists) { console.warn(`Job not found for Textract job ID: ${textractJobId}`); return; } const jobInfo = await getTextractJob(textractJobId); if (status === 'SUCCEEDED') { // Retrieve the results const invoiceData = await retrieveTextractResults(textractJobId); const processedData = processScanData(invoiceData); await setTextractJob(textractJobId, { ...jobInfo, status: 'COMPLETED', data: processedData, completedAt: new Date().toISOString() }); } else if (status === 'FAILED') { await setTextractJob(textractJobId, { ...jobInfo, status: 'FAILED', error: snsMessage.StatusMessage || 'Textract job failed', completedAt: new Date().toISOString() }); } } async function retrieveTextractResults(textractJobId) { // Handle pagination if there are multiple pages of results let allExpenseDocuments = []; let nextToken = null; do { const getCommand = new GetExpenseAnalysisCommand({ JobId: textractJobId, NextToken: nextToken }); const result = await textractClient.send(getCommand); if (result.ExpenseDocuments) { allExpenseDocuments = allExpenseDocuments.concat(result.ExpenseDocuments); } nextToken = result.NextToken; } while (nextToken); // Extract invoice data from Textract response return extractInvoiceData({ ExpenseDocuments: allExpenseDocuments }); } // Start SQS polling (call this when server starts) function startSQSPolling() { const pollInterval = setInterval(() => { processSQSMessages().catch(error => { console.error('SQS polling error:', error); }); }, 10000); // Poll every 10 seconds return pollInterval; } function extractInvoiceData(textractResponse) { const invoiceData = { summary: {}, lineItems: [] }; if (!textractResponse.ExpenseDocuments || textractResponse.ExpenseDocuments.length === 0) { return invoiceData; } // Process each page of the invoice textractResponse.ExpenseDocuments.forEach(expenseDoc => { // Extract summary fields (vendor, invoice number, date, total, etc.) if (expenseDoc.SummaryFields) { expenseDoc.SummaryFields.forEach(field => { const fieldType = field.Type?.Text || ''; const fieldValue = field.ValueDetection?.Text || ''; const confidence = field.ValueDetection?.Confidence || 0; // Map common invoice fields if (fieldType && fieldValue) { invoiceData.summary[fieldType] = { value: fieldValue, confidence: confidence }; } }); } // Extract line items if (expenseDoc.LineItemGroups) { expenseDoc.LineItemGroups.forEach(lineItemGroup => { if (lineItemGroup.LineItems) { lineItemGroup.LineItems.forEach(lineItem => { const item = {}; if (lineItem.LineItemExpenseFields) { lineItem.LineItemExpenseFields.forEach(field => { const fieldType = field.Type?.Text || ''; const fieldValue = field.ValueDetection?.Text || ''; const confidence = field.ValueDetection?.Confidence || 0; if (fieldType && fieldValue) { // Normalize field names const normalizedField = normalizeFieldName(fieldType); item[normalizedField] = { value: fieldValue, confidence: confidence }; } }); } if (Object.keys(item).length > 0) { invoiceData.lineItems.push(item); } }); } }); } }); return invoiceData; } function normalizeFieldName(fieldType) { //Placeholder normalization for now. const fieldMap = { 'ITEM': 'description', 'QUANTITY': 'quantity', 'UNIT_PRICE': 'unitPrice', 'PRICE': 'price', 'PRODUCT_CODE': 'productCode', 'EXPENSE_ROW': 'row' }; return fieldMap[fieldType] || fieldType.toLowerCase().replace(/_/g, ''); } function processScanData(invoiceData) { // Process and clean the extracted data const processed = { summary: {}, lineItems: [] }; // Clean summary fields for (const [key, value] of Object.entries(invoiceData.summary)) { if (value.confidence > 50) { // Only include fields with > 50% confidence processed.summary[key] = value.value; } } // Process line items processed.lineItems = invoiceData.lineItems .map(item => { const processedItem = {}; for (const [key, value] of Object.entries(item)) { if (value.confidence > 50) { // Only include fields with > 50% confidence let cleanValue = value.value; // Parse numbers for quantity and price fields if (key === 'quantity') { cleanValue = parseFloat(cleanValue) || 0; } else if (key === 'unitPrice' || key === 'price') { // Remove currency symbols and parse cleanValue = parseFloat(cleanValue.replace(/[^0-9.-]/g, '')) || 0; } processedItem[key] = cleanValue; } } return processedItem; }) .filter(item => { // Filter out items with no description or with quantity <= 0 return item.description && (!item.quantity || item.quantity > 0); }); return processed; } module.exports = { initializeBillOcr, handleBillOcr, handleBillOcrStatus, startSQSPolling };