From 2a6d0446f0e2941309fed35a291fa2cfd0798364 Mon Sep 17 00:00:00 2001 From: Patrick Fic Date: Mon, 26 Jan 2026 16:09:58 -0800 Subject: [PATCH] IO-3515 WIP - bulk calls functioning. Further refinement required. --- server.js | 5 +- server/ai/bill-ocr/bill-ocr.js | 148 +++++++++++++++++++++++++-------- 2 files changed, 117 insertions(+), 36 deletions(-) diff --git a/server.js b/server.js index b85b28e9c..6f920b51a 100644 --- a/server.js +++ b/server.js @@ -441,8 +441,11 @@ const main = async () => { await server.listen(port); logger.log(`Server started on port ${port}`, "INFO", "api"); + // Initialize bill-ocr with Redis client + const { initializeBillOcr, startSQSPolling } = require("./server/ai/bill-ocr/bill-ocr"); + initializeBillOcr(pubClient); + // Start SQS polling for Textract notifications - const { startSQSPolling } = require("./server/ai/bill-ocr/bill-ocr"); startSQSPolling(); logger.log(`Started SQS polling for Textract notifications`, "INFO", "api"); } catch (error) { diff --git a/server/ai/bill-ocr/bill-ocr.js b/server/ai/bill-ocr/bill-ocr.js index 6f283c3bb..d75375373 100644 --- a/server/ai/bill-ocr/bill-ocr.js +++ b/server/ai/bill-ocr/bill-ocr.js @@ -16,8 +16,76 @@ const textractClient = new TextractClient(awsConfig); const s3Client = new S3Client(awsConfig); const sqsClient = new SQSClient(awsConfig); -// In-memory job storage (consider using Redis or a database for production) -const jobStore = new Map(); +let redisPubClient = null; +const TEXTRACT_JOB_TTL = 3600; + +/** + * Initialize the bill-ocr module with Redis client + * @param {Object} pubClient - Redis cluster client + */ +function initializeBillOcr(pubClient) { + redisPubClient = pubClient; +} + +/** + * Generate Redis key for Textract job using textract job ID + * @param {string} textractJobId + * @returns {string} + */ +function getTextractJobKey(textractJobId) { + return `textract:job:${textractJobId}`; +} + +/** + * Store Textract job data in Redis + * @param {string} textractJobId + * @param {Object} jobData + */ +async function setTextractJob(textractJobId, jobData) { + if (!redisPubClient) { + throw new Error('Redis client not initialized. Call initializeBillOcr first.'); + } + const key = getTextractJobKey(textractJobId); + await redisPubClient.set(key, JSON.stringify(jobData)); + await redisPubClient.expire(key, TEXTRACT_JOB_TTL); +} + +/** + * Retrieve Textract job data from Redis + * @param {string} textractJobId + * @returns {Promise} + */ +async function getTextractJob(textractJobId) { + if (!redisPubClient) { + throw new Error('Redis client not initialized. Call initializeBillOcr first.'); + } + const key = getTextractJobKey(textractJobId); + const data = await redisPubClient.get(key); + return data ? JSON.parse(data) : null; +} + +/** + * Check if job exists by Textract job ID + * @param {string} textractJobId + * @returns {Promise} + */ +async function jobExists(textractJobId) { + if (!redisPubClient) { + throw new Error('Redis client not initialized. Call initializeBillOcr first.'); + } + + console.log('Checking if job exists for Textract job ID:', textractJobId); + const key = getTextractJobKey(textractJobId); + const exists = await redisPubClient.exists(key); + + if (exists) { + console.log(`Job found: ${textractJobId}`); + return true; + } + + console.log('No matching job found in Redis'); + return false; +} async function handleBillOcr(request, response) { // Check if file was uploaded @@ -37,7 +105,7 @@ async function handleBillOcr(request, response) { success: true, jobId: jobInfo.jobId, message: 'Invoice processing started', - statusUrl: `/api/bill-ocr/status/${jobInfo.jobId}` + statusUrl: `/ai/bill-ocr/status/${jobInfo.jobId}` }); } catch (error) { console.error('Error starting invoice processing:', error); @@ -53,16 +121,21 @@ async function handleBillOcrStatus(request, response) { console.log('request.params:', request.params); console.log('request.query:', request.query); - const { jobId } = request.params; - if (!jobId) { - console.log('No jobId found in params'); + + + const { jobId: textractJobId } = request.params; + + + + if (!textractJobId) { + console.log('No textractJobId found in params'); response.status(400).send({ error: 'Job ID is required' }); return; } - console.log('Looking for job:', jobId); - const jobStatus = jobStore.get(jobId); + console.log('Looking for job:', textractJobId); + const jobStatus = await getTextractJob(textractJobId); console.log('Job status:', jobStatus); if (!jobStatus) { @@ -103,8 +176,8 @@ async function startTextractJob(pdfBuffer) { throw new Error('AWS_TEXTRACT_SNS_ROLE_ARN environment variable is required'); } - const jobId = uuidv4(); - const s3Key = `textract-temp/${jobId}.pdf`; + const uploadId = uuidv4(); + const s3Key = `textract-temp/${uploadId}.pdf`; // Upload to S3 const uploadCommand = new PutObjectCommand({ @@ -123,30 +196,26 @@ async function startTextractJob(pdfBuffer) { Name: s3Key } }, - OutputConfig: { - S3Bucket: s3Bucket, - S3Prefix: `textract-output/${jobId}/` - }, NotificationChannel: { SNSTopicArn: snsTopicArn, RoleArn: snsRoleArn }, - ClientRequestToken: jobId + ClientRequestToken: uploadId }); const startResult = await textractClient.send(startCommand); + const textractJobId = startResult.JobId; - // Store job info - jobStore.set(jobId, { + // Store job info in Redis using textractJobId as the key + await setTextractJob(textractJobId, { status: 'IN_PROGRESS', - textractJobId: startResult.JobId, s3Key: s3Key, + uploadId: uploadId, startedAt: new Date().toISOString() }); return { - jobId: jobId, - textractJobId: startResult.JobId + jobId: textractJobId }; } @@ -196,40 +265,48 @@ async function processSQSMessages() { async function handleTextractNotification(message) { const body = JSON.parse(message.Body); - const snsMessage = JSON.parse(body.Message); + let snsMessage + try { + + snsMessage = JSON.parse(body.Message); + } catch (error) { + //Delete the message so it doesn't clog the queue + const deleteCommand = new DeleteMessageCommand({ + QueueUrl: process.env.AWS_TEXTRACT_SQS_QUEUE_URL, + ReceiptHandle: message.ReceiptHandle + }); + await sqsClient.send(deleteCommand); + console.error('Error parsing SNS message:', error); + console.log('Message Deleted:', body); + return; + } const textractJobId = snsMessage.JobId; const status = snsMessage.Status; - // Find our job by Textract job ID - let ourJobId = null; - for (const [key, value] of jobStore.entries()) { - if (value.textractJobId === textractJobId) { - ourJobId = key; - break; - } - } + // Check if job exists in Redis + const exists = await jobExists(textractJobId); - if (!ourJobId) { + if (!exists) { console.warn(`Job not found for Textract job ID: ${textractJobId}`); return; } - const jobInfo = jobStore.get(ourJobId); + const jobInfo = await getTextractJob(textractJobId); if (status === 'SUCCEEDED') { // Retrieve the results const invoiceData = await retrieveTextractResults(textractJobId); const processedData = processScanData(invoiceData); - jobStore.set(ourJobId, { + await setTextractJob(textractJobId, { ...jobInfo, status: 'COMPLETED', data: processedData, completedAt: new Date().toISOString() }); } else if (status === 'FAILED') { - jobStore.set(ourJobId, { + await setTextractJob(textractJobId, { ...jobInfo, status: 'FAILED', error: snsMessage.StatusMessage || 'Textract job failed', @@ -268,7 +345,7 @@ function startSQSPolling() { processSQSMessages().catch(error => { console.error('SQS polling error:', error); }); - }, 5000); // Poll every 5 seconds + }, 10000); // Poll every 10 seconds return pollInterval; } @@ -339,7 +416,7 @@ function extractInvoiceData(textractResponse) { } function normalizeFieldName(fieldType) { - // Convert Textract field types to more readable names + //Placeholder normalization for now. const fieldMap = { 'ITEM': 'description', 'QUANTITY': 'quantity', @@ -398,6 +475,7 @@ function processScanData(invoiceData) { } module.exports = { + initializeBillOcr, handleBillOcr, handleBillOcrStatus, startSQSPolling