const { TextractClient, StartExpenseAnalysisCommand, GetExpenseAnalysisCommand, AnalyzeExpenseCommand } = require("@aws-sdk/client-textract"); const { S3Client, PutObjectCommand } = require("@aws-sdk/client-s3"); const { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs"); const { v4: uuidv4 } = require('uuid'); const { getTextractJobKey, setTextractJob, getTextractJob, getFileType, getPdfPageCount, hasActiveJobs } = require("./bill-ocr-helpers"); const { extractInvoiceData, processScanData } = require("./bill-ocr-normalize"); const { generateBillFormData } = require("./bill-ocr-generator"); const logger = require("../../utils/logger"); const _ = require("lodash"); // Initialize AWS clients const awsConfig = { region: process.env.AWS_AI_REGION || "ca-central-1", credentials: { accessKeyId: process.env.AWS_AI_ACCESS_KEY_ID, secretAccessKey: process.env.AWS_AI_SECRET_ACCESS_KEY, } }; const textractClient = new TextractClient(awsConfig); const s3Client = new S3Client(awsConfig); const sqsClient = new SQSClient(awsConfig); let redisPubClient = null; /** * Initialize the bill-ocr module with Redis client * @param {Object} pubClient - Redis cluster client */ function initializeBillOcr(pubClient) { redisPubClient = pubClient; } /** * Check if job exists by Textract job ID * @param {string} textractJobId * @returns {Promise} */ async function jobExists(textractJobId) { if (!redisPubClient) { throw new Error('Redis client not initialized. Call initializeBillOcr first.'); } const key = getTextractJobKey(textractJobId); const exists = await redisPubClient.exists(key); if (exists) { return true; } return false; } async function handleBillOcr(req, res) { // Check if file was uploaded if (!req.file) { return res.status(400).send({ error: 'No file uploaded.' }); } // The uploaded file is available in request file const uploadedFile = req.file; const { jobid, bodyshopid, partsorderid } = req.body; logger.log("bill-ocr-start", "DEBUG", req.user.email, jobid, null); try { const fileType = getFileType(uploadedFile); // Images are always processed synchronously (single page) if (fileType === 'image') { const processedData = await processSinglePageDocument(uploadedFile.buffer); const billForm = await generateBillFormData({ processedData: processedData, jobid, bodyshopid, partsorderid, req: req }); logger.log("bill-ocr-single-complete", "DEBUG", req.user.email, jobid, { ..._.omit(processedData, "originalTextractResponse"), billForm }); return res.status(200).json({ success: true, status: 'COMPLETED', data: { ...processedData, billForm }, message: 'Invoice processing completed' }); } else if (fileType === 'pdf') { // Check the number of pages in the PDF const pageCount = await getPdfPageCount(uploadedFile.buffer); if (pageCount === 1) { // Process synchronously for single-page documents const processedData = await processSinglePageDocument(uploadedFile.buffer); const billForm = await generateBillFormData({ processedData: processedData, jobid, bodyshopid, partsorderid, req: req }); logger.log("bill-ocr-single-complete", "DEBUG", req.user.email, jobid, { ..._.omit(processedData, "originalTextractResponse"), billForm }); return res.status(200).json({ success: true, status: 'COMPLETED', data: { ...processedData, billForm }, message: 'Invoice processing completed' }); } // Start the Textract job (non-blocking) for multi-page documents const jobInfo = await startTextractJob(uploadedFile.buffer, { jobid, bodyshopid, partsorderid }); logger.log("bill-ocr-multipage-start", "DEBUG", req.user.email, jobid, jobInfo); return res.status(202).json({ success: true, textractJobId: jobInfo.jobId, message: 'Invoice processing started', statusUrl: `/ai/bill-ocr/status/${jobInfo.jobId}` }); } else { logger.log("bill-ocr-unsupported-filetype", "WARN", req.user.email, jobid, { fileType }); return res.status(400).json({ error: 'Unsupported file type', message: 'Please upload a PDF or supported image file (JPEG, PNG, TIFF)' }); } } catch (error) { logger.log("bill-ocr-error", "ERROR", req.user.email, jobid, { error: error.message, stack: error.stack }); return res.status(500).json({ error: 'Failed to start invoice processing', message: error.message }); } } async function handleBillOcrStatus(req, res) { const { textractJobId } = req.params; if (!textractJobId) { logger.log("bill-ocr-status-error", "WARN", req.user.email, null, { error: 'No textractJobId found in params' }); return res.status(400).json({ error: 'Job ID is required' }); } const jobStatus = await getTextractJob({ redisPubClient, textractJobId }); if (!jobStatus) { return res.status(404).json({ error: 'Job not found' }); } if (jobStatus.status === 'COMPLETED') { // Generate billForm on-demand if not already generated let billForm = jobStatus.data?.billForm; if (!billForm && jobStatus.context) { try { billForm = await generateBillFormData({ processedData: jobStatus.data, jobid: jobStatus.context.jobid, bodyshopid: jobStatus.context.bodyshopid, partsorderid: jobStatus.context.partsorderid, req: req // Now we have request context! }); logger.log("bill-ocr-multipage-complete", "DEBUG", req.user.email, jobStatus.context.jobid, { ...jobStatus.data, billForm }); // Cache the billForm back to Redis for future requests await setTextractJob({ redisPubClient, textractJobId, jobData: { ...jobStatus, data: { ...jobStatus.data, billForm } } }); } catch (error) { logger.log("bill-ocr-multipage-error", "ERROR", req.user.email, jobStatus.context.jobid, { ...jobStatus.data, error: error.message, stack: error.stack }); return res.status(500).send({ status: 'COMPLETED', error: 'Data processed but failed to generate bill form', message: error.message, data: jobStatus.data // Still return the raw processed data }); } } return res.status(200).send({ status: 'COMPLETED', data: { ...jobStatus.data, billForm } }); } else if (jobStatus.status === 'FAILED') { logger.log("bill-ocr-multipage-failed", "ERROR", req.user.email, jobStatus.context.jobid, { ...jobStatus.data, error: jobStatus.error, }); return res.status(500).json({ status: 'FAILED', error: jobStatus.error }); } else { return res.status(200).json({ status: jobStatus.status }); } } /** * Process a single-page document synchronously using AnalyzeExpenseCommand * @param {Buffer} pdfBuffer * @returns {Promise} */ async function processSinglePageDocument(pdfBuffer) { const analyzeCommand = new AnalyzeExpenseCommand({ Document: { Bytes: pdfBuffer } }); const result = await textractClient.send(analyzeCommand); const invoiceData = extractInvoiceData(result); const processedData = processScanData(invoiceData); return { ...processedData, //Removed as this is a large object that provides minimal value to send to client. // originalTextractResponse: result }; } async function startTextractJob(pdfBuffer, context = {}) { // Upload PDF to S3 temporarily for Textract async processing const { bodyshopid, jobid } = context; const s3Bucket = process.env.AWS_AI_BUCKET; const snsTopicArn = process.env.AWS_TEXTRACT_SNS_TOPIC_ARN; const snsRoleArn = process.env.AWS_TEXTRACT_SNS_ROLE_ARN; if (!s3Bucket) { throw new Error('AWS_AI_BUCKET environment variable is required'); } if (!snsTopicArn) { throw new Error('AWS_TEXTRACT_SNS_TOPIC_ARN environment variable is required'); } if (!snsRoleArn) { throw new Error('AWS_TEXTRACT_SNS_ROLE_ARN environment variable is required'); } const uploadId = uuidv4(); const s3Key = `textract-temp/${bodyshopid}/${jobid}/${uploadId}.pdf`; //TODO Update Keys structure to something better. // Upload to S3 const uploadCommand = new PutObjectCommand({ Bucket: s3Bucket, Key: s3Key, Body: pdfBuffer, ContentType: 'application/pdf' //Hard coded - we only support PDFs for multi-page }); await s3Client.send(uploadCommand); // Start async Textract expense analysis with SNS notification const startCommand = new StartExpenseAnalysisCommand({ DocumentLocation: { S3Object: { Bucket: s3Bucket, Name: s3Key } }, NotificationChannel: { SNSTopicArn: snsTopicArn, RoleArn: snsRoleArn }, ClientRequestToken: uploadId }); const startResult = await textractClient.send(startCommand); const textractJobId = startResult.JobId; // Store job info in Redis using textractJobId as the key await setTextractJob( { redisPubClient, textractJobId, jobData: { status: 'IN_PROGRESS', s3Key: s3Key, uploadId: uploadId, startedAt: new Date().toISOString(), context: context // Store the context for later use } } ); return { jobId: textractJobId }; } // Process SQS messages from Textract completion notifications async function processSQSMessages() { const queueUrl = process.env.AWS_TEXTRACT_SQS_QUEUE_URL; if (!queueUrl) { logger.log("bill-ocr-error", "ERROR", "api", null, { message: "AWS_TEXTRACT_SQS_QUEUE_URL not configured" }); return; } // Only poll if there are active mutli page jobs in progress const hasActive = await hasActiveJobs({ redisPubClient }); if (!hasActive) { return; } try { const receiveCommand = new ReceiveMessageCommand({ QueueUrl: queueUrl, MaxNumberOfMessages: 10, WaitTimeSeconds: 20, MessageAttributeNames: ['All'] }); const result = await sqsClient.send(receiveCommand); if (result.Messages && result.Messages.length > 0) { logger.log("bill-ocr-sqs-processing", "DEBUG", "api", null, { message: `Processing ${result.Messages.length} messages from SQS` }); for (const message of result.Messages) { try { // Environment-level filtering: check if this message belongs to this environment const shouldProcess = await shouldProcessMessage(message); if (shouldProcess) { await handleTextractNotification(message); // Delete message after successful processing const deleteCommand = new DeleteMessageCommand({ QueueUrl: queueUrl, ReceiptHandle: message.ReceiptHandle }); await sqsClient.send(deleteCommand); } } catch (error) { logger.log("bill-ocr-sqs-processing-error", "ERROR", "api", null, { message, error: error.message, stack: error.stack }); } } } } catch (error) { logger.log("bill-ocr-sqs-receiving-error", "ERROR", "api", null, { error: error.message, stack: error.stack }); } } /** * Check if a message should be processed by this environment * @param {Object} message - SQS message * @returns {Promise} */ async function shouldProcessMessage(message) { try { const body = JSON.parse(message.Body); const snsMessage = JSON.parse(body.Message); const textractJobId = snsMessage.JobId; // Check if job exists in Redis for this environment (using environment-specific prefix) const exists = await jobExists(textractJobId); return exists; } catch (error) { logger.log("bill-ocr-message-check-error", "DEBUG", "api", null, { message: "Error checking if message should be processed", error: error.message, stack: error.stack }); // If we can't parse the message, don't process it return false; } } async function handleTextractNotification(message) { const body = JSON.parse(message.Body); let snsMessage try { snsMessage = JSON.parse(body.Message); } catch (error) { logger.log("bill-ocr-handle-textract-error", "DEBUG", "api", null, { message: "Error parsing SNS message - invalid message format.", error: error.message, stack: error.stack, body }); return; } const textractJobId = snsMessage.JobId; const status = snsMessage.Status; // Get job info from Redis const jobInfo = await getTextractJob({ redisPubClient, textractJobId }); if (!jobInfo) { logger.log("bill-ocr-job-not-found", "DEBUG", "api", null, { message: `Job info not found in Redis for Textract job ID: ${textractJobId}`, textractJobId, snsMessage }); return; } if (status === 'SUCCEEDED') { // Retrieve the results const { processedData, originalResponse } = await retrieveTextractResults(textractJobId); // Store the processed data - billForm will be generated on-demand in the status endpoint await setTextractJob( { redisPubClient, textractJobId, jobData: { ...jobInfo, status: 'COMPLETED', data: { ...processedData, //Removed as this is a large object that provides minimal value to send to client. // originalTextractResponse: originalResponse }, completedAt: new Date().toISOString() } } ); } else if (status === 'FAILED') { await setTextractJob( { redisPubClient, textractJobId, jobData: { ...jobInfo, status: 'FAILED', error: snsMessage.StatusMessage || 'Textract job failed', completedAt: new Date().toISOString() } } ); } } async function retrieveTextractResults(textractJobId) { // Handle pagination if there are multiple pages of results let allExpenseDocuments = []; let nextToken = null; do { const getCommand = new GetExpenseAnalysisCommand({ JobId: textractJobId, NextToken: nextToken }); const result = await textractClient.send(getCommand); if (result.ExpenseDocuments) { allExpenseDocuments = allExpenseDocuments.concat(result.ExpenseDocuments); } nextToken = result.NextToken; } while (nextToken); // Store the complete original response const fullTextractResponse = { ExpenseDocuments: allExpenseDocuments }; // Extract invoice data from Textract response const invoiceData = extractInvoiceData(fullTextractResponse); return { processedData: processScanData(invoiceData), originalResponse: fullTextractResponse }; } // Start SQS polling (call this when server starts) function startSQSPolling() { const pollInterval = setInterval(() => { processSQSMessages().catch(error => { logger.log("bill-ocr-sqs-poll-error", "ERROR", "api", null, { message: error.message, stack: error.stack }); }); }, 10000); // Poll every 10 seconds return pollInterval; } module.exports = { initializeBillOcr, handleBillOcr, handleBillOcrStatus, startSQSPolling };