const { TextractClient, StartExpenseAnalysisCommand, GetExpenseAnalysisCommand, AnalyzeExpenseCommand } = require("@aws-sdk/client-textract"); const { S3Client, PutObjectCommand } = require("@aws-sdk/client-s3"); const { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs"); const { v4: uuidv4 } = require('uuid'); const { getTextractJobKey, setTextractJob, getTextractJob, getFileType, getPdfPageCount, hasActiveJobs } = require("./bill-ocr-helpers"); const { extractInvoiceData, processScanData } = require("./bill-ocr-normalize"); const { generateBillFormData } = require("./bill-ocr-generator"); // Initialize AWS clients const awsConfig = { region: process.env.AWS_AI_REGION || "ca-central-1", credentials: { accessKeyId: process.env.AWS_AI_ACCESS_KEY_ID, secretAccessKey: process.env.AWS_AI_SECRET_ACCESS_KEY, } }; const textractClient = new TextractClient(awsConfig); const s3Client = new S3Client(awsConfig); const sqsClient = new SQSClient(awsConfig); let redisPubClient = null; /** * Initialize the bill-ocr module with Redis client * @param {Object} pubClient - Redis cluster client */ function initializeBillOcr(pubClient) { redisPubClient = pubClient; } /** * Check if job exists by Textract job ID * @param {string} textractJobId * @returns {Promise} */ async function jobExists(textractJobId) { if (!redisPubClient) { throw new Error('Redis client not initialized. Call initializeBillOcr first.'); } console.log('Checking if job exists for Textract job ID:', textractJobId); const key = getTextractJobKey(textractJobId); const exists = await redisPubClient.exists(key); if (exists) { console.log(`Job found: ${textractJobId}`); return true; } console.log('No matching job found in Redis'); return false; } async function handleBillOcr(request, response) { // Check if file was uploaded if (!request.file) { response.status(400).send({ error: 'No file uploaded.' }); return; } // The uploaded file is available in request.file const uploadedFile = request.file; const { jobid, bodyshopid, parts_orderid } = request.body; try { const fileType = getFileType(uploadedFile); console.log(`Processing file type: ${fileType}`); // Images are always processed synchronously (single page) if (fileType === 'image') { console.log('Image => 1 page, processing synchronously'); const result = await processSinglePageDocument(uploadedFile.buffer); response.status(200).send({ success: true, status: 'COMPLETED', data: result, message: 'Invoice processing completed' }); } else if (fileType === 'pdf') { // Check the number of pages in the PDF const pageCount = await getPdfPageCount(uploadedFile.buffer); console.log(`PDF has ${pageCount} page(s)`); if (pageCount === 1) { // Process synchronously for single-page documents console.log('PDF => 1 page, processing synchronously'); const result = await processSinglePageDocument(uploadedFile.buffer); //const billResult = await generateBillFormData({ result, }); response.status(200).send({ success: true, status: 'COMPLETED', data: { result, }, message: 'Invoice processing completed' }); } else { // Start the Textract job (non-blocking) for multi-page documents console.log('PDF => 2+ pages, processing asynchronously'); const jobInfo = await startTextractJob(uploadedFile.buffer); response.status(202).send({ success: true, jobId: jobInfo.jobId, message: 'Invoice processing started', statusUrl: `/ai/bill-ocr/status/${jobInfo.jobId}` }); } } else { response.status(400).send({ error: 'Unsupported file type', message: 'Please upload a PDF or supported image file (JPEG, PNG, TIFF)' }); } } catch (error) { console.error('Error starting invoice processing:', error); response.status(500).send({ error: 'Failed to start invoice processing', message: error.message }); } } async function handleBillOcrStatus(request, response) { const { jobId: textractJobId } = request.params; if (!textractJobId) { console.log('No textractJobId found in params'); response.status(400).send({ error: 'Job ID is required' }); return; } console.log('Looking for job:', textractJobId); const jobStatus = await getTextractJob({ redisPubClient, textractJobId }); console.log('Job status:', jobStatus); if (!jobStatus) { response.status(404).send({ error: 'Job not found' }); return; } if (jobStatus.status === 'COMPLETED') { response.status(200).send({ status: 'COMPLETED', data: jobStatus.data }); } else if (jobStatus.status === 'FAILED') { response.status(500).send({ status: 'FAILED', error: jobStatus.error }); } else { response.status(200).send({ status: jobStatus.status }); } } /** * Process a single-page document synchronously using AnalyzeExpenseCommand * @param {Buffer} pdfBuffer * @returns {Promise} */ async function processSinglePageDocument(pdfBuffer) { const analyzeCommand = new AnalyzeExpenseCommand({ Document: { Bytes: pdfBuffer } }); const result = await textractClient.send(analyzeCommand); const invoiceData = extractInvoiceData(result); const processedData = processScanData(invoiceData); return { ...processedData, originalTextractResponse: result }; } async function startTextractJob(pdfBuffer) { // Upload PDF to S3 temporarily for Textract async processing const s3Bucket = process.env.AWS_AI_BUCKET; const snsTopicArn = process.env.AWS_TEXTRACT_SNS_TOPIC_ARN; const snsRoleArn = process.env.AWS_TEXTRACT_SNS_ROLE_ARN; if (!s3Bucket) { throw new Error('AWS_AI_BUCKET environment variable is required'); } if (!snsTopicArn) { throw new Error('AWS_TEXTRACT_SNS_TOPIC_ARN environment variable is required'); } if (!snsRoleArn) { throw new Error('AWS_TEXTRACT_SNS_ROLE_ARN environment variable is required'); } const uploadId = uuidv4(); const s3Key = `textract-temp/${uploadId}.pdf`; //TODO Update Keys structure to something better. // Upload to S3 const uploadCommand = new PutObjectCommand({ Bucket: s3Bucket, Key: s3Key, Body: pdfBuffer, ContentType: 'application/pdf' //Hard coded - we only support PDFs for multi-page }); await s3Client.send(uploadCommand); // Start async Textract expense analysis with SNS notification const startCommand = new StartExpenseAnalysisCommand({ DocumentLocation: { S3Object: { Bucket: s3Bucket, Name: s3Key } }, NotificationChannel: { SNSTopicArn: snsTopicArn, RoleArn: snsRoleArn }, ClientRequestToken: uploadId }); const startResult = await textractClient.send(startCommand); const textractJobId = startResult.JobId; // Store job info in Redis using textractJobId as the key await setTextractJob( { redisPubClient, textractJobId, jobData: { status: 'IN_PROGRESS', s3Key: s3Key, uploadId: uploadId, startedAt: new Date().toISOString() } } ); return { jobId: textractJobId }; } // Process SQS messages from Textract completion notifications async function processSQSMessages() { const queueUrl = process.env.AWS_TEXTRACT_SQS_QUEUE_URL; if (!queueUrl) { console.error('AWS_TEXTRACT_SQS_QUEUE_URL not configured'); return; } // Only poll if there are active mutli page jobs in progress const hasActive = await hasActiveJobs({ redisPubClient }); if (!hasActive) { console.log('No active jobs in progress, skipping SQS poll'); return; } try { console.log('Polling SQS queue:', queueUrl); const receiveCommand = new ReceiveMessageCommand({ QueueUrl: queueUrl, MaxNumberOfMessages: 10, WaitTimeSeconds: 20, MessageAttributeNames: ['All'] }); const result = await sqsClient.send(receiveCommand); console.log('SQS poll result:', result.Messages ? `${result.Messages.length} messages` : 'no messages'); if (result.Messages && result.Messages.length > 0) { console.log('Processing', result.Messages.length, 'messages from SQS'); for (const message of result.Messages) { try { console.log("Processing message:", message); await handleTextractNotification(message); // Delete message after successful processing const deleteCommand = new DeleteMessageCommand({ QueueUrl: queueUrl, ReceiptHandle: message.ReceiptHandle }); await sqsClient.send(deleteCommand); } catch (error) { console.error('Error processing message:', error); } } } } catch (error) { console.error('Error receiving SQS messages:', error); } } async function handleTextractNotification(message) { const body = JSON.parse(message.Body); let snsMessage try { snsMessage = JSON.parse(body.Message); } catch (error) { //Delete the message so it doesn't clog the queue const deleteCommand = new DeleteMessageCommand({ QueueUrl: process.env.AWS_TEXTRACT_SQS_QUEUE_URL, ReceiptHandle: message.ReceiptHandle }); await sqsClient.send(deleteCommand); console.error('Error parsing SNS message:', error); console.log('Message Deleted:', body); return; } const textractJobId = snsMessage.JobId; const status = snsMessage.Status; // Check if job exists in Redis const exists = await jobExists(textractJobId); if (!exists) { console.warn(`Job not found for Textract job ID: ${textractJobId}`); return; } const jobInfo = await getTextractJob({ redisPubClient, textractJobId }); if (status === 'SUCCEEDED') { // Retrieve the results const { processedData, originalResponse } = await retrieveTextractResults(textractJobId); await setTextractJob( { redisPubClient, textractJobId, jobData: { ...jobInfo, status: 'COMPLETED', data: { ...processedData, originalTextractResponse: originalResponse }, completedAt: new Date().toISOString() } } ); } else if (status === 'FAILED') { await setTextractJob( { redisPubClient, textractJobId, jobData: { ...jobInfo, status: 'FAILED', error: snsMessage.StatusMessage || 'Textract job failed', completedAt: new Date().toISOString() } } ); } } async function retrieveTextractResults(textractJobId) { // Handle pagination if there are multiple pages of results let allExpenseDocuments = []; let nextToken = null; do { const getCommand = new GetExpenseAnalysisCommand({ JobId: textractJobId, NextToken: nextToken }); const result = await textractClient.send(getCommand); if (result.ExpenseDocuments) { allExpenseDocuments = allExpenseDocuments.concat(result.ExpenseDocuments); } nextToken = result.NextToken; } while (nextToken); // Store the complete original response const fullTextractResponse = { ExpenseDocuments: allExpenseDocuments }; // Extract invoice data from Textract response const invoiceData = extractInvoiceData(fullTextractResponse); return { processedData: processScanData(invoiceData), originalResponse: fullTextractResponse }; } // Start SQS polling (call this when server starts) function startSQSPolling() { const pollInterval = setInterval(() => { processSQSMessages().catch(error => { console.error('SQS polling error:', error); }); }, 10000); // Poll every 10 seconds return pollInterval; } module.exports = { initializeBillOcr, handleBillOcr, handleBillOcrStatus, startSQSPolling };