Files
bodyshop/server/ai/bill-ocr/bill-ocr.js
2026-02-20 09:06:11 -08:00

465 lines
16 KiB
JavaScript

const { TextractClient, StartExpenseAnalysisCommand, GetExpenseAnalysisCommand, AnalyzeExpenseCommand } = require("@aws-sdk/client-textract");
const { S3Client, PutObjectCommand } = require("@aws-sdk/client-s3");
const { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs");
const { v4: uuidv4 } = require('uuid');
const { getTextractJobKey, setTextractJob, getTextractJob, getFileType, getPdfPageCount, hasActiveJobs } = require("./bill-ocr-helpers");
const { extractInvoiceData, processScanData } = require("./bill-ocr-normalize");
const { generateBillFormData } = require("./bill-ocr-generator");
const logger = require("../../utils/logger");
// Shared AWS SDK configuration. The region falls back to "ca-central-1" when
// AWS_AI_REGION is unset or empty; credentials are read from the AWS_AI_*
// environment variables.
const awsConfig = {
  region: process.env.AWS_AI_REGION || "ca-central-1",
  credentials: {
    accessKeyId: process.env.AWS_AI_ACCESS_KEY_ID,
    secretAccessKey: process.env.AWS_AI_SECRET_ACCESS_KEY
  }
};

// One client per AWS service, all constructed from the same configuration.
const textractClient = new TextractClient(awsConfig);
const s3Client = new S3Client(awsConfig);
const sqsClient = new SQSClient(awsConfig);

// Redis client handle; remains null until initializeBillOcr() assigns it.
let redisPubClient = null;
/**
 * Store the Redis client that the rest of this module uses for job state.
 * Must be called before any function that reads or writes Textract job data.
 *
 * @param {Object} pubClient - Redis cluster client
 * @returns {void}
 */
function initializeBillOcr(pubClient) {
  redisPubClient = pubClient;
}
/**
 * Report whether a job record is present in Redis for a Textract job ID.
 *
 * @param {string} textractJobId - Job ID returned by Textract
 * @returns {Promise<boolean>} true when the Redis `exists` result is truthy
 * @throws {Error} when the Redis client has not been set via initializeBillOcr
 */
async function jobExists(textractJobId) {
  if (!redisPubClient) {
    throw new Error('Redis client not initialized. Call initializeBillOcr first.');
  }

  const redisKey = getTextractJobKey(textractJobId);
  const matchCount = await redisPubClient.exists(redisKey);
  return Boolean(matchCount);
}
/**
 * Express handler for a bill/invoice upload.
 *
 * Images and one-page PDFs are analysed synchronously and answered with
 * HTTP 200 and the processed data plus the generated bill form. PDFs with any
 * other page count start an asynchronous Textract job and are answered with
 * HTTP 202 and a status URL. Other file types get HTTP 400; any error thrown
 * while processing gets HTTP 500.
 *
 * @param {Object} req - Express request; reads `file`, `body` and `user.email`
 * @param {Object} res - Express response
 * @returns {Promise<*>} whatever the response method returns
 */
async function handleBillOcr(req, res) {
  const uploadedFile = req.file;
  if (!uploadedFile) {
    return res.status(400).send({ error: 'No file uploaded.' });
  }

  const { jobid, bodyshopid, partsorderid } = req.body;
  const userEmail = req.user.email;
  logger.log("bill-ocr-start", "DEBUG", userEmail, jobid, null);

  try {
    const fileType = getFileType(uploadedFile);
    const isImage = fileType === 'image';

    // Anything that is neither an image nor a PDF is rejected up front.
    if (!isImage && fileType !== 'pdf') {
      logger.log("bill-ocr-unsupported-filetype", "WARN", userEmail, jobid, { fileType });
      return res.status(400).json({
        error: 'Unsupported file type',
        message: 'Please upload a PDF or supported image file (JPEG, PNG, TIFF)'
      });
    }

    // Images are always single page; the page count is only read for PDFs.
    const isSinglePage = isImage || (await getPdfPageCount(uploadedFile.buffer)) === 1;

    if (isSinglePage) {
      const processedData = await processSinglePageDocument(uploadedFile.buffer);
      const billForm = await generateBillFormData({ processedData, jobid, bodyshopid, partsorderid, req });
      logger.log("bill-ocr-single-complete", "DEBUG", userEmail, jobid, { ...processedData, billForm });
      return res.status(200).json({
        success: true,
        status: 'COMPLETED',
        data: { ...processedData, billForm },
        message: 'Invoice processing completed'
      });
    }

    // Multi-page PDF: hand off to the asynchronous Textract flow and let the
    // client poll the status endpoint.
    const jobInfo = await startTextractJob(uploadedFile.buffer, { jobid, bodyshopid, partsorderid });
    logger.log("bill-ocr-multipage-start", "DEBUG", userEmail, jobid, jobInfo);
    return res.status(202).json({
      success: true,
      textractJobId: jobInfo.jobId,
      message: 'Invoice processing started',
      statusUrl: `/ai/bill-ocr/status/${jobInfo.jobId}`
    });
  } catch (error) {
    logger.log("bill-ocr-error", "ERROR", userEmail, jobid, { error: error.message, stack: error.stack });
    return res.status(500).json({
      error: 'Failed to start invoice processing',
      message: error.message
    });
  }
}
/**
 * Express handler that reports the state of an asynchronous Textract job.
 *
 * Responses:
 *  - 400 when the `textractJobId` route parameter is missing
 *  - 404 when no job record is found
 *  - 500 when the job record cannot be read, the job is FAILED, or bill form
 *    generation throws (in the last case the processed data is still returned)
 *  - 200 with `{ status }` while the job is in any other non-COMPLETED state
 *  - 200 with `{ status: 'COMPLETED', data }` once results are available
 *
 * For COMPLETED jobs the bill form is generated on first request (it needs the
 * request context) and written back to the job record so later requests can
 * reuse it.
 *
 * @param {Object} req - Express request; reads `params.textractJobId` and `user.email`
 * @param {Object} res - Express response
 * @returns {Promise<*>} whatever the response method returns
 */
async function handleBillOcrStatus(req, res) {
  const { textractJobId } = req.params;
  if (!textractJobId) {
    logger.log("bill-ocr-status-error", "WARN", req.user.email, null, { error: 'No textractJobId found in params' });
    return res.status(400).json({ error: 'Job ID is required' });
  }

  // A failed lookup must produce a response; otherwise the rejected promise
  // escapes the handler and the client never gets an answer.
  let jobStatus;
  try {
    jobStatus = await getTextractJob({ redisPubClient, textractJobId });
  } catch (error) {
    logger.log("bill-ocr-status-error", "ERROR", req.user.email, null, { textractJobId, error: error.message, stack: error.stack });
    return res.status(500).json({
      error: 'Failed to retrieve job status',
      message: error.message
    });
  }

  if (!jobStatus) {
    return res.status(404).json({ error: 'Job not found' });
  }

  if (jobStatus.status === 'FAILED') {
    // `context` is optional on the stored record, so do not assume it exists.
    logger.log("bill-ocr-multipage-failed", "ERROR", req.user.email, jobStatus.context?.jobid, { ...jobStatus.data, error: jobStatus.error });
    return res.status(500).json({
      status: 'FAILED',
      error: jobStatus.error
    });
  }

  if (jobStatus.status !== 'COMPLETED') {
    return res.status(200).json({
      status: jobStatus.status
    });
  }

  // COMPLETED: generate the bill form on demand if it is not stored yet.
  let billForm = jobStatus.data?.billForm;
  if (!billForm && jobStatus.context) {
    try {
      billForm = await generateBillFormData({
        processedData: jobStatus.data,
        jobid: jobStatus.context.jobid,
        bodyshopid: jobStatus.context.bodyshopid,
        partsorderid: jobStatus.context.partsorderid,
        req
      });
      logger.log("bill-ocr-multipage-complete", "DEBUG", req.user.email, jobStatus.context.jobid, { ...jobStatus.data, billForm });
    } catch (error) {
      logger.log("bill-ocr-multipage-error", "ERROR", req.user.email, jobStatus.context.jobid, { ...jobStatus.data, error: error.message, stack: error.stack });
      return res.status(500).send({
        status: 'COMPLETED',
        error: 'Data processed but failed to generate bill form',
        message: error.message,
        data: jobStatus.data // Still return the raw processed data
      });
    }

    // Writing the bill form back is only an optimisation for later requests.
    // A failure here must not be reported as a generation failure.
    try {
      await setTextractJob({
        redisPubClient,
        textractJobId,
        jobData: {
          ...jobStatus,
          data: {
            ...jobStatus.data,
            billForm
          }
        }
      });
    } catch (error) {
      logger.log("bill-ocr-multipage-cache-error", "WARN", req.user.email, jobStatus.context.jobid, { textractJobId, error: error.message, stack: error.stack });
    }
  }

  return res.status(200).send({
    status: 'COMPLETED',
    data: {
      ...jobStatus.data,
      billForm
    }
  });
}
/**
 * Analyse a single-page document synchronously with AnalyzeExpenseCommand and
 * normalise the result.
 *
 * @param {Buffer} pdfBuffer - Raw document bytes sent to Textract (the upload
 *   handler passes both image and one-page PDF buffers here)
 * @returns {Promise<Object>} the normalised scan data with the raw Textract
 *   response attached as `originalTextractResponse`
 */
async function processSinglePageDocument(pdfBuffer) {
  const textractResponse = await textractClient.send(
    new AnalyzeExpenseCommand({ Document: { Bytes: pdfBuffer } })
  );

  const normalized = processScanData(extractInvoiceData(textractResponse));

  return { ...normalized, originalTextractResponse: textractResponse };
}
/**
 * Upload a PDF to S3 and start an asynchronous Textract expense analysis that
 * reports completion through SNS. The job is recorded in Redis, keyed by the
 * Textract job ID, with status IN_PROGRESS.
 *
 * @param {Buffer} pdfBuffer - PDF bytes to upload
 * @param {Object} [context={}] - Caller context stored with the job;
 *   `bodyshopid` and `jobid` are also used to build the S3 key
 * @returns {Promise<{jobId: string}>} the Textract job ID
 * @throws {Error} when AWS_AI_BUCKET, AWS_TEXTRACT_SNS_TOPIC_ARN or
 *   AWS_TEXTRACT_SNS_ROLE_ARN is not set
 */
async function startTextractJob(pdfBuffer, context = {}) {
  const { bodyshopid, jobid } = context;

  // Checked in this order so the first missing variable is the one reported.
  const requiredEnvNames = ['AWS_AI_BUCKET', 'AWS_TEXTRACT_SNS_TOPIC_ARN', 'AWS_TEXTRACT_SNS_ROLE_ARN'];
  const missingName = requiredEnvNames.find((name) => !process.env[name]);
  if (missingName) {
    throw new Error(`${missingName} environment variable is required`);
  }
  const s3Bucket = process.env.AWS_AI_BUCKET;
  const snsTopicArn = process.env.AWS_TEXTRACT_SNS_TOPIC_ARN;
  const snsRoleArn = process.env.AWS_TEXTRACT_SNS_ROLE_ARN;

  const uploadId = uuidv4();
  const s3Key = `textract-temp/${bodyshopid}/${jobid}/${uploadId}.pdf`; // TODO: settle on a better key structure.

  // Textract's asynchronous API reads the document from S3, so upload it first.
  // Content type is fixed because only PDFs take the multi-page path.
  await s3Client.send(
    new PutObjectCommand({
      Bucket: s3Bucket,
      Key: s3Key,
      Body: pdfBuffer,
      ContentType: 'application/pdf'
    })
  );

  // Start the analysis; the upload ID doubles as the client request token.
  const { JobId: textractJobId } = await textractClient.send(
    new StartExpenseAnalysisCommand({
      DocumentLocation: {
        S3Object: { Bucket: s3Bucket, Name: s3Key }
      },
      NotificationChannel: {
        SNSTopicArn: snsTopicArn,
        RoleArn: snsRoleArn
      },
      ClientRequestToken: uploadId
    })
  );

  // Record the job so the notification handler and status endpoint can find it.
  await setTextractJob({
    redisPubClient,
    textractJobId,
    jobData: {
      status: 'IN_PROGRESS',
      s3Key,
      uploadId,
      startedAt: new Date().toISOString(),
      context
    }
  });

  return { jobId: textractJobId };
}
/**
 * Run one polling pass over the Textract completion queue.
 *
 * Does nothing when AWS_TEXTRACT_SQS_QUEUE_URL is unset (an error is logged)
 * or when no multi-page jobs are active. Otherwise it receives up to 10
 * messages with a 20 second wait and, for each message that
 * shouldProcessMessage accepts, runs handleTextractNotification and then
 * deletes the message. Messages that are not accepted are left on the queue.
 * A failure on one message is logged and does not stop the others.
 *
 * @returns {Promise<void>}
 */
async function processSQSMessages() {
  const queueUrl = process.env.AWS_TEXTRACT_SQS_QUEUE_URL;
  if (!queueUrl) {
    logger.log("bill-ocr-error", "ERROR", "api", null, { message: "AWS_TEXTRACT_SQS_QUEUE_URL not configured" });
    return;
  }

  // Skip the poll entirely unless a multi-page job is waiting on a result.
  const anyActiveJobs = await hasActiveJobs({ redisPubClient });
  if (!anyActiveJobs) {
    return;
  }

  try {
    const received = await sqsClient.send(
      new ReceiveMessageCommand({
        QueueUrl: queueUrl,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 20,
        MessageAttributeNames: ['All']
      })
    );

    const messages = received.Messages;
    if (!(messages && messages.length > 0)) {
      return;
    }

    logger.log("bill-ocr-sqs-processing", "DEBUG", "api", null, { message: `Processing ${messages.length} messages from SQS` });

    for (const message of messages) {
      try {
        // Environment-level filtering: only handle messages for jobs this
        // environment knows about.
        const isOurs = await shouldProcessMessage(message);
        if (!isOurs) {
          continue;
        }

        await handleTextractNotification(message);

        // Delete only after the notification was handled without throwing.
        await sqsClient.send(
          new DeleteMessageCommand({
            QueueUrl: queueUrl,
            ReceiptHandle: message.ReceiptHandle
          })
        );
      } catch (error) {
        logger.log("bill-ocr-sqs-processing-error", "ERROR", "api", null, { message, error: error.message, stack: error.stack });
      }
    }
  } catch (error) {
    logger.log("bill-ocr-sqs-receiving-error", "ERROR", "api", null, { error: error.message, stack: error.stack });
  }
}
/**
 * Decide whether an SQS message belongs to this environment by checking that
 * the Textract job it refers to has a record in Redis.
 *
 * @param {Object} message - SQS message whose `Body` is a JSON envelope with a
 *   JSON-encoded `Message` field carrying `JobId`
 * @returns {Promise<boolean>} the result of jobExists, or false when the
 *   message cannot be parsed or the lookup throws
 */
async function shouldProcessMessage(message) {
  try {
    const envelope = JSON.parse(message.Body);
    const notification = JSON.parse(envelope.Message);

    // The await keeps a rejected lookup inside this try block.
    return await jobExists(notification.JobId);
  } catch (error) {
    logger.log("bill-ocr-message-check-error", "DEBUG", "api", null, { message: "Error checking if message should be processed", error: error.message, stack: error.stack });
    // Unparseable or uncheckable messages are never processed.
    return false;
  }
}
/**
 * Apply a Textract completion notification (delivered via SNS -> SQS) to the
 * job record stored in Redis.
 *
 *  - SUCCEEDED: results are fetched and normalised, and the job is stored as
 *    COMPLETED. The bill form is not generated here; the status endpoint does
 *    that on demand.
 *  - Any other status (FAILED, ERROR, or anything unexpected): the job is
 *    stored as FAILED. The caller deletes the SQS message once this function
 *    returns, so leaving the record IN_PROGRESS would strand it permanently.
 *
 * Returns without changes when the inner SNS payload is not valid JSON or when
 * no job record exists for the job ID.
 *
 * @param {Object} message - SQS message; `Body` is a JSON envelope whose
 *   `Message` field is the JSON-encoded Textract notification
 * @returns {Promise<void>}
 * @throws {SyntaxError} when `message.Body` itself is not valid JSON
 */
async function handleTextractNotification(message) {
  const body = JSON.parse(message.Body);

  let snsMessage;
  try {
    snsMessage = JSON.parse(body.Message);
  } catch (error) {
    logger.log("bill-ocr-handle-textract-error", "DEBUG", "api", null, { message: "Error parsing SNS message - invalid message format.", error: error.message, stack: error.stack, body });
    return;
  }

  const textractJobId = snsMessage.JobId;
  const status = snsMessage.Status;

  // Get job info from Redis
  const jobInfo = await getTextractJob({ redisPubClient, textractJobId });
  if (!jobInfo) {
    logger.log("bill-ocr-job-not-found", "DEBUG", "api", null, { message: `Job info not found in Redis for Textract job ID: ${textractJobId}`, textractJobId, snsMessage });
    return;
  }

  if (status === 'SUCCEEDED') {
    const { processedData, originalResponse } = await retrieveTextractResults(textractJobId);
    await setTextractJob({
      redisPubClient,
      textractJobId,
      jobData: {
        ...jobInfo,
        status: 'COMPLETED',
        data: {
          ...processedData,
          originalTextractResponse: originalResponse
        },
        completedAt: new Date().toISOString()
      }
    });
    return;
  }

  // Everything else is terminal and unsuccessful from the client's point of view.
  if (status !== 'FAILED') {
    logger.log("bill-ocr-unexpected-status", "WARN", "api", null, { message: `Textract job ${textractJobId} ended with status ${status}; marking as FAILED`, textractJobId, status });
  }
  const fallbackError = status === 'FAILED' ? 'Textract job failed' : `Textract job ended with status ${status}`;
  await setTextractJob({
    redisPubClient,
    textractJobId,
    jobData: {
      ...jobInfo,
      status: 'FAILED',
      error: snsMessage.StatusMessage || fallbackError,
      completedAt: new Date().toISOString()
    }
  });
}
/**
 * Fetch every result page of a finished Textract expense analysis and
 * normalise the combined output.
 *
 * @param {string} textractJobId - Textract job ID
 * @returns {Promise<{processedData: Object, originalResponse: Object}>} the
 *   normalised scan data and an object holding all collected ExpenseDocuments
 */
async function retrieveTextractResults(textractJobId) {
  const documentBatches = [];
  let nextToken = null;

  // Keep requesting pages until Textract stops returning a NextToken.
  while (true) {
    const page = await textractClient.send(
      new GetExpenseAnalysisCommand({ JobId: textractJobId, NextToken: nextToken })
    );
    if (page.ExpenseDocuments) {
      documentBatches.push(page.ExpenseDocuments);
    }
    nextToken = page.NextToken;
    if (!nextToken) {
      break;
    }
  }

  // Merge the per-page batches into a single response-shaped object.
  const fullTextractResponse = { ExpenseDocuments: [].concat(...documentBatches) };
  const invoiceData = extractInvoiceData(fullTextractResponse);

  return {
    processedData: processScanData(invoiceData),
    originalResponse: fullTextractResponse
  };
}
/**
 * Start periodic polling of the Textract completion queue. Call once when the
 * server starts.
 *
 * A single processSQSMessages pass can outlast the timer period, because it
 * long-polls SQS for up to 20 seconds and then processes the batch. Ticks that
 * fire while a pass is still running are therefore skipped, so passes never
 * overlap or pile up.
 *
 * @param {number} [intervalMs=10000] - Delay between polling attempts in milliseconds
 * @returns {*} the interval handle from setInterval; pass it to clearInterval to stop polling
 */
function startSQSPolling(intervalMs = 10000) {
  let pollInFlight = false;

  const pollInterval = setInterval(() => {
    if (pollInFlight) {
      return;
    }
    pollInFlight = true;

    processSQSMessages()
      .catch((error) => {
        logger.log("bill-ocr-sqs-poll-error", "ERROR", "api", null, { message: error.message, stack: error.stack });
      })
      .finally(() => {
        pollInFlight = false;
      });
  }, intervalMs);

  return pollInterval;
}
module.exports = {
initializeBillOcr,
handleBillOcr,
handleBillOcrStatus,
startSQSPolling
};