516 lines
15 KiB
JavaScript
516 lines
15 KiB
JavaScript
const {
|
|
TextractClient,
|
|
StartExpenseAnalysisCommand,
|
|
GetExpenseAnalysisCommand,
|
|
AnalyzeExpenseCommand
|
|
} = require("@aws-sdk/client-textract");
|
|
const { S3Client, PutObjectCommand } = require("@aws-sdk/client-s3");
|
|
const { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs");
|
|
const { v4: uuidv4 } = require("uuid");
|
|
const {
|
|
getTextractJobKey,
|
|
setTextractJob,
|
|
getTextractJob,
|
|
getFileType,
|
|
getPdfPageCount,
|
|
hasActiveJobs
|
|
} = require("./bill-ocr-helpers");
|
|
const { extractInvoiceData, processScanData } = require("./bill-ocr-normalize");
|
|
const { generateBillFormData } = require("./bill-ocr-generator");
|
|
const logger = require("../../utils/logger");
|
|
const _ = require("lodash");
|
|
|
|
// Shared AWS settings: one region/credential pair drives the Textract, S3 and SQS clients.
// The region falls back to "ca-central-1" when AWS_AI_REGION is not set.
const awsConfig = {
  region: process.env.AWS_AI_REGION || "ca-central-1",
  credentials: {
    accessKeyId: process.env.AWS_AI_ACCESS_KEY_ID,
    secretAccessKey: process.env.AWS_AI_SECRET_ACCESS_KEY
  }
};

const textractClient = new TextractClient(awsConfig);
const s3Client = new S3Client(awsConfig);
const sqsClient = new SQSClient(awsConfig);

// Redis client used for job bookkeeping; remains null until initializeBillOcr() is called.
let redisPubClient = null;
|
|
|
|
/**
 * Wire the bill-ocr module up to Redis.
 *
 * Stores the supplied client in module state so the job helpers in this file
 * can read and write Textract job records. jobExists() throws until this has
 * been called with a client.
 * @param {Object} pubClient - Redis cluster client
 * @returns {void}
 */
function initializeBillOcr(pubClient) {
  redisPubClient = pubClient;
}
|
|
|
|
/**
 * Report whether a job record is stored in Redis for a Textract job ID.
 * @param {string} textractJobId - ID returned by Textract when the job was started
 * @returns {Promise<boolean>} true when Redis reports the job key exists, false otherwise
 * @throws {Error} when initializeBillOcr has not supplied a Redis client yet
 */
async function jobExists(textractJobId) {
  if (!redisPubClient) {
    throw new Error("Redis client not initialized. Call initializeBillOcr first.");
  }

  const redisKey = getTextractJobKey(textractJobId);
  const matchCount = await redisPubClient.exists(redisKey);

  // Collapse the truthy/falsy reply from Redis into a strict boolean.
  return Boolean(matchCount);
}
|
|
|
|
/**
 * Express handler: run OCR on an uploaded bill/invoice.
 *
 * - No file on the request: 400.
 * - Image, or PDF with exactly one page: processed synchronously, 200 with
 *   the processed data and the generated bill form.
 * - Any other PDF: an asynchronous Textract job is started, 202 with the
 *   Textract job ID and a status URL to poll.
 * - Any other file type: 400.
 * - Any error thrown while processing: 500 with the error message.
 *
 * @param {Object} req - Express request; reads req.file, req.body and req.user.email
 * @param {Object} res - Express response
 * @returns {Promise<Object>} whatever the terminal res.send()/res.json() call returns
 */
async function handleBillOcr(req, res) {
  const uploadedFile = req.file;

  // Nothing to do without an upload
  if (!uploadedFile) {
    return res.status(400).send({ error: "No file uploaded." });
  }

  const { jobid, bodyshopid, partsorderid } = req.body;
  logger.log("bill-ocr-start", "DEBUG", req.user.email, jobid, null);

  // Synchronous path shared by images and single-page PDFs:
  // analyze, build the bill form, log, and answer in one go.
  const respondWithSinglePageResult = async () => {
    const processedData = await processSinglePageDocument(uploadedFile.buffer);
    const billForm = await generateBillFormData({
      processedData,
      jobid,
      bodyshopid,
      partsorderid,
      req
    });

    logger.log("bill-ocr-single-complete", "DEBUG", req.user.email, jobid, {
      ..._.omit(processedData, "originalTextractResponse"),
      billForm
    });

    return res.status(200).json({
      success: true,
      status: "COMPLETED",
      data: { ...processedData, billForm },
      message: "Invoice processing completed"
    });
  };

  try {
    const fileType = getFileType(uploadedFile);

    // Images are always a single page
    if (fileType === "image") {
      return await respondWithSinglePageResult();
    }

    if (fileType !== "pdf") {
      logger.log("bill-ocr-unsupported-filetype", "WARN", req.user.email, jobid, { fileType });

      return res.status(400).json({
        error: "Unsupported file type",
        message: "Please upload a PDF or supported image file (JPEG, PNG, TIFF)"
      });
    }

    // PDFs: the page count decides between the synchronous and asynchronous route
    const pageCount = await getPdfPageCount(uploadedFile.buffer);
    if (pageCount === 1) {
      return await respondWithSinglePageResult();
    }

    // Multi-page: start the Textract job and let the client poll for the result
    const jobInfo = await startTextractJob(uploadedFile.buffer, { jobid, bodyshopid, partsorderid });
    logger.log("bill-ocr-multipage-start", "DEBUG", req.user.email, jobid, jobInfo);

    return res.status(202).json({
      success: true,
      textractJobId: jobInfo.jobId,
      message: "Invoice processing started",
      statusUrl: `/ai/bill-ocr/status/${jobInfo.jobId}`
    });
  } catch (error) {
    logger.log("bill-ocr-error", "ERROR", req.user.email, jobid, { error: error.message, stack: error.stack });
    return res.status(500).json({
      error: "Failed to start invoice processing",
      message: error.message
    });
  }
}
|
|
|
|
/**
 * Express handler: report the state of an asynchronous (multi-page) Textract job.
 *
 * Responses:
 * - 400 `{ error }` when the textractJobId route param is missing.
 * - 500 `{ error, message }` when the job record cannot be read.
 * - 404 `{ error }` when no job record exists.
 * - 200 `{ status: "COMPLETED", data }` for a completed job. The bill form is
 *   generated on first read (it needs the request) and then cached on the
 *   job record; a failed cache write is logged but does not fail the request.
 * - 500 `{ status: "COMPLETED", error, message, data }` when the bill form
 *   cannot be generated; the processed data is still returned.
 * - 500 `{ status: "FAILED", error }` for a failed job.
 * - 200 `{ status }` for any other status.
 *
 * @param {Object} req - Express request; reads req.params.textractJobId and req.user.email
 * @param {Object} res - Express response
 * @returns {Promise<Object>} whatever the terminal res.send()/res.json() call returns
 */
async function handleBillOcrStatus(req, res) {
  const { textractJobId } = req.params;

  if (!textractJobId) {
    logger.log("bill-ocr-status-error", "WARN", req.user.email, null, { error: "No textractJobId found in params" });
    return res.status(400).json({ error: "Job ID is required" });
  }

  // Guard the lookup so a storage failure yields a 500 response instead of
  // a rejected handler promise with no response sent.
  let jobStatus;
  try {
    jobStatus = await getTextractJob({ redisPubClient, textractJobId });
  } catch (error) {
    logger.log("bill-ocr-status-error", "ERROR", req.user.email, null, {
      textractJobId,
      error: error.message,
      stack: error.stack
    });
    return res.status(500).json({
      error: "Failed to retrieve job status",
      message: error.message
    });
  }

  if (!jobStatus) {
    return res.status(404).json({ error: "Job not found" });
  }

  if (jobStatus.status === "COMPLETED") {
    // Generate billForm on-demand if not already generated
    let billForm = jobStatus.data?.billForm;

    if (!billForm && jobStatus.context) {
      try {
        billForm = await generateBillFormData({
          processedData: jobStatus.data,
          jobid: jobStatus.context.jobid,
          bodyshopid: jobStatus.context.bodyshopid,
          partsorderid: jobStatus.context.partsorderid,
          req: req // The status request supplies the request context
        });
        logger.log("bill-ocr-multipage-complete", "DEBUG", req.user.email, jobStatus.context.jobid, {
          ...jobStatus.data,
          billForm
        });
      } catch (error) {
        logger.log("bill-ocr-multipage-error", "ERROR", req.user.email, jobStatus.context.jobid, {
          ...jobStatus.data,
          error: error.message,
          stack: error.stack
        });

        return res.status(500).send({
          status: "COMPLETED",
          error: "Data processed but failed to generate bill form",
          message: error.message,
          data: jobStatus.data // Still return the raw processed data
        });
      }

      // Cache the billForm back to Redis for future requests. This is
      // best-effort: the form already exists, so a failed write must not be
      // reported to the client as a generation failure.
      try {
        await setTextractJob({
          redisPubClient,
          textractJobId,
          jobData: {
            ...jobStatus,
            data: {
              ...jobStatus.data,
              billForm
            }
          }
        });
      } catch (error) {
        logger.log("bill-ocr-multipage-cache-error", "WARN", req.user.email, jobStatus.context.jobid, {
          textractJobId,
          error: error.message,
          stack: error.stack
        });
      }
    }

    return res.status(200).send({
      status: "COMPLETED",
      data: {
        ...jobStatus.data,
        billForm
      }
    });
  }

  if (jobStatus.status === "FAILED") {
    // context is treated as optional above, so it must not be dereferenced blindly here
    logger.log("bill-ocr-multipage-failed", "ERROR", req.user.email, jobStatus.context?.jobid, {
      ...jobStatus.data,
      error: jobStatus.error
    });

    return res.status(500).json({
      status: "FAILED",
      error: jobStatus.error
    });
  }

  // Any other status (e.g. still in progress) is passed through as-is
  return res.status(200).json({
    status: jobStatus.status
  });
}
|
|
|
|
/**
 * Analyze a single-page document synchronously with Textract's
 * AnalyzeExpenseCommand and normalize the result.
 *
 * Despite the parameter name, handleBillOcr passes both image buffers and
 * single-page PDF buffers here.
 * @param {Buffer} pdfBuffer - raw bytes of the document to analyze
 * @returns {Promise<Object>} shallow copy of the processScanData() output
 */
async function processSinglePageDocument(pdfBuffer) {
  const textractResponse = await textractClient.send(
    new AnalyzeExpenseCommand({
      Document: { Bytes: pdfBuffer }
    })
  );

  const normalized = processScanData(extractInvoiceData(textractResponse));

  // The raw Textract response is deliberately left out of the result: it is a
  // large object that provides minimal value to send to the client.
  return { ...normalized };
}
|
|
|
|
/**
 * Start an asynchronous Textract expense analysis for a multi-page PDF.
 *
 * Uploads the PDF to S3, starts the Textract job with an SNS notification
 * channel, and records the job in Redis as IN_PROGRESS under the Textract
 * job ID.
 *
 * @param {Buffer} pdfBuffer - raw PDF bytes
 * @param {Object} [context={}] - caller metadata; bodyshopid and jobid are
 *   used in the S3 key, and the whole object is stored with the job record
 * @returns {Promise<{jobId: string}>} the Textract job ID
 * @throws {Error} when AWS_AI_BUCKET, AWS_TEXTRACT_SNS_TOPIC_ARN or
 *   AWS_TEXTRACT_SNS_ROLE_ARN is not set
 */
async function startTextractJob(pdfBuffer, context = {}) {
  const { bodyshopid, jobid } = context;

  const s3Bucket = process.env.AWS_AI_BUCKET;
  const snsTopicArn = process.env.AWS_TEXTRACT_SNS_TOPIC_ARN;
  const snsRoleArn = process.env.AWS_TEXTRACT_SNS_ROLE_ARN;

  // Fail on the first missing setting, checked in this order
  const requiredSettings = [
    [s3Bucket, "AWS_AI_BUCKET environment variable is required"],
    [snsTopicArn, "AWS_TEXTRACT_SNS_TOPIC_ARN environment variable is required"],
    [snsRoleArn, "AWS_TEXTRACT_SNS_ROLE_ARN environment variable is required"]
  ];
  for (const [settingValue, missingMessage] of requiredSettings) {
    if (!settingValue) {
      throw new Error(missingMessage);
    }
  }

  const uploadId = uuidv4();
  const s3Key = `textract-temp/${bodyshopid}/${jobid}/${uploadId}.pdf`; //TODO Update Keys structure to something better.

  // Textract's async API reads the document from S3, so upload it first
  await s3Client.send(
    new PutObjectCommand({
      Bucket: s3Bucket,
      Key: s3Key,
      Body: pdfBuffer,
      ContentType: "application/pdf" //Hard coded - we only support PDFs for multi-page
    })
  );

  // Start the analysis; completion is announced through the SNS topic
  const { JobId: textractJobId } = await textractClient.send(
    new StartExpenseAnalysisCommand({
      DocumentLocation: {
        S3Object: { Bucket: s3Bucket, Name: s3Key }
      },
      NotificationChannel: {
        SNSTopicArn: snsTopicArn,
        RoleArn: snsRoleArn
      },
      ClientRequestToken: uploadId
    })
  );

  // Track the job in Redis, keyed by the Textract job ID
  await setTextractJob({
    redisPubClient,
    textractJobId,
    jobData: {
      status: "IN_PROGRESS",
      s3Key,
      uploadId,
      startedAt: new Date().toISOString(),
      context // Kept so the status endpoint can build the bill form later
    }
  });

  return { jobId: textractJobId };
}
|
|
|
|
/**
 * Drain one batch of Textract completion notifications from SQS.
 *
 * Does nothing unless hasActiveJobs() reports work in progress. Otherwise it
 * long-polls the queue (up to 10 messages, 20 second wait). Each message that
 * shouldProcessMessage() accepts is handled and then deleted; messages it
 * rejects are left on the queue. A failure on one message is logged and does
 * not stop the rest of the batch; a failure receiving the batch is logged.
 *
 * @returns {Promise<void>}
 */
async function processSQSMessages() {
  const queueUrl = process.env.AWS_TEXTRACT_SQS_QUEUE_URL;

  // Only poll if there are active multi-page jobs in progress
  const hasActive = await hasActiveJobs({ redisPubClient });
  if (!hasActive) {
    return;
  }

  try {
    const { Messages: messages } = await sqsClient.send(
      new ReceiveMessageCommand({
        QueueUrl: queueUrl,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 20,
        MessageAttributeNames: ["All"]
      })
    );

    if (!messages || messages.length === 0) {
      return;
    }

    logger.log("bill-ocr-sqs-processing", "DEBUG", "api", null, {
      message: `Processing ${messages.length} messages from SQS`
    });

    for (const message of messages) {
      try {
        // Environment-level filtering: skip messages that belong to another environment
        const belongsHere = await shouldProcessMessage(message);
        if (!belongsHere) {
          continue;
        }

        await handleTextractNotification(message);

        // Delete message after successful processing
        await sqsClient.send(
          new DeleteMessageCommand({
            QueueUrl: queueUrl,
            ReceiptHandle: message.ReceiptHandle
          })
        );
      } catch (error) {
        logger.log("bill-ocr-sqs-processing-error", "ERROR", "api", null, {
          message,
          error: error.message,
          stack: error.stack
        });
      }
    }
  } catch (error) {
    logger.log("bill-ocr-sqs-receiving-error", "ERROR", "api", null, { error: error.message, stack: error.stack });
  }
}
|
|
|
|
/**
 * Decide whether an SQS message belongs to this environment.
 *
 * The message body is an SNS envelope whose Message field is itself JSON
 * carrying the Textract JobId. The message is accepted only when a job
 * record for that ID exists in Redis.
 *
 * @param {Object} message - SQS message
 * @returns {Promise<boolean>} true when the job is known here; false when it
 *   is not, or when the message cannot be parsed or checked
 */
async function shouldProcessMessage(message) {
  try {
    const { Message: rawSnsPayload } = JSON.parse(message.Body);
    const { JobId: textractJobId } = JSON.parse(rawSnsPayload);

    // Check if job exists in Redis for this environment (using environment-specific prefix)
    return await jobExists(textractJobId);
  } catch (error) {
    logger.log("bill-ocr-message-check-error", "DEBUG", "api", null, {
      message: "Error checking if message should be processed",
      error: error.message,
      stack: error.stack
    });
    // A message that cannot be parsed or checked is never processed
    return false;
  }
}
|
|
|
|
/**
 * Apply a Textract completion notification to the stored job record.
 *
 * - SUCCEEDED: fetches and normalizes the results, then marks the job
 *   COMPLETED with the processed data.
 * - FAILED: marks the job FAILED with the notification's status message.
 * - Any other status: the record is left untouched.
 *
 * Returns without changes (after logging) when the SNS payload is not valid
 * JSON or when no job record is found for the job ID.
 *
 * @param {Object} message - SQS message wrapping the SNS notification
 * @returns {Promise<void>}
 */
async function handleTextractNotification(message) {
  const body = JSON.parse(message.Body);

  let snsMessage;
  try {
    snsMessage = JSON.parse(body.Message);
  } catch (error) {
    logger.log("bill-ocr-handle-textract-error", "DEBUG", "api", null, {
      message: "Error parsing SNS message - invalid message format.",
      error: error.message,
      stack: error.stack,
      body
    });
    return;
  }

  const { JobId: textractJobId, Status: status } = snsMessage;

  // Look up what was recorded for this job when it was started
  const jobInfo = await getTextractJob({ redisPubClient, textractJobId });
  if (!jobInfo) {
    logger.log("bill-ocr-job-not-found", "DEBUG", "api", null, {
      message: `Job info not found in Redis for Textract job ID: ${textractJobId}`,
      textractJobId,
      snsMessage
    });
    return;
  }

  // Work out the fields this notification adds to the job record
  let outcome;
  if (status === "SUCCEEDED") {
    const { processedData } = await retrieveTextractResults(textractJobId);
    // Only the processed data is stored. The raw Textract response is a large
    // object of minimal value to the client, and billForm is generated
    // on-demand in the status endpoint.
    outcome = {
      status: "COMPLETED",
      data: { ...processedData }
    };
  } else if (status === "FAILED") {
    outcome = {
      status: "FAILED",
      error: snsMessage.StatusMessage || "Textract job failed"
    };
  } else {
    return;
  }

  await setTextractJob({
    redisPubClient,
    textractJobId,
    jobData: {
      ...jobInfo,
      ...outcome,
      completedAt: new Date().toISOString()
    }
  });
}
|
|
|
|
/**
 * Fetch every result page of a finished Textract expense analysis and
 * normalize the combined documents.
 *
 * @param {string} textractJobId - ID of the Textract job
 * @returns {Promise<{processedData: Object, originalResponse: {ExpenseDocuments: Array}}>}
 *   the processScanData() output, plus the merged ExpenseDocuments from all pages
 */
async function retrieveTextractResults(textractJobId) {
  let expenseDocuments = [];
  let nextToken = null;

  // Keep requesting pages until Textract stops handing back a continuation token
  while (true) {
    const page = await textractClient.send(
      new GetExpenseAnalysisCommand({
        JobId: textractJobId,
        NextToken: nextToken
      })
    );

    if (page.ExpenseDocuments) {
      expenseDocuments = expenseDocuments.concat(page.ExpenseDocuments);
    }

    nextToken = page.NextToken;
    if (!nextToken) {
      break;
    }
  }

  // Rebuild a single response object covering all pages
  const fullTextractResponse = { ExpenseDocuments: expenseDocuments };
  const invoiceData = extractInvoiceData(fullTextractResponse);

  return {
    processedData: processScanData(invoiceData),
    originalResponse: fullTextractResponse
  };
}
|
|
|
|
/**
 * Start polling SQS for Textract completion notifications (call this when
 * the server starts).
 *
 * A timer fires every 10 seconds and runs processSQSMessages(). Because a
 * single run can long-poll SQS for up to 20 seconds, a tick that fires while
 * the previous run is still in flight is skipped, so runs never overlap.
 *
 * @returns {Object|undefined} the interval handle (pass it to clearInterval
 *   to stop polling), or undefined when AWS_TEXTRACT_SQS_QUEUE_URL is not
 *   configured
 */
function startSQSPolling() {
  const queueUrl = process.env.AWS_TEXTRACT_SQS_QUEUE_URL;

  if (!queueUrl) {
    logger.log("bill-ocr-error", "ERROR", "api", null, { message: "AWS_TEXTRACT_SQS_QUEUE_URL not configured" });
    return;
  }

  // True while a processSQSMessages() run has not settled yet
  let isPolling = false;

  const pollInterval = setInterval(() => {
    if (isPolling) {
      return;
    }
    isPolling = true;

    processSQSMessages()
      .catch((error) => {
        logger.log("bill-ocr-sqs-poll-error", "ERROR", "api", null, { message: error.message, stack: error.stack });
      })
      .finally(() => {
        isPolling = false;
      });
  }, 10000); // Poll every 10 seconds

  return pollInterval;
}
|
|
|
|
module.exports = {
|
|
initializeBillOcr,
|
|
handleBillOcr,
|
|
handleBillOcrStatus,
|
|
startSQSPolling
|
|
};
|