From ad7e85a5788b09eb1b6f0a191e1dc07e62542446 Mon Sep 17 00:00:00 2001 From: Patrick Fic Date: Tue, 27 Jan 2026 15:40:13 -0800 Subject: [PATCH] IO-3515 bifurcate single/multi page extract, add check for polling, add field labels --- package-lock.json | 43 +++++ package.json | 1 + server/ai/bill-ocr/bill-ocr.js | 301 +++++++++++++++++++++++++++++---- 3 files changed, 310 insertions(+), 35 deletions(-) diff --git a/package-lock.json b/package-lock.json index dccab5208..ba1dcb9bd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -53,6 +53,7 @@ "mustache": "^4.2.0", "node-persist": "^4.0.4", "nodemailer": "^6.10.0", + "pdf-lib": "^1.17.1", "phone": "^3.1.70", "query-string": "7.1.3", "recursive-diff": "^1.0.9", @@ -2698,6 +2699,24 @@ "@noble/hashes": "^1.1.5" } }, + "node_modules/@pdf-lib/standard-fonts": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/@pdf-lib/standard-fonts/-/standard-fonts-1.0.0.tgz", + "integrity": "sha512-hU30BK9IUN/su0Mn9VdlVKsWBS6GyhVfqjwl1FjZN4TxP6cCw0jP2w7V3Hf5uX7M0AZJ16vey9yE0ny7Sa59ZA==", + "license": "MIT", + "dependencies": { + "pako": "^1.0.6" + } + }, + "node_modules/@pdf-lib/upng": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/@pdf-lib/upng/-/upng-1.0.1.tgz", + "integrity": "sha512-dQK2FUMQtowVP00mtIksrlZhdFXQZPC+taih1q4CvPZ5vqdxR/LKBaFg0oAfzd1GlHZXXSPdQfzQnt+ViGvEIQ==", + "license": "MIT", + "dependencies": { + "pako": "^1.0.10" + } + }, "node_modules/@pkgjs/parseargs": { "version": "0.11.0", "resolved": "https://registry.npmjs.org/@pkgjs/parseargs/-/parseargs-0.11.0.tgz", @@ -9312,6 +9331,12 @@ "integrity": "sha512-UEZIS3/by4OC8vL3P2dTXRETpebLI2NiI5vIrjaD/5UtrkFX/tNbwjTSRAGC/+7CAo2pIcBaRgWmcBBHcsaCIw==", "license": "BlueOak-1.0.0" }, + "node_modules/pako": { + "version": "1.0.11", + "resolved": "https://registry.npmjs.org/pako/-/pako-1.0.11.tgz", + "integrity": "sha512-4hLB8Py4zZce5s4yd9XzopqwVv/yGNhV1Bl8NTmCq1763HeK2+EwVTv+leGeL13Dnh2wfbqowVPXCIO0z4taYw==", + "license": "(MIT AND Zlib)" + }, "node_modules/parent-module": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz", @@ -9450,6 +9475,24 @@ "dev": true, "license": "MIT" }, + "node_modules/pdf-lib": { + "version": "1.17.1", + "resolved": "https://registry.npmjs.org/pdf-lib/-/pdf-lib-1.17.1.tgz", + "integrity": "sha512-V/mpyJAoTsN4cnP31vc0wfNA1+p20evqqnap0KLoRUN0Yk/p3wN52DOEsL4oBFcLdb76hlpKPtzJIgo67j/XLw==", + "license": "MIT", + "dependencies": { + "@pdf-lib/standard-fonts": "^1.0.0", + "@pdf-lib/upng": "^1.0.1", + "pako": "^1.0.11", + "tslib": "^1.11.1" + } + }, + "node_modules/pdf-lib/node_modules/tslib": { + "version": "1.14.1", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-1.14.1.tgz", + "integrity": "sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==", + "license": "0BSD" + }, "node_modules/phone": { "version": "3.1.70", "resolved": "https://registry.npmjs.org/phone/-/phone-3.1.70.tgz", diff --git a/package.json b/package.json index 1a9e5daec..e7106144a 100644 --- a/package.json +++ b/package.json @@ -62,6 +62,7 @@ "mustache": "^4.2.0", "node-persist": "^4.0.4", "nodemailer": "^6.10.0", + "pdf-lib": "^1.17.1", "phone": "^3.1.70", "query-string": "7.1.3", "recursive-diff": "^1.0.9", diff --git a/server/ai/bill-ocr/bill-ocr.js b/server/ai/bill-ocr/bill-ocr.js index d75375373..b99d9c1fb 100644 --- a/server/ai/bill-ocr/bill-ocr.js +++ b/server/ai/bill-ocr/bill-ocr.js @@ -1,7 +1,8 @@ -const { TextractClient, StartExpenseAnalysisCommand, GetExpenseAnalysisCommand } = require("@aws-sdk/client-textract"); +const { TextractClient, StartExpenseAnalysisCommand, GetExpenseAnalysisCommand, AnalyzeExpenseCommand } = require("@aws-sdk/client-textract"); const { S3Client, PutObjectCommand } = require("@aws-sdk/client-s3"); const { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs"); const { v4: uuidv4 } = require('uuid'); +const PDFDocument = require('pdf-lib').PDFDocument; // Initialize AWS clients const awsConfig = { @@ -18,6 +19,7 @@ const sqsClient = new SQSClient(awsConfig); let redisPubClient = null; const TEXTRACT_JOB_TTL = 3600; +const MIN_CONFIDENCE_VALUE = 50 /** * Initialize the bill-ocr module with Redis client @@ -87,10 +89,46 @@ async function jobExists(textractJobId) { return false; } +/** + * Check if there are any jobs in IN_PROGRESS status + * @returns {Promise} + */ +async function hasActiveJobs() { + if (!redisPubClient) { + throw new Error('Redis client not initialized.'); + } + + try { + // Get all textract job keys + const pattern = 'textract:job:*'; + const keys = await redisPubClient.keys(pattern); + + if (!keys || keys.length === 0) { + return false; + } + //TODO: Is there a better way to do this that supports clusters? + // Check if any job has IN_PROGRESS status + for (const key of keys) { + const data = await redisPubClient.get(key); + if (data) { + const jobData = JSON.parse(data); + if (jobData.status === 'IN_PROGRESS') { + return true; + } + } + } + + return false; + } catch (error) { + console.error('Error checking for active jobs:', error); + return false; + } +} + async function handleBillOcr(request, response) { // Check if file was uploaded if (!request.file) { - response.status(400).send({ error: 'No file uploaded' }); + response.status(400).send({ error: 'No file uploaded.' }); return; } @@ -98,15 +136,54 @@ async function handleBillOcr(request, response) { const uploadedFile = request.file; try { - // Start the Textract job (non-blocking) - const jobInfo = await startTextractJob(uploadedFile.buffer); + const fileType = getFileType(uploadedFile); + console.log(`Processing file type: ${fileType}`); - response.status(202).send({ - success: true, - jobId: jobInfo.jobId, - message: 'Invoice processing started', - statusUrl: `/ai/bill-ocr/status/${jobInfo.jobId}` - }); + // Images are always processed synchronously (single page) + if (fileType === 'image') { + console.log('Image => 1 page, processing synchronously'); + const result = await processSinglePageDocument(uploadedFile.buffer); + + response.status(200).send({ + success: true, + status: 'COMPLETED', + data: result, + message: 'Invoice processing completed' + }); + } else if (fileType === 'pdf') { + // Check the number of pages in the PDF + const pageCount = await getPdfPageCount(uploadedFile.buffer); + console.log(`PDF has ${pageCount} page(s)`); + + if (pageCount === 1) { + // Process synchronously for single-page documents + console.log('PDF => 1 page, processing synchronously'); + const result = await processSinglePageDocument(uploadedFile.buffer); + + response.status(200).send({ + success: true, + status: 'COMPLETED', + data: result, + message: 'Invoice processing completed' + }); + } else { + // Start the Textract job (non-blocking) for multi-page documents + console.log('PDF => 2+ pages, processing asynchronously'); + const jobInfo = await startTextractJob(uploadedFile.buffer); + + response.status(202).send({ + success: true, + jobId: jobInfo.jobId, + message: 'Invoice processing started', + statusUrl: `/ai/bill-ocr/status/${jobInfo.jobId}` + }); + } + } else { + response.status(400).send({ + error: 'Unsupported file type', + message: 'Please upload a PDF or supported image file (JPEG, PNG, TIFF)' + }); + } } catch (error) { console.error('Error starting invoice processing:', error); response.status(500).send({ @@ -160,6 +237,95 @@ async function handleBillOcrStatus(request, response) { } } +/** + * Detect file type based on MIME type and file signature + * @param {Object} file - Multer file object + * @returns {string} 'pdf', 'image', or 'unknown' + */ +function getFileType(file) { + // Check MIME type first + const mimeType = file.mimetype?.toLowerCase(); + + if (mimeType === 'application/pdf') { + return 'pdf'; + } + + if (mimeType && mimeType.startsWith('image/')) { + return 'image'; + } + + // Fallback: Check file signature (magic bytes) + const buffer = file.buffer; + if (buffer && buffer.length > 4) { + // PDF signature: %PDF + if (buffer[0] === 0x25 && buffer[1] === 0x50 && buffer[2] === 0x44 && buffer[3] === 0x46) { + return 'pdf'; + } + + // JPEG signature: FF D8 FF + if (buffer[0] === 0xFF && buffer[1] === 0xD8 && buffer[2] === 0xFF) { + return 'image'; + } + + // PNG signature: 89 50 4E 47 + if (buffer[0] === 0x89 && buffer[1] === 0x50 && buffer[2] === 0x4E && buffer[3] === 0x47) { + return 'image'; + } + + // HEIC/HEIF: Check for ftyp followed by heic/heix/hevc/hevx + if (buffer.length > 12) { + const ftypIndex = buffer.indexOf(Buffer.from('ftyp')); + if (ftypIndex > 0 && ftypIndex < 12) { + const brand = buffer.slice(ftypIndex + 4, ftypIndex + 8).toString('ascii'); + if (brand.startsWith('heic') || brand.startsWith('heix') || + brand.startsWith('hevc') || brand.startsWith('hevx') || + brand.startsWith('mif1')) { + return 'image'; + } + } + } + } + + return 'unknown'; +} + +/** + * Get the number of pages in a PDF buffer + * @param {Buffer} pdfBuffer + * @returns {Promise} + */ +async function getPdfPageCount(pdfBuffer) { + try { + const pdfDoc = await PDFDocument.load(pdfBuffer); + return pdfDoc.getPageCount(); + } catch (error) { + console.error('Error reading PDF page count:', error); + throw new Error('Failed to read PDF: ' + error.message); + } +} + +/** + * Process a single-page document synchronously using AnalyzeExpenseCommand + * @param {Buffer} pdfBuffer + * @returns {Promise} + */ +async function processSinglePageDocument(pdfBuffer) { + const analyzeCommand = new AnalyzeExpenseCommand({ + Document: { + Bytes: pdfBuffer + } + }); + + const result = await textractClient.send(analyzeCommand); + const invoiceData = extractInvoiceData(result); + const processedData = processScanData(invoiceData); + + return { + ...processedData, + originalTextractResponse: result + }; +} + async function startTextractJob(pdfBuffer) { // Upload PDF to S3 temporarily for Textract async processing const s3Bucket = process.env.AWS_AI_BUCKET; @@ -177,14 +343,14 @@ async function startTextractJob(pdfBuffer) { } const uploadId = uuidv4(); - const s3Key = `textract-temp/${uploadId}.pdf`; + const s3Key = `textract-temp/${uploadId}.pdf`; //TODO Update Keys structure to something better. // Upload to S3 const uploadCommand = new PutObjectCommand({ Bucket: s3Bucket, Key: s3Key, Body: pdfBuffer, - ContentType: 'application/pdf' + ContentType: 'application/pdf' //Hard coded - we only support PDFs for multi-page }); await s3Client.send(uploadCommand); @@ -228,6 +394,13 @@ async function processSQSMessages() { return; } + // Only poll if there are active mutli page jobs in progress + const hasActive = await hasActiveJobs(); + if (!hasActive) { + console.log('No active jobs in progress, skipping SQS poll'); + return; + } + try { console.log('Polling SQS queue:', queueUrl); const receiveCommand = new ReceiveMessageCommand({ @@ -267,7 +440,6 @@ async function handleTextractNotification(message) { const body = JSON.parse(message.Body); let snsMessage try { - snsMessage = JSON.parse(body.Message); } catch (error) { //Delete the message so it doesn't clog the queue @@ -296,13 +468,15 @@ async function handleTextractNotification(message) { if (status === 'SUCCEEDED') { // Retrieve the results - const invoiceData = await retrieveTextractResults(textractJobId); - const processedData = processScanData(invoiceData); + const { processedData, originalResponse } = await retrieveTextractResults(textractJobId); await setTextractJob(textractJobId, { ...jobInfo, status: 'COMPLETED', - data: processedData, + data: { + ...processedData, + originalTextractResponse: originalResponse + }, completedAt: new Date().toISOString() }); } else if (status === 'FAILED') { @@ -335,8 +509,16 @@ async function retrieveTextractResults(textractJobId) { nextToken = result.NextToken; } while (nextToken); + // Store the complete original response + const fullTextractResponse = { ExpenseDocuments: allExpenseDocuments }; + // Extract invoice data from Textract response - return extractInvoiceData({ ExpenseDocuments: allExpenseDocuments }); + const invoiceData = extractInvoiceData(fullTextractResponse); + + return { + processedData: processScanData(invoiceData), + originalResponse: fullTextractResponse + }; } // Start SQS polling (call this when server starts) @@ -346,7 +528,6 @@ function startSQSPolling() { console.error('SQS polling error:', error); }); }, 10000); // Poll every 10 seconds - return pollInterval; } @@ -367,12 +548,15 @@ function extractInvoiceData(textractResponse) { expenseDoc.SummaryFields.forEach(field => { const fieldType = field.Type?.Text || ''; const fieldValue = field.ValueDetection?.Text || ''; + const fieldLabel = field.LabelDetection?.Text || ''; const confidence = field.ValueDetection?.Confidence || 0; // Map common invoice fields if (fieldType && fieldValue) { invoiceData.summary[fieldType] = { value: fieldValue, + label: fieldLabel, + normalizedLabel: normalizeLabelName(fieldLabel), confidence: confidence }; } @@ -390,6 +574,7 @@ function extractInvoiceData(textractResponse) { lineItem.LineItemExpenseFields.forEach(field => { const fieldType = field.Type?.Text || ''; const fieldValue = field.ValueDetection?.Text || ''; + const fieldLabel = field.LabelDetection?.Text || ''; const confidence = field.ValueDetection?.Confidence || 0; if (fieldType && fieldValue) { @@ -397,6 +582,8 @@ function extractInvoiceData(textractResponse) { const normalizedField = normalizeFieldName(fieldType); item[normalizedField] = { value: fieldValue, + label: fieldLabel, + normalizedLabel: normalizeLabelName(fieldLabel), confidence: confidence }; } @@ -417,16 +604,50 @@ function extractInvoiceData(textractResponse) { function normalizeFieldName(fieldType) { //Placeholder normalization for now. - const fieldMap = { - 'ITEM': 'description', - 'QUANTITY': 'quantity', - 'UNIT_PRICE': 'unitPrice', - 'PRICE': 'price', - 'PRODUCT_CODE': 'productCode', - 'EXPENSE_ROW': 'row' + return fieldType; +} + +function normalizeLabelName(labelText) { + if (!labelText) return ''; + + // Convert to lowercase and trim whitespace + let normalized = labelText.toLowerCase().trim(); + + // Remove special characters and replace spaces with underscores + normalized = normalized.replace(/[^a-z0-9\s]/g, '').replace(/\s+/g, '_'); + const standardizedFieldsnames = { + actual_cost: "actual_cost", + actual_price: "actual_price", + line_desc: "line_desc", + quantity: "quantity", + part_no: "part_no" + } + + // Common label normalizations + const labelMap = { + 'qty': standardizedFieldsnames.quantity, + 'qnty': standardizedFieldsnames.quantity, + 'sale_qty': standardizedFieldsnames.quantity, + 'quant': standardizedFieldsnames.quantity, + 'desc': standardizedFieldsnames.line_desc, + 'description': standardizedFieldsnames.line_desc, + 'item': standardizedFieldsnames.line_desc, + 'part': standardizedFieldsnames.part_no, + 'part_no': standardizedFieldsnames.part_no, + 'part_num': standardizedFieldsnames.part_no, + 'part_number': standardizedFieldsnames.part_no, + 'price': standardizedFieldsnames.actual_price, + 'unit_price': standardizedFieldsnames.actual_price, + 'amount': standardizedFieldsnames.actual_price, + 'list_price': standardizedFieldsnames.actual_price, + 'list': standardizedFieldsnames.actual_price, + 'retail_price': standardizedFieldsnames.actual_price, + 'net': standardizedFieldsnames.actual_cost, + 'selling_price': standardizedFieldsnames.actual_cost, + }; - return fieldMap[fieldType] || fieldType.toLowerCase().replace(/_/g, ''); + return labelMap[normalized] || normalized; // TODO: Should we monitor unmapped labels? } function processScanData(invoiceData) { @@ -438,8 +659,13 @@ function processScanData(invoiceData) { // Clean summary fields for (const [key, value] of Object.entries(invoiceData.summary)) { - if (value.confidence > 50) { // Only include fields with > 50% confidence - processed.summary[key] = value.value; + if (value.confidence > MIN_CONFIDENCE_VALUE) { // Only include fields with > 50% confidence + processed.summary[key] = { + value: value.value, + label: value.label, + normalizedLabel: value.normalizedLabel, + confidence: value.confidence + }; } } @@ -449,27 +675,32 @@ function processScanData(invoiceData) { const processedItem = {}; for (const [key, value] of Object.entries(item)) { - if (value.confidence > 50) { // Only include fields with > 50% confidence + if (value.confidence > MIN_CONFIDENCE_VALUE) { // Only include fields with > 50% confidence let cleanValue = value.value; // Parse numbers for quantity and price fields if (key === 'quantity') { cleanValue = parseFloat(cleanValue) || 0; - } else if (key === 'unitPrice' || key === 'price') { + } else if (key === 'retail_price' || key === 'actual_price') { // Remove currency symbols and parse cleanValue = parseFloat(cleanValue.replace(/[^0-9.-]/g, '')) || 0; } - processedItem[key] = cleanValue; + processedItem[key] = { + value: cleanValue, + label: value.label, + normalizedLabel: value.normalizedLabel, + confidence: value.confidence + }; } } return processedItem; }) - .filter(item => { - // Filter out items with no description or with quantity <= 0 - return item.description && (!item.quantity || item.quantity > 0); - }); + // .filter(item => { + // // Filter out items with no description or with quantity <= 0 + // return item.description && (!item.quantity || item.quantity > 0); + // }); return processed; }