IO-3515 bifurcate single/multi page extract, add check for polling, add field labels

This commit is contained in:
Patrick Fic
2026-01-27 15:40:13 -08:00
parent 2a6d0446f0
commit ad7e85a578
3 changed files with 310 additions and 35 deletions

43
package-lock.json generated
View File

@@ -53,6 +53,7 @@
"mustache": "^4.2.0", "mustache": "^4.2.0",
"node-persist": "^4.0.4", "node-persist": "^4.0.4",
"nodemailer": "^6.10.0", "nodemailer": "^6.10.0",
"pdf-lib": "^1.17.1",
"phone": "^3.1.70", "phone": "^3.1.70",
"query-string": "7.1.3", "query-string": "7.1.3",
"recursive-diff": "^1.0.9", "recursive-diff": "^1.0.9",
@@ -2698,6 +2699,24 @@
"@noble/hashes": "^1.1.5" "@noble/hashes": "^1.1.5"
} }
}, },
"node_modules/@pdf-lib/standard-fonts": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/@pdf-lib/standard-fonts/-/standard-fonts-1.0.0.tgz",
"integrity": "sha512-hU30BK9IUN/su0Mn9VdlVKsWBS6GyhVfqjwl1FjZN4TxP6cCw0jP2w7V3Hf5uX7M0AZJ16vey9yE0ny7Sa59ZA==",
"license": "MIT",
"dependencies": {
"pako": "^1.0.6"
}
},
"node_modules/@pdf-lib/upng": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/@pdf-lib/upng/-/upng-1.0.1.tgz",
"integrity": "sha512-dQK2FUMQtowVP00mtIksrlZhdFXQZPC+taih1q4CvPZ5vqdxR/LKBaFg0oAfzd1GlHZXXSPdQfzQnt+ViGvEIQ==",
"license": "MIT",
"dependencies": {
"pako": "^1.0.10"
}
},
"node_modules/@pkgjs/parseargs": { "node_modules/@pkgjs/parseargs": {
"version": "0.11.0", "version": "0.11.0",
"resolved": "https://registry.npmjs.org/@pkgjs/parseargs/-/parseargs-0.11.0.tgz", "resolved": "https://registry.npmjs.org/@pkgjs/parseargs/-/parseargs-0.11.0.tgz",
@@ -9312,6 +9331,12 @@
"integrity": "sha512-UEZIS3/by4OC8vL3P2dTXRETpebLI2NiI5vIrjaD/5UtrkFX/tNbwjTSRAGC/+7CAo2pIcBaRgWmcBBHcsaCIw==", "integrity": "sha512-UEZIS3/by4OC8vL3P2dTXRETpebLI2NiI5vIrjaD/5UtrkFX/tNbwjTSRAGC/+7CAo2pIcBaRgWmcBBHcsaCIw==",
"license": "BlueOak-1.0.0" "license": "BlueOak-1.0.0"
}, },
"node_modules/pako": {
"version": "1.0.11",
"resolved": "https://registry.npmjs.org/pako/-/pako-1.0.11.tgz",
"integrity": "sha512-4hLB8Py4zZce5s4yd9XzopqwVv/yGNhV1Bl8NTmCq1763HeK2+EwVTv+leGeL13Dnh2wfbqowVPXCIO0z4taYw==",
"license": "(MIT AND Zlib)"
},
"node_modules/parent-module": { "node_modules/parent-module": {
"version": "1.0.1", "version": "1.0.1",
"resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz", "resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz",
@@ -9450,6 +9475,24 @@
"dev": true, "dev": true,
"license": "MIT" "license": "MIT"
}, },
"node_modules/pdf-lib": {
"version": "1.17.1",
"resolved": "https://registry.npmjs.org/pdf-lib/-/pdf-lib-1.17.1.tgz",
"integrity": "sha512-V/mpyJAoTsN4cnP31vc0wfNA1+p20evqqnap0KLoRUN0Yk/p3wN52DOEsL4oBFcLdb76hlpKPtzJIgo67j/XLw==",
"license": "MIT",
"dependencies": {
"@pdf-lib/standard-fonts": "^1.0.0",
"@pdf-lib/upng": "^1.0.1",
"pako": "^1.0.11",
"tslib": "^1.11.1"
}
},
"node_modules/pdf-lib/node_modules/tslib": {
"version": "1.14.1",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-1.14.1.tgz",
"integrity": "sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==",
"license": "0BSD"
},
"node_modules/phone": { "node_modules/phone": {
"version": "3.1.70", "version": "3.1.70",
"resolved": "https://registry.npmjs.org/phone/-/phone-3.1.70.tgz", "resolved": "https://registry.npmjs.org/phone/-/phone-3.1.70.tgz",

View File

@@ -62,6 +62,7 @@
"mustache": "^4.2.0", "mustache": "^4.2.0",
"node-persist": "^4.0.4", "node-persist": "^4.0.4",
"nodemailer": "^6.10.0", "nodemailer": "^6.10.0",
"pdf-lib": "^1.17.1",
"phone": "^3.1.70", "phone": "^3.1.70",
"query-string": "7.1.3", "query-string": "7.1.3",
"recursive-diff": "^1.0.9", "recursive-diff": "^1.0.9",

View File

@@ -1,7 +1,8 @@
const { TextractClient, StartExpenseAnalysisCommand, GetExpenseAnalysisCommand } = require("@aws-sdk/client-textract"); const { TextractClient, StartExpenseAnalysisCommand, GetExpenseAnalysisCommand, AnalyzeExpenseCommand } = require("@aws-sdk/client-textract");
const { S3Client, PutObjectCommand } = require("@aws-sdk/client-s3"); const { S3Client, PutObjectCommand } = require("@aws-sdk/client-s3");
const { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs"); const { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs");
const { v4: uuidv4 } = require('uuid'); const { v4: uuidv4 } = require('uuid');
const PDFDocument = require('pdf-lib').PDFDocument;
// Initialize AWS clients // Initialize AWS clients
const awsConfig = { const awsConfig = {
@@ -18,6 +19,7 @@ const sqsClient = new SQSClient(awsConfig);
let redisPubClient = null; let redisPubClient = null;
const TEXTRACT_JOB_TTL = 3600; const TEXTRACT_JOB_TTL = 3600;
const MIN_CONFIDENCE_VALUE = 50
/** /**
* Initialize the bill-ocr module with Redis client * Initialize the bill-ocr module with Redis client
@@ -87,10 +89,46 @@ async function jobExists(textractJobId) {
return false; return false;
} }
/**
* Check if there are any jobs in IN_PROGRESS status
* @returns {Promise<boolean>}
*/
async function hasActiveJobs() {
if (!redisPubClient) {
throw new Error('Redis client not initialized.');
}
try {
// Get all textract job keys
const pattern = 'textract:job:*';
const keys = await redisPubClient.keys(pattern);
if (!keys || keys.length === 0) {
return false;
}
//TODO: Is there a better way to do this that supports clusters?
// Check if any job has IN_PROGRESS status
for (const key of keys) {
const data = await redisPubClient.get(key);
if (data) {
const jobData = JSON.parse(data);
if (jobData.status === 'IN_PROGRESS') {
return true;
}
}
}
return false;
} catch (error) {
console.error('Error checking for active jobs:', error);
return false;
}
}
async function handleBillOcr(request, response) { async function handleBillOcr(request, response) {
// Check if file was uploaded // Check if file was uploaded
if (!request.file) { if (!request.file) {
response.status(400).send({ error: 'No file uploaded' }); response.status(400).send({ error: 'No file uploaded.' });
return; return;
} }
@@ -98,15 +136,54 @@ async function handleBillOcr(request, response) {
const uploadedFile = request.file; const uploadedFile = request.file;
try { try {
// Start the Textract job (non-blocking) const fileType = getFileType(uploadedFile);
const jobInfo = await startTextractJob(uploadedFile.buffer); console.log(`Processing file type: ${fileType}`);
response.status(202).send({ // Images are always processed synchronously (single page)
success: true, if (fileType === 'image') {
jobId: jobInfo.jobId, console.log('Image => 1 page, processing synchronously');
message: 'Invoice processing started', const result = await processSinglePageDocument(uploadedFile.buffer);
statusUrl: `/ai/bill-ocr/status/${jobInfo.jobId}`
}); response.status(200).send({
success: true,
status: 'COMPLETED',
data: result,
message: 'Invoice processing completed'
});
} else if (fileType === 'pdf') {
// Check the number of pages in the PDF
const pageCount = await getPdfPageCount(uploadedFile.buffer);
console.log(`PDF has ${pageCount} page(s)`);
if (pageCount === 1) {
// Process synchronously for single-page documents
console.log('PDF => 1 page, processing synchronously');
const result = await processSinglePageDocument(uploadedFile.buffer);
response.status(200).send({
success: true,
status: 'COMPLETED',
data: result,
message: 'Invoice processing completed'
});
} else {
// Start the Textract job (non-blocking) for multi-page documents
console.log('PDF => 2+ pages, processing asynchronously');
const jobInfo = await startTextractJob(uploadedFile.buffer);
response.status(202).send({
success: true,
jobId: jobInfo.jobId,
message: 'Invoice processing started',
statusUrl: `/ai/bill-ocr/status/${jobInfo.jobId}`
});
}
} else {
response.status(400).send({
error: 'Unsupported file type',
message: 'Please upload a PDF or supported image file (JPEG, PNG, TIFF)'
});
}
} catch (error) { } catch (error) {
console.error('Error starting invoice processing:', error); console.error('Error starting invoice processing:', error);
response.status(500).send({ response.status(500).send({
@@ -160,6 +237,95 @@ async function handleBillOcrStatus(request, response) {
} }
} }
/**
* Detect file type based on MIME type and file signature
* @param {Object} file - Multer file object
* @returns {string} 'pdf', 'image', or 'unknown'
*/
function getFileType(file) {
// Check MIME type first
const mimeType = file.mimetype?.toLowerCase();
if (mimeType === 'application/pdf') {
return 'pdf';
}
if (mimeType && mimeType.startsWith('image/')) {
return 'image';
}
// Fallback: Check file signature (magic bytes)
const buffer = file.buffer;
if (buffer && buffer.length > 4) {
// PDF signature: %PDF
if (buffer[0] === 0x25 && buffer[1] === 0x50 && buffer[2] === 0x44 && buffer[3] === 0x46) {
return 'pdf';
}
// JPEG signature: FF D8 FF
if (buffer[0] === 0xFF && buffer[1] === 0xD8 && buffer[2] === 0xFF) {
return 'image';
}
// PNG signature: 89 50 4E 47
if (buffer[0] === 0x89 && buffer[1] === 0x50 && buffer[2] === 0x4E && buffer[3] === 0x47) {
return 'image';
}
// HEIC/HEIF: Check for ftyp followed by heic/heix/hevc/hevx
if (buffer.length > 12) {
const ftypIndex = buffer.indexOf(Buffer.from('ftyp'));
if (ftypIndex > 0 && ftypIndex < 12) {
const brand = buffer.slice(ftypIndex + 4, ftypIndex + 8).toString('ascii');
if (brand.startsWith('heic') || brand.startsWith('heix') ||
brand.startsWith('hevc') || brand.startsWith('hevx') ||
brand.startsWith('mif1')) {
return 'image';
}
}
}
}
return 'unknown';
}
/**
* Get the number of pages in a PDF buffer
* @param {Buffer} pdfBuffer
* @returns {Promise<number>}
*/
async function getPdfPageCount(pdfBuffer) {
try {
const pdfDoc = await PDFDocument.load(pdfBuffer);
return pdfDoc.getPageCount();
} catch (error) {
console.error('Error reading PDF page count:', error);
throw new Error('Failed to read PDF: ' + error.message);
}
}
/**
* Process a single-page document synchronously using AnalyzeExpenseCommand
* @param {Buffer} pdfBuffer
* @returns {Promise<Object>}
*/
async function processSinglePageDocument(pdfBuffer) {
const analyzeCommand = new AnalyzeExpenseCommand({
Document: {
Bytes: pdfBuffer
}
});
const result = await textractClient.send(analyzeCommand);
const invoiceData = extractInvoiceData(result);
const processedData = processScanData(invoiceData);
return {
...processedData,
originalTextractResponse: result
};
}
async function startTextractJob(pdfBuffer) { async function startTextractJob(pdfBuffer) {
// Upload PDF to S3 temporarily for Textract async processing // Upload PDF to S3 temporarily for Textract async processing
const s3Bucket = process.env.AWS_AI_BUCKET; const s3Bucket = process.env.AWS_AI_BUCKET;
@@ -177,14 +343,14 @@ async function startTextractJob(pdfBuffer) {
} }
const uploadId = uuidv4(); const uploadId = uuidv4();
const s3Key = `textract-temp/${uploadId}.pdf`; const s3Key = `textract-temp/${uploadId}.pdf`; //TODO Update Keys structure to something better.
// Upload to S3 // Upload to S3
const uploadCommand = new PutObjectCommand({ const uploadCommand = new PutObjectCommand({
Bucket: s3Bucket, Bucket: s3Bucket,
Key: s3Key, Key: s3Key,
Body: pdfBuffer, Body: pdfBuffer,
ContentType: 'application/pdf' ContentType: 'application/pdf' //Hard coded - we only support PDFs for multi-page
}); });
await s3Client.send(uploadCommand); await s3Client.send(uploadCommand);
@@ -228,6 +394,13 @@ async function processSQSMessages() {
return; return;
} }
// Only poll if there are active mutli page jobs in progress
const hasActive = await hasActiveJobs();
if (!hasActive) {
console.log('No active jobs in progress, skipping SQS poll');
return;
}
try { try {
console.log('Polling SQS queue:', queueUrl); console.log('Polling SQS queue:', queueUrl);
const receiveCommand = new ReceiveMessageCommand({ const receiveCommand = new ReceiveMessageCommand({
@@ -267,7 +440,6 @@ async function handleTextractNotification(message) {
const body = JSON.parse(message.Body); const body = JSON.parse(message.Body);
let snsMessage let snsMessage
try { try {
snsMessage = JSON.parse(body.Message); snsMessage = JSON.parse(body.Message);
} catch (error) { } catch (error) {
//Delete the message so it doesn't clog the queue //Delete the message so it doesn't clog the queue
@@ -296,13 +468,15 @@ async function handleTextractNotification(message) {
if (status === 'SUCCEEDED') { if (status === 'SUCCEEDED') {
// Retrieve the results // Retrieve the results
const invoiceData = await retrieveTextractResults(textractJobId); const { processedData, originalResponse } = await retrieveTextractResults(textractJobId);
const processedData = processScanData(invoiceData);
await setTextractJob(textractJobId, { await setTextractJob(textractJobId, {
...jobInfo, ...jobInfo,
status: 'COMPLETED', status: 'COMPLETED',
data: processedData, data: {
...processedData,
originalTextractResponse: originalResponse
},
completedAt: new Date().toISOString() completedAt: new Date().toISOString()
}); });
} else if (status === 'FAILED') { } else if (status === 'FAILED') {
@@ -335,8 +509,16 @@ async function retrieveTextractResults(textractJobId) {
nextToken = result.NextToken; nextToken = result.NextToken;
} while (nextToken); } while (nextToken);
// Store the complete original response
const fullTextractResponse = { ExpenseDocuments: allExpenseDocuments };
// Extract invoice data from Textract response // Extract invoice data from Textract response
return extractInvoiceData({ ExpenseDocuments: allExpenseDocuments }); const invoiceData = extractInvoiceData(fullTextractResponse);
return {
processedData: processScanData(invoiceData),
originalResponse: fullTextractResponse
};
} }
// Start SQS polling (call this when server starts) // Start SQS polling (call this when server starts)
@@ -346,7 +528,6 @@ function startSQSPolling() {
console.error('SQS polling error:', error); console.error('SQS polling error:', error);
}); });
}, 10000); // Poll every 10 seconds }, 10000); // Poll every 10 seconds
return pollInterval; return pollInterval;
} }
@@ -367,12 +548,15 @@ function extractInvoiceData(textractResponse) {
expenseDoc.SummaryFields.forEach(field => { expenseDoc.SummaryFields.forEach(field => {
const fieldType = field.Type?.Text || ''; const fieldType = field.Type?.Text || '';
const fieldValue = field.ValueDetection?.Text || ''; const fieldValue = field.ValueDetection?.Text || '';
const fieldLabel = field.LabelDetection?.Text || '';
const confidence = field.ValueDetection?.Confidence || 0; const confidence = field.ValueDetection?.Confidence || 0;
// Map common invoice fields // Map common invoice fields
if (fieldType && fieldValue) { if (fieldType && fieldValue) {
invoiceData.summary[fieldType] = { invoiceData.summary[fieldType] = {
value: fieldValue, value: fieldValue,
label: fieldLabel,
normalizedLabel: normalizeLabelName(fieldLabel),
confidence: confidence confidence: confidence
}; };
} }
@@ -390,6 +574,7 @@ function extractInvoiceData(textractResponse) {
lineItem.LineItemExpenseFields.forEach(field => { lineItem.LineItemExpenseFields.forEach(field => {
const fieldType = field.Type?.Text || ''; const fieldType = field.Type?.Text || '';
const fieldValue = field.ValueDetection?.Text || ''; const fieldValue = field.ValueDetection?.Text || '';
const fieldLabel = field.LabelDetection?.Text || '';
const confidence = field.ValueDetection?.Confidence || 0; const confidence = field.ValueDetection?.Confidence || 0;
if (fieldType && fieldValue) { if (fieldType && fieldValue) {
@@ -397,6 +582,8 @@ function extractInvoiceData(textractResponse) {
const normalizedField = normalizeFieldName(fieldType); const normalizedField = normalizeFieldName(fieldType);
item[normalizedField] = { item[normalizedField] = {
value: fieldValue, value: fieldValue,
label: fieldLabel,
normalizedLabel: normalizeLabelName(fieldLabel),
confidence: confidence confidence: confidence
}; };
} }
@@ -417,16 +604,50 @@ function extractInvoiceData(textractResponse) {
function normalizeFieldName(fieldType) { function normalizeFieldName(fieldType) {
//Placeholder normalization for now. //Placeholder normalization for now.
const fieldMap = { return fieldType;
'ITEM': 'description', }
'QUANTITY': 'quantity',
'UNIT_PRICE': 'unitPrice', function normalizeLabelName(labelText) {
'PRICE': 'price', if (!labelText) return '';
'PRODUCT_CODE': 'productCode',
'EXPENSE_ROW': 'row' // Convert to lowercase and trim whitespace
let normalized = labelText.toLowerCase().trim();
// Remove special characters and replace spaces with underscores
normalized = normalized.replace(/[^a-z0-9\s]/g, '').replace(/\s+/g, '_');
const standardizedFieldsnames = {
actual_cost: "actual_cost",
actual_price: "actual_price",
line_desc: "line_desc",
quantity: "quantity",
part_no: "part_no"
}
// Common label normalizations
const labelMap = {
'qty': standardizedFieldsnames.quantity,
'qnty': standardizedFieldsnames.quantity,
'sale_qty': standardizedFieldsnames.quantity,
'quant': standardizedFieldsnames.quantity,
'desc': standardizedFieldsnames.line_desc,
'description': standardizedFieldsnames.line_desc,
'item': standardizedFieldsnames.line_desc,
'part': standardizedFieldsnames.part_no,
'part_no': standardizedFieldsnames.part_no,
'part_num': standardizedFieldsnames.part_no,
'part_number': standardizedFieldsnames.part_no,
'price': standardizedFieldsnames.actual_price,
'unit_price': standardizedFieldsnames.actual_price,
'amount': standardizedFieldsnames.actual_price,
'list_price': standardizedFieldsnames.actual_price,
'list': standardizedFieldsnames.actual_price,
'retail_price': standardizedFieldsnames.actual_price,
'net': standardizedFieldsnames.actual_cost,
'selling_price': standardizedFieldsnames.actual_cost,
}; };
return fieldMap[fieldType] || fieldType.toLowerCase().replace(/_/g, ''); return labelMap[normalized] || normalized; // TODO: Should we monitor unmapped labels?
} }
function processScanData(invoiceData) { function processScanData(invoiceData) {
@@ -438,8 +659,13 @@ function processScanData(invoiceData) {
// Clean summary fields // Clean summary fields
for (const [key, value] of Object.entries(invoiceData.summary)) { for (const [key, value] of Object.entries(invoiceData.summary)) {
if (value.confidence > 50) { // Only include fields with > 50% confidence if (value.confidence > MIN_CONFIDENCE_VALUE) { // Only include fields with > 50% confidence
processed.summary[key] = value.value; processed.summary[key] = {
value: value.value,
label: value.label,
normalizedLabel: value.normalizedLabel,
confidence: value.confidence
};
} }
} }
@@ -449,27 +675,32 @@ function processScanData(invoiceData) {
const processedItem = {}; const processedItem = {};
for (const [key, value] of Object.entries(item)) { for (const [key, value] of Object.entries(item)) {
if (value.confidence > 50) { // Only include fields with > 50% confidence if (value.confidence > MIN_CONFIDENCE_VALUE) { // Only include fields with > 50% confidence
let cleanValue = value.value; let cleanValue = value.value;
// Parse numbers for quantity and price fields // Parse numbers for quantity and price fields
if (key === 'quantity') { if (key === 'quantity') {
cleanValue = parseFloat(cleanValue) || 0; cleanValue = parseFloat(cleanValue) || 0;
} else if (key === 'unitPrice' || key === 'price') { } else if (key === 'retail_price' || key === 'actual_price') {
// Remove currency symbols and parse // Remove currency symbols and parse
cleanValue = parseFloat(cleanValue.replace(/[^0-9.-]/g, '')) || 0; cleanValue = parseFloat(cleanValue.replace(/[^0-9.-]/g, '')) || 0;
} }
processedItem[key] = cleanValue; processedItem[key] = {
value: cleanValue,
label: value.label,
normalizedLabel: value.normalizedLabel,
confidence: value.confidence
};
} }
} }
return processedItem; return processedItem;
}) })
.filter(item => { // .filter(item => {
// Filter out items with no description or with quantity <= 0 // // Filter out items with no description or with quantity <= 0
return item.description && (!item.quantity || item.quantity > 0); // return item.description && (!item.quantity || item.quantity > 0);
}); // });
return processed; return processed;
} }