Revert "Release/2026 02 27 (pull request #3070)"

This commit is contained in:
Patrick Fic
2026-03-04 16:18:44 +00:00
parent 522f2b9e26
commit c9e41ba72a
204 changed files with 5497 additions and 7715 deletions

View File

@@ -1,792 +0,0 @@
const Fuse = require('fuse.js');
const { standardizedFieldsnames } = require('./bill-ocr-normalize');
const InstanceManager = require("../../utils/instanceMgr").default;
// Accept OCR-derived prices/costs only within ±50% of the matched jobline price;
// values outside this band are treated as unlikely and replaced or blanked.
const PRICE_PERCENT_MARGIN_TOLERANCE = 0.5; //Used to make sure prices and costs are likely.
// When quantity > 1, a price is treated as an extended total (and divided by the
// quantity) only if price/quantity lands within 3% of the matched jobline unit price.
const PRICE_QUANTITY_MARGIN_TOLERANCE = 0.03; //Used to make sure that if there is a quantity, the price is likely a unit price.
// Canonicalize a part number for comparison: strip every non-alphanumeric
// character and uppercase the remainder.
const normalizePartNumber = (str) => str.replace(/[^a-zA-Z0-9]/g, '').toUpperCase();
// Canonicalize free text for fuzzy matching: drop punctuation, collapse runs of
// whitespace to single spaces, trim, and uppercase.
const normalizeText = (str) => {
  const alphanumeric = str.replace(/[^a-zA-Z0-9\s]/g, '');
  return alphanumeric.replace(/\s+/g, ' ').trim().toUpperCase();
};
/**
 * Reduce a price string to a bare numeric string ("$1,234.56" -> "1234.56").
 * Non-strings are returned untouched so already-parsed numbers pass through.
 *
 * European decimal commas ("292,37") are converted to dots, but only when the
 * string has no '.' at all (so US formatting like "1,234.56" is untouched) and
 * the text after the final comma is 1-2 digits (so "1,234" stays 1234).
 */
const normalizePrice = (str) => {
  if (typeof str !== 'string') return str;
  let value = str.trim();
  if (!value.includes('.') && value.includes(',')) {
    const lastComma = value.lastIndexOf(',');
    const suffix = value.slice(lastComma + 1).trim();
    if (/^\d{1,2}$/.test(suffix)) {
      // Last comma is a decimal separator; earlier commas are thousands separators.
      const integerPart = value.slice(0, lastComma).replace(/,/g, '');
      value = `${integerPart}.${suffix}`;
    } else {
      // Commas are thousands separators (or noise) — drop them all.
      value = value.replace(/,/g, '');
    }
  }
  // Strip currency symbols and any other non-numeric residue.
  return value.replace(/[^0-9.-]+/g, "");
};
/**
 * Round `value` to the nearest multiple of `increment` (e.g. 0.2049 @ 0.005 -> 0.205).
 * Returns `value` unchanged when either argument is not a finite number or the
 * increment is non-positive. The final toFixed/parseFloat pass strips float
 * artifacts such as 0.20500000000000002.
 */
const roundToIncrement = (value, increment) => {
  const isUsableNumber = (n) => typeof n === 'number' && isFinite(n);
  if (!isUsableNumber(value) || !isUsableNumber(increment) || increment <= 0) {
    return value;
  }
  const snapped = Math.round((value + Number.EPSILON) / increment) * increment;
  const decimals = Math.max(0, Math.ceil(-Math.log10(increment)));
  return parseFloat(snapped.toFixed(decimals));
};
//More complex function. Not necessary at the moment, keeping for reference.
// const normalizePriceFinal = (str) => {
// if (typeof str !== 'string') {
// // If it's already a number, format to 2 decimals
// const num = parseFloat(str);
// return isNaN(num) ? 0 : num;
// }
// // First, try to extract valid decimal number patterns (e.g., "123.45")
// const decimalPattern = /\d+\.\d{1,2}/g;
// const decimalMatches = str.match(decimalPattern);
// if (decimalMatches && decimalMatches.length > 0) {
// // Found valid decimal number(s)
// const numbers = decimalMatches.map(m => parseFloat(m)).filter(n => !isNaN(n) && n > 0);
// if (numbers.length === 1) {
// return numbers[0];
// }
// if (numbers.length > 1) {
// // Check if all numbers are the same (e.g., "47.57.47.57" -> [47.57, 47.57])
// const uniqueNumbers = [...new Set(numbers)];
// if (uniqueNumbers.length === 1) {
// return uniqueNumbers[0];
// }
// // Check if numbers are very close (within 1% tolerance)
// const avg = numbers.reduce((a, b) => a + b, 0) / numbers.length;
// const allClose = numbers.every(num => Math.abs(num - avg) / avg < 0.01);
// if (allClose) {
// return avg;
// }
// // Return the first number (most likely correct)
// return numbers[0];
// }
// }
// // Fallback: Split on common delimiters and extract all potential numbers
// const parts = str.split(/[\/|\\,;]/).map(part => part.trim()).filter(part => part.length > 0);
// if (parts.length > 1) {
// // Multiple values detected - extract and parse all valid numbers
// const numbers = parts
// .map(part => {
// const cleaned = part.replace(/[^0-9.-]+/g, "");
// const parsed = parseFloat(cleaned);
// return isNaN(parsed) ? null : parsed;
// })
// .filter(num => num !== null && num > 0);
// if (numbers.length === 0) {
// // No valid numbers found, try fallback to basic cleaning
// const cleaned = str.replace(/[^0-9.-]+/g, "");
// const parsed = parseFloat(cleaned);
// return isNaN(parsed) ? 0 : parsed;
// }
// if (numbers.length === 1) {
// return numbers[0];
// }
// // Multiple valid numbers
// const uniqueNumbers = [...new Set(numbers)];
// if (uniqueNumbers.length === 1) {
// return uniqueNumbers[0];
// }
// // Check if numbers are very close (within 1% tolerance)
// const avg = numbers.reduce((a, b) => a + b, 0) / numbers.length;
// const allClose = numbers.every(num => Math.abs(num - avg) / avg < 0.01);
// if (allClose) {
// return avg;
// }
// // Return the first valid number
// return numbers[0];
// }
// // Single value or no delimiters, clean normally
// const cleaned = str.replace(/[^0-9.-]+/g, "");
// const parsed = parseFloat(cleaned);
// return isNaN(parsed) ? 0 : parsed;
// };
// Compute a 0-100 OCR confidence score for one Textract line item.
// Per-field confidences are averaged with extra weight on the fields that matter
// most for bill posting (cost/price > part number/description/quantity), then the
// average is penalized 20% for each critical field missing from the line entirely.
const calculateTextractConfidence = (textractLineItem) => {
  if (!textractLineItem || Object.keys(textractLineItem).length === 0) {
    return 0;
  }
  const fields = Object.values(textractLineItem);
  // Only truthy numeric confidences participate; bail out when none exist.
  const isUsableConfidence = (field) => field.confidence && typeof field.confidence === 'number';
  if (!fields.some(isUsableConfidence)) {
    return 0;
  }
  // Weight lookup keyed on the normalized label (the raw Textract key is ignored).
  const weightFor = (field) => {
    const label = field.normalizedLabel;
    if (label === standardizedFieldsnames.actual_cost || label === standardizedFieldsnames.actual_price) {
      return 4;
    }
    if (label === standardizedFieldsnames.part_no || label === standardizedFieldsnames.line_desc) {
      return 3.5;
    }
    if (label === standardizedFieldsnames.quantity) {
      return 3.5;
    }
    return 1;
  };
  let weightedSum = 0;
  let totalWeight = 0;
  for (const field of fields) {
    if (isUsableConfidence(field)) {
      const weight = weightFor(field);
      weightedSum += field.confidence * weight;
      totalWeight += weight;
    }
  }
  let avgConfidence = totalWeight > 0 ? weightedSum / totalWeight : 0;
  // Penalize 20% for each critical field absent from the line.
  const hasLabel = (label) => fields.some((field) => field.normalizedLabel === label);
  let missingCount = 0;
  if (!hasLabel(standardizedFieldsnames.actual_cost)) missingCount++;
  if (!hasLabel(standardizedFieldsnames.actual_price)) missingCount++;
  if (!hasLabel(standardizedFieldsnames.line_desc)) missingCount++;
  // Quantity is never normalized; Textract's raw QUANTITY field is used directly.
  if (!textractLineItem?.QUANTITY?.value) missingCount++;
  avgConfidence = avgConfidence * (1.0 - missingCount * 0.20);
  return Math.round(avgConfidence * 100) / 100; // Round to 2 decimal places
};
/**
 * Score (1-100) how confidently an OCR line was matched to a jobline.
 * Base score comes from the weighted finalScore (x10, capped at 100); bonuses are
 * added for multi-field matches (+5 each, max +15), price data (+10), and the
 * lead over the runner-up match (up to +10, or +5 when it was the only match).
 * Any non-empty match floors at 1.
 */
const calculateMatchConfidence = (matches, bestMatch) => {
  if (!matches || matches.length === 0 || !bestMatch) {
    return 0; // No match at all.
  }
  const baseScore = Math.min(bestMatch.finalScore * 10, 100);
  const fieldMatchBonus = Math.min(bestMatch.fieldMatches.length * 5, 15);
  const priceDataBonus = bestMatch.hasPriceData ? 10 : 0;
  // Clear-winner bonus: reward a wide gap to the second-best candidate.
  const confidenceMarginBonus = matches.length > 1
    ? Math.min((bestMatch.finalScore - matches[1].finalScore) * 5, 10)
    : 5; // Sole match gets a small bonus.
  const total = baseScore + fieldMatchBonus + priceDataBonus + confidenceMarginBonus;
  const capped = Math.min(Math.round(total * 100) / 100, 100);
  return Math.max(capped, 1);
};
/**
 * Blend OCR and jobline-match confidence into one score.
 * A zero match confidence short-circuits to 0 — OCR quality is irrelevant when
 * nothing matched. Otherwise a 60/40 weighted average (OCR weighted higher: a
 * perfect match is useless if the underlying read was bad), rounded to 2 dp.
 */
const calculateOverallConfidence = (ocrConfidence, matchConfidence) => {
  if (matchConfidence === 0) {
    return 0;
  }
  const blended = (ocrConfidence * 0.6) + (matchConfidence * 0.4);
  return Math.round(blended * 100) / 100;
};
/**
 * Merge several Fuse.js result lists into one deduplicated top-5 list.
 * Each list's scores are multiplied by its weight (Fuse scores: lower is better),
 * duplicates keep their best (minimum) weighted score, and a hit count tracks how
 * many lists contained the item. Sorting favors items seen in more lists, then
 * lower score.
 */
const mergeResults = (resultsArray, weights = []) => {
  const byId = new Map();
  resultsArray.forEach((results, listIndex) => {
    const weight = weights[listIndex] || 1;
    for (const result of results) {
      const weighted = result.score * weight;
      const existing = byId.get(result.item.id);
      if (existing) {
        existing.score = Math.min(existing.score, weighted);
        existing.count += 1;
      } else {
        byId.set(result.item.id, { item: result.item, score: weighted, count: 1 });
      }
    }
  });
  return [...byId.values()]
    .sort((a, b) => (a.count !== b.count ? b.count - a.count : a.score - b.score))
    .slice(0, 5);
};
/**
 * Build the bill-posting form payload from processed Textract OCR output.
 *
 * Flow: resolve the job (by id, or by RO number found in the OCR summary), load
 * the job's joblines and the vendor list, fuzzy-match every OCR line item against
 * the joblines, derive per-line price/cost (with quantity and subtotal-discount
 * adjustments), and return the structure the bill posting screen consumes.
 *
 * @param {Object} args
 * @param {Object} args.processedData - Normalized Textract output ({ summary, lineItems }).
 * @param {string} [args.jobid] - Job id; looked up via RO number when absent.
 * @param {string} [args.bodyshopid] - Not referenced in the visible logic.
 * @param {string} [args.partsorderid] - Not used yet (see TODO on the query variables).
 * @param {Object} args.req - Request carrying the user-scoped GraphQL client.
 * @returns {Promise<Object>} billFormData for the bill posting screen.
 * @throws {Error} When no RO number is extracted, no job matches it, or the job row is missing.
 */
async function generateBillFormData({ processedData, jobid: jobidFromProps, bodyshopid, partsorderid, req }) {
  const client = req.userGraphQLClient;
  let jobid = jobidFromProps;
  //If no jobid, fetch it, and funnel it back.
  // The "null"/"undefined" string sentinels guard against ids that were serialized
  // through form-data or query params.
  if (!jobid || jobid === null || jobid === undefined || jobid === "" || jobid === "null" || jobid === "undefined") {
    // Prefer the explicit PO_NUMBER field; fall back to any summary field whose
    // normalized label is ro_number.
    const ro_number = processedData.summary?.PO_NUMBER?.value || Object.values(processedData.summary).find(value => value.normalizedLabel === 'ro_number')?.value;
    if (!ro_number) {
      throw new Error("Could not find RO number in the extracted data to associate with the bill. Select an RO and try again.");
    }
    const { jobs } = await client.request(`
query QUERY_BILL_OCR_JOB_BY_RO($ro_number: String!) {
jobs(where: {ro_number: {_eq: $ro_number}}) {
id
}
}`, { ro_number });
    if (jobs.length === 0) {
      throw new Error("No job found for the detected RO/PO number.");
    }
    // NOTE(review): if several jobs share the RO number the first row wins — confirm acceptable.
    jobid = jobs[0].id;
  }
  // One round trip: all vendors, plus the job's bodyshop DMS identifiers and joblines.
  const jobData = await client.request(`
query QUERY_BILL_OCR_DATA($jobid: uuid!) {
vendors {
id
name
}
jobs_by_pk(id: $jobid) {
id
bodyshop {
id
md_responsibility_centers
cdk_dealerid
pbs_serialnumber
rr_dealerid
}
joblines {
id
line_desc
removed
act_price
db_price
oem_partno
alt_partno
part_type
}
}
}
`, {
    jobid, // TODO: Parts order IDs are currently ignored. If receiving a parts order, it could be used to more precisely match to joblines.
  });
  //Create fuses of line descriptions for matching.
  // Each jobline is indexed under both its raw fields and normalized variants;
  // part numbers carry the heaviest weights, prices the lightest.
  const jobLineDescFuse = new Fuse(
    jobData.jobs_by_pk.joblines.map(jl => ({ ...jl, line_desc_normalized: normalizeText(jl.line_desc || ""), oem_partno_normalized: normalizePartNumber(jl.oem_partno || ""), alt_partno_normalized: normalizePartNumber(jl.alt_partno || "") })),
    {
      keys: [{
        name: 'line_desc',
        weight: 6
      }, {
        name: 'oem_partno',
        weight: 8
      }, {
        name: 'alt_partno',
        weight: 5
      },
      {
        name: 'act_price',
        weight: 1
      },
      {
        name: 'line_desc_normalized',
        weight: 4
      },
      {
        name: 'oem_partno_normalized',
        weight: 6
      },
      {
        name: 'alt_partno_normalized',
        weight: 3
      }],
      threshold: 0.4, //Adjust as needed for matching sensitivity
      includeScore: true,
    }
  );
  const joblineMatches = joblineFuzzySearch({ fuseToSearch: jobLineDescFuse, processedData });
  // Fuzzy-match the OCR'd vendor name against the vendor list.
  const vendorFuse = new Fuse(
    jobData.vendors.map(v => ({ ...v, name_normalized: normalizeText(v.name) })),
    {
      keys: [{ name: "name", weight: 3 }, { name: 'name_normalized', weight: 2 }],
      threshold: 0.4,
      includeScore: true,
    },
  );
  // NOTE(review): normalizeText throws if both VENDOR_NAME and NAME are absent —
  // confirm upstream always provides at least one.
  const vendorMatches = vendorFuse.search(normalizeText(processedData.summary?.VENDOR_NAME?.value || processedData.summary?.NAME?.value));
  let vendorid;
  if (vendorMatches.length > 0) {
    vendorid = vendorMatches[0].item.id;
  }
  const { jobs_by_pk: job } = jobData;
  if (!job) {
    throw new Error('Job not found for bill form data generation.');
  }
  //Is there a subtotal level discount? If there is, we need to figure out what the percentage is, and apply that to the actual cost as a reduction
  const subtotalDiscountValueRaw = processedData.summary?.DISCOUNT?.value || processedData.summary?.SUBTOTAL_DISCOUNT?.value || 0;
  let discountPercentageDecimal = 0;
  if (subtotalDiscountValueRaw) {
    const subtotal = parseFloat(normalizePrice(processedData.summary?.SUBTOTAL?.value || 0)) || 0;
    const subtotalDiscountValue = parseFloat(normalizePrice(subtotalDiscountValueRaw)) || 0;
    if (subtotal > 0 && subtotalDiscountValue) {
      // Store discount percentage as a decimal (e.g. 20.5% => 0.205),
      // but only allow half-percent increments (0.005 steps).
      discountPercentageDecimal = Math.abs(subtotalDiscountValue / subtotal);
      discountPercentageDecimal = roundToIncrement(discountPercentageDecimal, 0.005);
    }
  }
  //TODO: How do we handle freight lines and core charges?
  //Create the form data structure for the bill posting screen.
  const billFormData = {
    "jobid": jobid,
    "vendorid": vendorid,
    "invoice_number": processedData.summary?.INVOICE_RECEIPT_ID?.value,
    "date": processedData.summary?.INVOICE_RECEIPT_DATE?.value,
    "is_credit_memo": false,
    "total": normalizePrice(processedData.summary?.INVOICE_TOTAL?.value || processedData.summary?.TOTAL?.value),
    "billlines": joblineMatches.map(jlMatchLine => {
      const { matches, textractLineItem, } = jlMatchLine
      //Matches should be pre-sorted, take the first one.
      const matchToUse = matches.length > 0 ? matches[0] : null;
      // Calculate confidence scores
      const ocrConfidence = calculateTextractConfidence(textractLineItem);
      const matchConfidence = calculateMatchConfidence(matches, matchToUse);
      const overallConfidence = calculateOverallConfidence(ocrConfidence, matchConfidence);
      //TODO: Should be using the textract if there is an exact match on the normalized label.
      //if there isn't then we can do the below.
      let actualPrice, actualCost;
      //TODO: What if several fields match on the normalized name? We need to pick the most likely one.
      const hasNormalizedActualPrice = Object.keys(textractLineItem).find(key => textractLineItem[key].normalizedLabel === 'actual_price');
      const hasNormalizedActualCost = Object.keys(textractLineItem).find(key => textractLineItem[key].normalizedLabel === 'actual_cost');
      if (hasNormalizedActualPrice) {
        actualPrice = textractLineItem[hasNormalizedActualPrice].value;
      }
      if (hasNormalizedActualCost) {
        actualCost = textractLineItem[hasNormalizedActualCost].value;
      }
      if (!hasNormalizedActualPrice || !hasNormalizedActualCost) {
        //This is if there was no match found for normalized labels.
        //Check all prices, and generally the higher one will be the actual price and the lower one will be the cost.
        //Need to make sure that other random items are excluded. This should be within a reasonable range of the matched jobline at matchToUse.item.act_price
        //Iterate over all of the text values, and check out which of them are currencies.
        //They'll be in the format starting with a $ sign usually.
        const currencyTextractLineItems = [] // {key, value}
        Object.keys(textractLineItem).forEach(key => {
          const currencyValue = textractLineItem[key].value?.startsWith('$') ? textractLineItem[key].value : null;
          if (currencyValue) {
            //Clean it and parse it
            const cleanValue = parseFloat(currencyValue.replace(/[^0-9.-]/g, '')) || 0;
            currencyTextractLineItems.push({ key, value: cleanValue })
          }
        })
        //Sort them descending
        currencyTextractLineItems.sort((a, b) => b.value - a.value);
        //Most expensive should be the actual price, second most expensive should be the cost.
        if (!actualPrice) actualPrice = currencyTextractLineItems.length > 0 ? currencyTextractLineItems[0].value : 0;
        if (!actualCost) actualCost = currencyTextractLineItems.length > 1 ? currencyTextractLineItems[1].value : 0;
        if (matchToUse) {
          //Double check that they're within 50% of the matched jobline price if there is one.
          const joblinePrice = parseFloat(matchToUse.item.act_price) || 0;
          if (!hasNormalizedActualPrice && actualPrice > 0 && (actualPrice < joblinePrice * (1 - PRICE_PERCENT_MARGIN_TOLERANCE) || actualPrice > joblinePrice * (1 + PRICE_PERCENT_MARGIN_TOLERANCE))) {
            actualPrice = joblinePrice; //Set to the jobline as a fallback.
          }
          if (!hasNormalizedActualCost && actualCost > 0 && (actualCost < joblinePrice * (1 - PRICE_PERCENT_MARGIN_TOLERANCE) || actualCost > joblinePrice * (1 + PRICE_PERCENT_MARGIN_TOLERANCE))) {
            actualCost = null //Blank it out if it's not likely.
          }
        }
      }
      //If there's nothing, just fall back to seeing if there's a price object from textract.
      if (!actualPrice && textractLineItem.PRICE) {
        actualPrice = textractLineItem.PRICE.value;
      }
      if (!actualCost && textractLineItem.PRICE) {
        actualCost = textractLineItem.PRICE.value;
      }
      //If quantity greater than 1, check if the actual cost is a multiple of the actual price, if so, divide it out to get the unit price.
      const quantity = parseInt(textractLineItem?.QUANTITY?.value);
      if (quantity && quantity > 1) {
        // Divide only when price/quantity lands within 3% of the matched jobline's unit price.
        if (actualPrice && quantity && Math.abs((actualPrice / quantity) - (parseFloat(matchToUse?.item?.act_price) || 0)) / ((parseFloat(matchToUse?.item?.act_price) || 1)) < PRICE_QUANTITY_MARGIN_TOLERANCE) {
          actualPrice = actualPrice / quantity;
        }
        if (actualCost && quantity && Math.abs((actualCost / quantity) - (parseFloat(matchToUse?.item?.act_price) || 0)) / ((parseFloat(matchToUse?.item?.act_price) || 1)) < PRICE_QUANTITY_MARGIN_TOLERANCE) {
          actualCost = actualCost / quantity;
        }
      }
      if (discountPercentageDecimal > 0) {
        // NOTE(review): if actualCost is null/undefined here this produces 0/NaN — confirm intended.
        actualCost = actualCost * (1 - discountPercentageDecimal);
      }
      const responsibilityCenters = job.bodyshop.md_responsibility_centers
      //TODO: Do we need to verify the lines to see if it is a unit price or total price (i.e. quantity * price)
      const lineObject = {
        "line_desc": matchToUse?.item?.line_desc || textractLineItem.ITEM?.value || "NO DESCRIPTION",
        "quantity": textractLineItem.QUANTITY?.value,
        "actual_price": normalizePrice(actualPrice),
        "actual_cost": normalizePrice(actualCost),
        // DMS-connected shops use part_type directly (except "PAE"); otherwise map
        // through the bodyshop's responsibility-center cost defaults.
        "cost_center": matchToUse?.item?.part_type
          ? bodyshopHasDmsKey(job.bodyshop)
            ? matchToUse?.item?.part_type !== "PAE"
              ? matchToUse?.item?.part_type
              : null
            : responsibilityCenters.defaults &&
            (responsibilityCenters.defaults.costs[matchToUse?.item?.part_type] || null)
          : null,
        "applicable_taxes": {
          // NOTE(review): presumably instance-dependent (true on imex, false on rome) —
          // confirm InstanceManager semantics.
          "federal": InstanceManager({ imex: true, rome: false }),
          "state": false,
          "local": false
        },
        "joblineid": matchToUse?.item?.id || "noline",
        // Confidence triplet: T=overall, O=OCR, J=jobline match.
        "confidence": `T${overallConfidence} - O${ocrConfidence} - J${matchConfidence}`
      }
      return lineObject
    })
  }
  return billFormData
}
/**
 * Fuzzy-match every OCR line item against the jobline Fuse index.
 *
 * For each line item, several search variants are run per field (full string,
 * per-word, de-spaced text; differently-cleaned part numbers; raw and
 * 2-decimal-formatted prices). Variant results are merged per field, then the
 * per-field results are combined into one weighted score that favors
 * part-number matches over descriptions, and descriptions over prices.
 *
 * @param {Object} args
 * @param {Object} args.fuseToSearch - Fuse index built over the job's joblines.
 * @param {Object} args.processedData - Normalized Textract output ({ lineItems }).
 * @returns {Array<{matches: Array, textractLineItem: Object, hasMatch: boolean}>}
 *   One entry per OCR line item, in input order; matches is the top-5 candidate
 *   list sorted by finalScore (higher is better). An entry is pushed even when no
 *   candidate matched so every invoice line is still processed downstream.
 */
function joblineFuzzySearch({ fuseToSearch, processedData }) {
  const matches = []
  const searchStats = []; // Track search statistics
  processedData.lineItems.forEach((lineItem, lineIndex) => {
    const lineStats = {
      lineNumber: lineIndex + 1,
      searches: []
    };
    // Refined ITEM search (multi-word description)
    const refinedItemResults = (() => {
      if (!lineItem.ITEM?.value) return [];
      const itemValue = lineItem.ITEM.value;
      const normalized = normalizeText(itemValue);
      // 1: Full string search
      const fullSearch = fuseToSearch.search(normalized);
      lineStats.searches.push({ type: 'ITEM - Full String', term: normalized, results: fullSearch.length });
      // 2: Search individual significant words (3+ chars)
      const words = normalized.split(' ').filter(w => w.length >= 3);
      const wordSearches = words.map(word => {
        const results = fuseToSearch.search(word);
        lineStats.searches.push({ type: 'ITEM - Individual Word', term: word, results: results.length });
        return results;
      });
      // 3: Search without spaces entirely
      const noSpaceSearch = fuseToSearch.search(normalized.replace(/\s+/g, ''));
      lineStats.searches.push({ type: 'ITEM - No Spaces', term: normalized.replace(/\s+/g, ''), results: noSpaceSearch.length });
      // Merge results with weights. mergeResults multiplies Fuse scores (lower is
      // better) by these factors, so multipliers > 1 penalize that variant.
      return mergeResults(
        [fullSearch, ...wordSearches, noSpaceSearch],
        [1.0, ...words.map(() => 1.5), 1.2] // Full search best, individual words penalized slightly
      );
    })();
    // Refined PRODUCT_CODE search (part numbers)
    const refinedProductCodeResults = (() => {
      if (!lineItem.PRODUCT_CODE?.value) return [];
      const productCode = lineItem.PRODUCT_CODE.value;
      const normalized = normalizePartNumber(productCode);
      // 1: Normalized search (no spaces/special chars)
      const normalizedSearch = fuseToSearch.search(normalized);
      lineStats.searches.push({ type: 'PRODUCT_CODE - Normalized', term: normalized, results: normalizedSearch.length });
      // 2: Original with minimal cleaning
      const minimalClean = productCode.replace(/\s+/g, '').toUpperCase();
      const minimalSearch = fuseToSearch.search(minimalClean);
      lineStats.searches.push({ type: 'PRODUCT_CODE - Minimal Clean', term: minimalClean, results: minimalSearch.length });
      // 3: Search with dashes (common in part numbers)
      const withDashes = productCode.replace(/[^a-zA-Z0-9-]/g, '').toUpperCase();
      const dashSearch = fuseToSearch.search(withDashes);
      lineStats.searches.push({ type: 'PRODUCT_CODE - With Dashes', term: withDashes, results: dashSearch.length });
      // 4: Special chars to spaces (preserve word boundaries)
      const specialCharsToSpaces = productCode.replace(/[^a-zA-Z0-9\s]/g, ' ').replace(/\s+/g, ' ').trim().toUpperCase();
      const specialCharsSearch = fuseToSearch.search(specialCharsToSpaces);
      lineStats.searches.push({ type: 'PRODUCT_CODE - Special Chars to Spaces', term: specialCharsToSpaces, results: specialCharsSearch.length });
      return mergeResults(
        [normalizedSearch, minimalSearch, dashSearch, specialCharsSearch],
        [1.0, 1.1, 1.2, 1.15] // Prefer fully normalized, special chars to spaces slightly weighted
      );
    })();
    // Refined PRICE search
    const refinedPriceResults = (() => {
      if (!lineItem.PRICE?.value) return [];
      const price = normalizePrice(lineItem.PRICE.value);
      // 1: Exact price match
      const exactSearch = fuseToSearch.search(price);
      lineStats.searches.push({ type: 'PRICE - Exact', term: price, results: exactSearch.length });
      // 2: Price with 2 decimal places
      const priceFloat = parseFloat(price);
      if (!isNaN(priceFloat)) {
        const formattedPrice = priceFloat.toFixed(2);
        const formattedSearch = fuseToSearch.search(formattedPrice);
        lineStats.searches.push({ type: 'PRICE - Formatted (2 decimals)', term: formattedPrice, results: formattedSearch.length });
        return mergeResults([exactSearch, formattedSearch], [1.0, 1.1]);
      }
      return exactSearch;
    })();
    // Refined UNIT_PRICE search (same strategy as PRICE)
    const refinedUnitPriceResults = (() => {
      if (!lineItem.UNIT_PRICE?.value) return [];
      const unitPrice = normalizePrice(lineItem.UNIT_PRICE.value);
      // 1: Exact price match
      const exactSearch = fuseToSearch.search(unitPrice);
      lineStats.searches.push({ type: 'UNIT_PRICE - Exact', term: unitPrice, results: exactSearch.length });
      // 2: Price with 2 decimal places
      const priceFloat = parseFloat(unitPrice);
      if (!isNaN(priceFloat)) {
        const formattedPrice = priceFloat.toFixed(2);
        const formattedSearch = fuseToSearch.search(formattedPrice);
        lineStats.searches.push({ type: 'UNIT_PRICE - Formatted (2 decimals)', term: formattedPrice, results: formattedSearch.length });
        return mergeResults([exactSearch, formattedSearch], [1.0, 1.1]);
      }
      return exactSearch;
    })();
    //Merge them all together and sort by the highest scores.
    const combinedScoreMap = new Map();
    // Weight different field types differently
    const fieldWeights = {
      productCode: 5.0, // Most important - part numbers should match
      item: 3.0, // Second most important - description
      price: 1.0, // Less important - prices can vary
      unitPrice: 0.8 // Least important - similar to price
    };
    [
      { results: refinedProductCodeResults, weight: fieldWeights.productCode, field: 'productCode' },
      { results: refinedItemResults, weight: fieldWeights.item, field: 'item' },
      { results: refinedPriceResults, weight: fieldWeights.price, field: 'price' },
      { results: refinedUnitPriceResults, weight: fieldWeights.unitPrice, field: 'unitPrice' }
    ].forEach(({ results, weight, field }) => {
      results.forEach((result, index) => {
        const id = result.item.id;
        // Position bonus (first result is better than fifth)
        const positionBonus = (5 - index) / 5;
        // Lower score is better in Fuse.js, so invert it and apply weights
        const normalizedScore = (1 - result.score) * weight * positionBonus;
        if (!combinedScoreMap.has(id)) {
          combinedScoreMap.set(id, {
            item: result.item,
            score: normalizedScore,
            fieldMatches: [field],
            matchCount: result.count || 1
          });
        } else {
          // Same jobline matched through another field: accumulate score and record the field.
          const existing = combinedScoreMap.get(id);
          existing.score += normalizedScore;
          existing.fieldMatches.push(field);
          existing.matchCount += (result.count || 1);
        }
      });
    });
    // Convert to array and sort by best combined score
    const finalMatches = Array.from(combinedScoreMap.values())
      .map(entry => {
        // Apply penalty if item has no act_price or it's 0
        const hasPriceData = entry.item.act_price && parseFloat(entry.item.act_price) > 0;
        const priceDataPenalty = hasPriceData ? 1.0 : 0.5; // 50% penalty if no price
        return {
          ...entry,
          // Boost score for items that matched in multiple fields, penalize for missing price
          finalScore: entry.score * (1 + (entry.fieldMatches.length * 0.2)) * priceDataPenalty,
          hasPriceData
        };
      })
      .sort((a, b) => b.finalScore - a.finalScore)
      .slice(0, 5);
    // Always push the textract line item, even if no matches found
    // This ensures all invoice lines are processed
    matches.push({
      matches: finalMatches,
      textractLineItem: lineItem,
      hasMatch: finalMatches.length > 0
    });
    searchStats.push(lineStats);
  })
  // // Output search statistics table (debug aid, intentionally kept for reference)
  // console.log('\n═══════════════════════════════════════════════════════════════════════');
  // console.log(' FUSE.JS SEARCH STATISTICS');
  // console.log('═══════════════════════════════════════════════════════════════════════\n');
  // searchStats.forEach(lineStat => {
  // console.log(`📄 Line Item #${lineStat.lineNumber}:`);
  // console.log('─'.repeat(75));
  // if (lineStat.searches.length > 0) {
  // const tableData = lineStat.searches.map(search => ({
  // 'Search Type': search.type,
  // 'Search Term': search.term.substring(0, 40) + (search.term.length > 40 ? '...' : ''),
  // 'Results': search.results
  // }));
  // console.table(tableData);
  // } else {
  // console.log(' No searches performed for this line item.\n');
  // }
  // });
  // // Summary statistics
  // const totalSearches = searchStats.reduce((sum, stat) => sum + stat.searches.length, 0);
  // const totalResults = searchStats.reduce((sum, stat) =>
  // sum + stat.searches.reduce((s, search) => s + search.results, 0), 0);
  // const avgResultsPerSearch = totalSearches > 0 ? (totalResults / totalSearches).toFixed(2) : 0;
  // console.log('═══════════════════════════════════════════════════════════════════════');
  // console.log(' SUMMARY');
  // console.log('═══════════════════════════════════════════════════════════════════════');
  // console.table({
  // 'Total Line Items': processedData.lineItems.length,
  // 'Total Searches Performed': totalSearches,
  // 'Total Results Found': totalResults,
  // 'Average Results per Search': avgResultsPerSearch
  // });
  // console.log('═══════════════════════════════════════════════════════════════════════\n');
  return matches
}
/**
 * True-ish when the bodyshop is connected to any supported DMS (CDK, PBS, or R&R).
 * Note: returns the first truthy identifier itself (or the last falsy value),
 * not a boolean — callers use it in truthiness contexts.
 */
const bodyshopHasDmsKey = (bodyshop) => {
  const { cdk_dealerid, pbs_serialnumber, rr_dealerid } = bodyshop;
  return cdk_dealerid || pbs_serialnumber || rr_dealerid;
};
// Public API: the form-data generator plus the price normalizer (reused elsewhere).
module.exports = {
  generateBillFormData,
  normalizePrice
}

View File

@@ -1,159 +0,0 @@
const PDFDocument = require('pdf-lib').PDFDocument;
const logger = require("../../utils/logger");
// Redis key namespace for Textract jobs, scoped by NODE_ENV so environments
// sharing a Redis instance do not collide.
const TEXTRACT_REDIS_PREFIX = `textract:${process.env?.NODE_ENV}`
// Job records auto-expire after 10 minutes (TTL is in seconds).
const TEXTRACT_JOB_TTL = 10 * 60;
/**
 * Build the namespaced Redis key for a Textract job.
 * @param {string} jobId - Textract job id.
 * @returns {string} `<prefix>:<jobId>`
 */
function getTextractJobKey(jobId) {
  return `${TEXTRACT_REDIS_PREFIX}:${jobId}`;
}
/**
 * Persist Textract job data in Redis under the job's namespaced key, with the
 * standard TTL so stale jobs expire on their own.
 * @param {Object} args
 * @param {Object} args.redisPubClient - Connected Redis client.
 * @param {string} args.textractJobId - Textract job id.
 * @param {Object} args.jobData - JSON-serializable job state.
 * @throws {Error} When the Redis client has not been initialized.
 */
async function setTextractJob({ redisPubClient, textractJobId, jobData }) {
  if (!redisPubClient) {
    throw new Error('Redis client not initialized. Call initializeBillOcr first.');
  }
  const redisKey = getTextractJobKey(textractJobId);
  const serialized = JSON.stringify(jobData);
  await redisPubClient.set(redisKey, serialized);
  await redisPubClient.expire(redisKey, TEXTRACT_JOB_TTL);
}
/**
 * Fetch Textract job data from Redis by job id.
 * @param {Object} args
 * @param {Object} args.redisPubClient - Connected Redis client.
 * @param {string} args.textractJobId - Textract job id.
 * @returns {Promise<Object|null>} Parsed job data, or null when absent/expired.
 * @throws {Error} When the Redis client has not been initialized.
 */
async function getTextractJob({ redisPubClient, textractJobId }) {
  if (!redisPubClient) {
    throw new Error('Redis client not initialized. Call initializeBillOcr first.');
  }
  const raw = await redisPubClient.get(getTextractJobKey(textractJobId));
  if (!raw) {
    return null;
  }
  return JSON.parse(raw);
}
/**
 * Detect file type based on MIME type and file signature.
 * The reported MIME type is trusted when conclusive; otherwise the buffer's
 * magic bytes are sniffed (PDF, JPEG, PNG, HEIC/HEIF).
 * @param {Object} file - Multer file object ({ mimetype, buffer }).
 * @returns {string} 'pdf', 'image', or 'unknown'
 */
function getFileType(file) {
  // Check MIME type first
  const mimeType = file.mimetype?.toLowerCase();
  if (mimeType === 'application/pdf') {
    return 'pdf';
  }
  if (mimeType && mimeType.startsWith('image/')) {
    return 'image';
  }
  // Fallback: Check file signature (magic bytes)
  const buffer = file.buffer;
  // Fix: >= 4 (was > 4) so a buffer holding exactly the 4-byte signature is still recognized.
  if (buffer && buffer.length >= 4) {
    // PDF signature: %PDF
    if (buffer[0] === 0x25 && buffer[1] === 0x50 && buffer[2] === 0x44 && buffer[3] === 0x46) {
      return 'pdf';
    }
    // JPEG signature: FF D8 FF
    if (buffer[0] === 0xFF && buffer[1] === 0xD8 && buffer[2] === 0xFF) {
      return 'image';
    }
    // PNG signature: 89 50 4E 47
    if (buffer[0] === 0x89 && buffer[1] === 0x50 && buffer[2] === 0x4E && buffer[3] === 0x47) {
      return 'image';
    }
    // HEIC/HEIF: ISO-BMFF 'ftyp' box near the start followed by a known brand.
    // Fix: >= 12 (was > 12) so the minimal 12-byte ftyp header is still checked.
    if (buffer.length >= 12) {
      const ftypIndex = buffer.indexOf(Buffer.from('ftyp'));
      if (ftypIndex > 0 && ftypIndex < 12) {
        const brand = buffer.slice(ftypIndex + 4, ftypIndex + 8).toString('ascii');
        if (brand.startsWith('heic') || brand.startsWith('heix') ||
          brand.startsWith('hevc') || brand.startsWith('hevx') ||
          brand.startsWith('mif1')) {
          return 'image';
        }
      }
    }
  }
  return 'unknown';
}
/**
 * Count the pages in a PDF buffer using pdf-lib.
 * @param {Buffer} pdfBuffer - Raw PDF bytes.
 * @returns {Promise<number>} Page count.
 * @throws {Error} When the buffer cannot be parsed as a PDF.
 */
async function getPdfPageCount(pdfBuffer) {
  try {
    const document = await PDFDocument.load(pdfBuffer);
    return document.getPageCount();
  } catch (error) {
    console.error('Error reading PDF page count:', error);
    throw new Error('Failed to read PDF: ' + error.message);
  }
}
/**
 * Report whether any stored Textract job is currently IN_PROGRESS.
 * Scans every key under the Textract prefix and inspects each job's status.
 * Errors are logged and treated as "no active jobs".
 * @param {Object} args
 * @param {Object} args.redisPubClient - Connected Redis client.
 * @returns {Promise<boolean>}
 * @throws {Error} When the Redis client has not been initialized.
 */
async function hasActiveJobs({ redisPubClient }) {
  if (!redisPubClient) {
    throw new Error('Redis client not initialized.');
  }
  try {
    const keys = await redisPubClient.keys(`${TEXTRACT_REDIS_PREFIX}:*`);
    if (!keys || keys.length === 0) {
      return false;
    }
    //TODO: Is there a better way to do this that supports clusters?
    for (const key of keys) {
      const raw = await redisPubClient.get(key);
      if (!raw) continue;
      if (JSON.parse(raw).status === 'IN_PROGRESS') {
        return true;
      }
    }
    return false;
  } catch (error) {
    logger.log("bill-ocr-job-check-error", "ERROR", "api", null, { error: error.message, stack: error.stack });
    return false;
  }
}
// Public surface of the bill-OCR helper module: Redis job tracking,
// file-type sniffing, and PDF page counting.
module.exports = {
  getTextractJobKey,
  setTextractJob,
  getTextractJob,
  getFileType,
  getPdfPageCount,
  hasActiveJobs,
  TEXTRACT_REDIS_PREFIX
}

View File

@@ -1,202 +0,0 @@
// Minimum Textract confidence (percent) a field must clear to be kept.
const MIN_CONFIDENCE_VALUE = 50;

/**
 * Normalize a Textract field type name.
 * Currently an identity pass-through kept as an extension point.
 * @param {string} fieldType - Raw Type.Text from Textract.
 * @returns {string} The normalized field name.
 */
function normalizeFieldName(fieldType) {
  //Placeholder normalization for now.
  return fieldType;
}
// Canonical field names shared by the OCR pipeline; label/type normalization
// maps every recognized vendor label onto one of these keys.
const standardizedFieldsnames = {
  actual_cost: "actual_cost",
  actual_price: "actual_price",
  line_desc: "line_desc",
  quantity: "quantity",
  part_no: "part_no",
  ro_number: "ro_number",
};
/**
 * Map a raw invoice column label onto a standardized field name.
 * The label is lower-cased, stripped of punctuation, and snake_cased before
 * lookup; unmapped labels are returned as "NOT_MAPPED => <label>".
 * @param {string} labelText - Raw LabelDetection.Text from Textract.
 * @returns {string} A standardizedFieldsnames value, '' for empty input, or a NOT_MAPPED marker.
 */
function normalizeLabelName(labelText) {
  if (!labelText) return '';
  // Convert to lowercase and trim whitespace
  let normalized = labelText.toLowerCase().trim();
  // Remove special characters and replace spaces with underscores
  normalized = normalized.replace(/[^a-z0-9\s]/g, '').replace(/\s+/g, '_');
  // Common label normalizations
  const labelMap = {
    'qty': standardizedFieldsnames.quantity,
    'qnty': standardizedFieldsnames.quantity,
    'sale_qty': standardizedFieldsnames.quantity,
    'invoiced_qty': standardizedFieldsnames.quantity,
    'qty_shipped': standardizedFieldsnames.quantity,
    'quantity': standardizedFieldsnames.quantity,
    'filled': standardizedFieldsnames.quantity,
    'count': standardizedFieldsnames.quantity,
    'quant': standardizedFieldsnames.quantity,
    'desc': standardizedFieldsnames.line_desc,
    'description': standardizedFieldsnames.line_desc,
    'item': standardizedFieldsnames.line_desc,
    'part': standardizedFieldsnames.part_no,
    'part_no': standardizedFieldsnames.part_no,
    'part_num': standardizedFieldsnames.part_no,
    'part_number': standardizedFieldsnames.part_no,
    'item_no': standardizedFieldsnames.part_no,
    'price': standardizedFieldsnames.actual_price,
    //'amount': standardizedFieldsnames.actual_price,
    'list_price': standardizedFieldsnames.actual_price,
    'unit_price': standardizedFieldsnames.actual_price,
    'list': standardizedFieldsnames.actual_price,
    'retail_price': standardizedFieldsnames.actual_price,
    'retail': standardizedFieldsnames.actual_price,
    'net': standardizedFieldsnames.actual_cost,
    'selling_price': standardizedFieldsnames.actual_cost,
    'net_price': standardizedFieldsnames.actual_cost,
    'net_cost': standardizedFieldsnames.actual_cost,
    'total': standardizedFieldsnames.actual_cost,
    'po_no': standardizedFieldsnames.ro_number,
    'customer_po_no': standardizedFieldsnames.ro_number,
    'customer_po_no_': standardizedFieldsnames.ro_number
  };
  // Fix: guard against inherited Object.prototype keys — a label normalizing to
  // e.g. "constructor" or "tostring" must not resolve to a prototype member.
  if (Object.prototype.hasOwnProperty.call(labelMap, normalized)) {
    return labelMap[normalized];
  }
  return `NOT_MAPPED => ${normalized}`; // TODO: Should we monitor unmapped labels?
}
/**
 * Clean extracted invoice data: keep only fields whose Textract confidence
 * clears MIN_CONFIDENCE_VALUE and coerce numeric fields to numbers.
 * @param {Object} invoiceData - Output of extractInvoiceData ({ summary, lineItems }).
 * @returns {Object} { summary, lineItems } with low-confidence fields removed.
 */
function processScanData(invoiceData) {
  const processed = {
    summary: {},
    lineItems: []
  };
  // Keep summary fields only when confidence clears the threshold.
  for (const [key, value] of Object.entries(invoiceData.summary)) {
    if (value.confidence > MIN_CONFIDENCE_VALUE) {
      processed.summary[key] = {
        value: value.value,
        label: value.label,
        normalizedLabel: value.normalizedLabel,
        confidence: value.confidence
      };
    }
  }
  // Process line items
  processed.lineItems = invoiceData.lineItems
    .map(item => {
      const processedItem = {};
      for (const [key, value] of Object.entries(item)) {
        if (value.confidence > MIN_CONFIDENCE_VALUE) {
          let cleanValue = value.value;
          // Coerce numeric fields; unparseable text falls back to 0.
          if (key === 'quantity') {
            cleanValue = parseFloat(cleanValue) || 0;
          } else if (key === 'retail_price' || key === 'actual_price') {
            // Strip currency symbols/commas before parsing.
            cleanValue = parseFloat(cleanValue.replace(/[^0-9.-]/g, '')) || 0;
          }
          processedItem[key] = {
            value: cleanValue,
            label: value.label,
            normalizedLabel: value.normalizedLabel,
            confidence: value.confidence
          };
        }
      }
      return processedItem;
    })
    // Fix: drop items whose every field fell below the confidence threshold;
    // these previously produced empty {} entries in lineItems.
    .filter(item => Object.keys(item).length > 0);
  return processed;
}
/**
 * Convert a raw Textract expense response into { summary, lineItems }.
 * Each field keeps its raw value, detected label, normalized label, and
 * Textract confidence; confidence filtering happens later in processScanData.
 * @param {Object} textractResponse - Response holding ExpenseDocuments.
 * @returns {Object} { summary: {...}, lineItems: [...] }
 */
function extractInvoiceData(textractResponse) {
  const invoiceData = {
    summary: {},
    lineItems: []
  };
  const expenseDocs = textractResponse.ExpenseDocuments;
  if (!expenseDocs || expenseDocs.length === 0) {
    return invoiceData;
  }
  // Pull the type/value/label/confidence quadruple out of a Textract field.
  const readField = (field) => ({
    fieldType: field.Type?.Text || '',
    fieldValue: field.ValueDetection?.Text || '',
    fieldLabel: field.LabelDetection?.Text || '',
    confidence: field.ValueDetection?.Confidence || 0
  });
  // Process each page of the invoice.
  for (const expenseDoc of expenseDocs) {
    // Document-level summary fields (vendor, invoice number, date, total, ...).
    for (const field of expenseDoc.SummaryFields || []) {
      const { fieldType, fieldValue, fieldLabel, confidence } = readField(field);
      if (fieldType && fieldValue) {
        invoiceData.summary[fieldType] = {
          value: fieldValue,
          label: fieldLabel,
          normalizedLabel: normalizeLabelName(fieldLabel),
          confidence: confidence
        };
      }
    }
    // Per-line-item fields.
    for (const lineItemGroup of expenseDoc.LineItemGroups || []) {
      for (const lineItem of lineItemGroup.LineItems || []) {
        const item = {};
        const fieldNameCounts = {}; // Occurrences per normalized name, for de-duplication.
        for (const field of lineItem.LineItemExpenseFields || []) {
          const { fieldType, fieldValue, fieldLabel, confidence } = readField(field);
          if (!fieldType || !fieldValue) {
            continue;
          }
          let normalizedField = normalizeFieldName(fieldType);
          // Duplicate field names within one line item get a numeric suffix (_2, _3, ...).
          if (Object.prototype.hasOwnProperty.call(item, normalizedField)) {
            fieldNameCounts[normalizedField] = (fieldNameCounts[normalizedField] || 1) + 1;
            normalizedField = `${normalizedField}_${fieldNameCounts[normalizedField]}`;
          }
          item[normalizedField] = {
            value: fieldValue,
            label: fieldLabel,
            normalizedLabel: normalizeLabelName(fieldLabel),
            confidence: confidence
          };
        }
        if (Object.keys(item).length > 0) {
          invoiceData.lineItems.push(item);
        }
      }
    }
  }
  return invoiceData;
}
// Public surface of the normalization module: Textract-response extraction,
// confidence-based cleanup, and the canonical field-name table.
module.exports = {
  extractInvoiceData,
  processScanData,
  standardizedFieldsnames
}

View File

@@ -1,8 +0,0 @@
Required Infrastructure setup
1. Create an AI user that has access to the required S3 buckets and textract permissions.
2. Had to create a queue and an SNS topic. Also had to create the role that had `sns:Publish`, and to add `sqs:ReceiveMessage` and `sqs:DeleteMessage` to the profile.
3. Created 2 roles for SNS. The textract role is the right one, the other was created manually based on incorrect instructions.
TODO:
* Create a rome bucket for uploads, or move to the regular spot.
* Add environment variables.

View File

@@ -1,465 +0,0 @@
const { TextractClient, StartExpenseAnalysisCommand, GetExpenseAnalysisCommand, AnalyzeExpenseCommand } = require("@aws-sdk/client-textract");
const { S3Client, PutObjectCommand } = require("@aws-sdk/client-s3");
const { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs");
const { v4: uuidv4 } = require('uuid');
const { getTextractJobKey, setTextractJob, getTextractJob, getFileType, getPdfPageCount, hasActiveJobs } = require("./bill-ocr-helpers");
const { extractInvoiceData, processScanData } = require("./bill-ocr-normalize");
const { generateBillFormData } = require("./bill-ocr-generator");
const logger = require("../../utils/logger");
const _ = require("lodash");
// Initialize AWS clients
// Shared configuration for every AI-related AWS client. Region defaults to
// ca-central-1; credentials come from the dedicated AI user's access keys.
// NOTE(review): assumes AWS_AI_ACCESS_KEY_ID / AWS_AI_SECRET_ACCESS_KEY are
// set at process start — confirm deployment configuration.
const awsConfig = {
  region: process.env.AWS_AI_REGION || "ca-central-1",
  credentials: {
    accessKeyId: process.env.AWS_AI_ACCESS_KEY_ID,
    secretAccessKey: process.env.AWS_AI_SECRET_ACCESS_KEY,
  }
};
// One client per service, all sharing the same credentials and region.
const textractClient = new TextractClient(awsConfig);
const s3Client = new S3Client(awsConfig);
const sqsClient = new SQSClient(awsConfig);
// Module-level Redis handle; set once via initializeBillOcr at startup.
let redisPubClient = null;

/**
 * Wire the bill-OCR module up to the shared Redis client.
 * Must be called once at startup before any job-tracking helper is used.
 * @param {Object} pubClient - Redis cluster client
 */
function initializeBillOcr(pubClient) {
  redisPubClient = pubClient;
}
/**
 * Check if job exists by Textract job ID
 * @param {string} textractJobId
 * @returns {Promise<boolean>} true when the job is tracked in Redis.
 * @throws {Error} When the module has not been initialized with a Redis client.
 */
async function jobExists(textractJobId) {
  if (!redisPubClient) {
    throw new Error('Redis client not initialized. Call initializeBillOcr first.');
  }
  const redisKey = getTextractJobKey(textractJobId);
  // EXISTS returns a count; coerce it to a boolean for callers.
  const keyCount = await redisPubClient.exists(redisKey);
  return Boolean(keyCount);
}
/**
 * POST handler: accept an uploaded invoice (image or PDF) and run OCR.
 * Images and single-page PDFs are processed synchronously (200 + data);
 * multi-page PDFs start an async Textract job (202 + status URL).
 * @param {Object} req - Express request; expects req.file plus jobid/bodyshopid/partsorderid in req.body.
 * @param {Object} res - Express response.
 */
async function handleBillOcr(req, res) {
  // Check if file was uploaded
  if (!req.file) {
    return res.status(400).send({ error: 'No file uploaded.' });
  }
  // The uploaded file is available in request file
  const uploadedFile = req.file;
  const { jobid, bodyshopid, partsorderid } = req.body;
  logger.log("bill-ocr-start", "DEBUG", req.user.email, jobid, null);
  try {
    const fileType = getFileType(uploadedFile);
    // Images are always processed synchronously (single page)
    if (fileType === 'image') {
      return await respondWithSyncOcr({ buffer: uploadedFile.buffer, jobid, bodyshopid, partsorderid, req, res });
    }
    if (fileType === 'pdf') {
      // Check the number of pages in the PDF
      const pageCount = await getPdfPageCount(uploadedFile.buffer);
      if (pageCount === 1) {
        // Process synchronously for single-page documents
        return await respondWithSyncOcr({ buffer: uploadedFile.buffer, jobid, bodyshopid, partsorderid, req, res });
      }
      // Start the Textract job (non-blocking) for multi-page documents
      const jobInfo = await startTextractJob(uploadedFile.buffer, { jobid, bodyshopid, partsorderid });
      logger.log("bill-ocr-multipage-start", "DEBUG", req.user.email, jobid, jobInfo);
      return res.status(202).json({
        success: true,
        textractJobId: jobInfo.jobId,
        message: 'Invoice processing started',
        statusUrl: `/ai/bill-ocr/status/${jobInfo.jobId}`
      });
    }
    logger.log("bill-ocr-unsupported-filetype", "WARN", req.user.email, jobid, { fileType });
    return res.status(400).json({
      error: 'Unsupported file type',
      message: 'Please upload a PDF or supported image file (JPEG, PNG, TIFF)'
    });
  } catch (error) {
    logger.log("bill-ocr-error", "ERROR", req.user.email, jobid, { error: error.message, stack: error.stack });
    return res.status(500).json({
      error: 'Failed to start invoice processing',
      message: error.message
    });
  }
}

/**
 * Run synchronous OCR on a single-page document buffer and send the 200 response.
 * Shared by the image path and the single-page-PDF path (previously duplicated inline).
 */
async function respondWithSyncOcr({ buffer, jobid, bodyshopid, partsorderid, req, res }) {
  const processedData = await processSinglePageDocument(buffer);
  const billForm = await generateBillFormData({ processedData: processedData, jobid, bodyshopid, partsorderid, req: req });
  logger.log("bill-ocr-single-complete", "DEBUG", req.user.email, jobid, { ..._.omit(processedData, "originalTextractResponse"), billForm });
  return res.status(200).json({
    success: true,
    status: 'COMPLETED',
    data: { ...processedData, billForm },
    message: 'Invoice processing completed'
  });
}
/**
 * GET handler: report the status of an async (multi-page) Textract OCR job.
 * On COMPLETED, lazily generates the bill form (needs request context that the
 * SQS worker lacks) and caches it back into the Redis job record.
 * @param {Object} req - Express request; expects req.params.textractJobId.
 * @param {Object} res - Express response.
 */
async function handleBillOcrStatus(req, res) {
  const { textractJobId } = req.params;
  if (!textractJobId) {
    logger.log("bill-ocr-status-error", "WARN", req.user.email, null, { error: 'No textractJobId found in params' });
    return res.status(400).json({ error: 'Job ID is required' });
  }
  const jobStatus = await getTextractJob({ redisPubClient, textractJobId });
  if (!jobStatus) {
    return res.status(404).json({ error: 'Job not found' });
  }
  if (jobStatus.status === 'COMPLETED') {
    // Generate billForm on-demand if not already generated
    let billForm = jobStatus.data?.billForm;
    if (!billForm && jobStatus.context) {
      try {
        billForm = await generateBillFormData({
          processedData: jobStatus.data,
          jobid: jobStatus.context.jobid,
          bodyshopid: jobStatus.context.bodyshopid,
          partsorderid: jobStatus.context.partsorderid,
          req: req // Now we have request context!
        });
        logger.log("bill-ocr-multipage-complete", "DEBUG", req.user.email, jobStatus.context.jobid, { ...jobStatus.data, billForm });
        // Cache the billForm back to Redis for future requests
        await setTextractJob({
          redisPubClient,
          textractJobId,
          jobData: {
            ...jobStatus,
            data: {
              ...jobStatus.data,
              billForm
            }
          }
        });
      } catch (error) {
        logger.log("bill-ocr-multipage-error", "ERROR", req.user.email, jobStatus.context.jobid, { ...jobStatus.data, error: error.message, stack: error.stack });
        return res.status(500).send({
          status: 'COMPLETED',
          error: 'Data processed but failed to generate bill form',
          message: error.message,
          data: jobStatus.data // Still return the raw processed data
        });
      }
    }
    return res.status(200).send({
      status: 'COMPLETED',
      data: {
        ...jobStatus.data,
        billForm
      }
    });
  } else if (jobStatus.status === 'FAILED') {
    // Fix: context can be absent (the COMPLETED branch guards it); optional-chain
    // so logging a failed job cannot itself throw a TypeError.
    logger.log("bill-ocr-multipage-failed", "ERROR", req.user.email, jobStatus.context?.jobid, { ...jobStatus.data, error: jobStatus.error, });
    return res.status(500).json({
      status: 'FAILED',
      error: jobStatus.error
    });
  } else {
    // IN_PROGRESS (or any other interim state) — just echo it back.
    return res.status(200).json({
      status: jobStatus.status
    });
  }
}
/**
 * Process a single-page document synchronously using AnalyzeExpenseCommand
 * @param {Buffer} pdfBuffer - Raw bytes of a single-page PDF or image.
 * @returns {Promise<Object>} Normalized scan data plus the raw Textract response.
 */
async function processSinglePageDocument(pdfBuffer) {
  // Blocking expense analysis — only valid for single-page input.
  const command = new AnalyzeExpenseCommand({
    Document: {
      Bytes: pdfBuffer
    }
  });
  const textractResult = await textractClient.send(command);
  const processedData = processScanData(extractInvoiceData(textractResult));
  // Keep the raw Textract payload alongside the normalized data for debugging.
  return {
    ...processedData,
    originalTextractResponse: textractResult
  };
}
/**
 * Kick off an asynchronous Textract expense analysis for a multi-page PDF.
 * Uploads the PDF to S3, starts the Textract job with SNS completion
 * notifications, and records the job as IN_PROGRESS in Redis.
 * @param {Buffer} pdfBuffer - Raw PDF bytes.
 * @param {Object} [context] - { jobid, bodyshopid, partsorderid } carried through the pipeline.
 * @returns {Promise<{jobId: string}>} The Textract job ID.
 * @throws {Error} When a required environment variable is missing.
 */
async function startTextractJob(pdfBuffer, context = {}) {
  // Upload PDF to S3 temporarily for Textract async processing
  const { bodyshopid, jobid } = context;
  const s3Bucket = process.env.AWS_AI_BUCKET;
  const snsTopicArn = process.env.AWS_TEXTRACT_SNS_TOPIC_ARN;
  const snsRoleArn = process.env.AWS_TEXTRACT_SNS_ROLE_ARN;
  // Fail fast on missing configuration before touching any AWS service.
  const requiredConfig = [
    ['AWS_AI_BUCKET', s3Bucket],
    ['AWS_TEXTRACT_SNS_TOPIC_ARN', snsTopicArn],
    ['AWS_TEXTRACT_SNS_ROLE_ARN', snsRoleArn]
  ];
  for (const [envName, envValue] of requiredConfig) {
    if (!envValue) {
      throw new Error(`${envName} environment variable is required`);
    }
  }
  const uploadId = uuidv4();
  const s3Key = `textract-temp/${bodyshopid}/${jobid}/${uploadId}.pdf`; //TODO Update Keys structure to something better.
  // Stage the PDF in S3 so the async Textract job can read it.
  await s3Client.send(new PutObjectCommand({
    Bucket: s3Bucket,
    Key: s3Key,
    Body: pdfBuffer,
    ContentType: 'application/pdf' //Hard coded - we only support PDFs for multi-page
  }));
  // Start async Textract expense analysis with SNS notification
  const startResult = await textractClient.send(new StartExpenseAnalysisCommand({
    DocumentLocation: {
      S3Object: {
        Bucket: s3Bucket,
        Name: s3Key
      }
    },
    NotificationChannel: {
      SNSTopicArn: snsTopicArn,
      RoleArn: snsRoleArn
    },
    ClientRequestToken: uploadId
  }));
  const textractJobId = startResult.JobId;
  // Store job info in Redis using textractJobId as the key
  await setTextractJob({
    redisPubClient,
    textractJobId,
    jobData: {
      status: 'IN_PROGRESS',
      s3Key: s3Key,
      uploadId: uploadId,
      startedAt: new Date().toISOString(),
      context: context // Store the context for later use
    }
  });
  return {
    jobId: textractJobId
  };
}
// Process SQS messages from Textract completion notifications
/**
 * Poll the SQS queue once for Textract completion notifications.
 * Skips polling entirely when no tracked job is IN_PROGRESS. Messages are
 * deleted only after this environment successfully processes them, so
 * messages belonging to other environments remain visible on the queue.
 */
async function processSQSMessages() {
  const queueUrl = process.env.AWS_TEXTRACT_SQS_QUEUE_URL;
  if (!queueUrl) {
    logger.log("bill-ocr-error", "ERROR", "api", null, { message: "AWS_TEXTRACT_SQS_QUEUE_URL not configured" });
    return;
  }
  // Only poll if there are active multi-page jobs in progress
  const hasActive = await hasActiveJobs({ redisPubClient });
  if (!hasActive) {
    return;
  }
  try {
    // Long-poll (20s) for up to 10 messages per cycle.
    const receiveCommand = new ReceiveMessageCommand({
      QueueUrl: queueUrl,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 20,
      MessageAttributeNames: ['All']
    });
    const result = await sqsClient.send(receiveCommand);
    if (result.Messages && result.Messages.length > 0) {
      logger.log("bill-ocr-sqs-processing", "DEBUG", "api", null, { message: `Processing ${result.Messages.length} messages from SQS` });
      for (const message of result.Messages) {
        try {
          // Environment-level filtering: check if this message belongs to this environment
          const shouldProcess = await shouldProcessMessage(message);
          if (shouldProcess) {
            await handleTextractNotification(message);
            // Delete message after successful processing
            const deleteCommand = new DeleteMessageCommand({
              QueueUrl: queueUrl,
              ReceiptHandle: message.ReceiptHandle
            });
            await sqsClient.send(deleteCommand);
          }
        } catch (error) {
          // Per-message failures are logged and skipped; the message stays on
          // the queue and becomes visible again after the visibility timeout.
          logger.log("bill-ocr-sqs-processing-error", "ERROR", "api", null, { message, error: error.message, stack: error.stack });
        }
      }
    }
  } catch (error) {
    logger.log("bill-ocr-sqs-receiving-error", "ERROR", "api", null, { error: error.message, stack: error.stack });
  }
}
/**
 * Check if a message should be processed by this environment
 * @param {Object} message - SQS message
 * @returns {Promise<boolean>} true when the job ID is tracked in this environment's Redis.
 */
async function shouldProcessMessage(message) {
  try {
    const snsEnvelope = JSON.parse(message.Body);
    const notification = JSON.parse(snsEnvelope.Message);
    // A message is "ours" when its Textract job ID exists under this
    // environment's Redis prefix.
    return await jobExists(notification.JobId);
  } catch (error) {
    logger.log("bill-ocr-message-check-error", "DEBUG", "api", null, { message: "Error checking if message should be processed", error: error.message, stack: error.stack });
    // If we can't parse the message, don't process it
    return false;
  }
}
/**
 * Apply a Textract completion notification (SNS envelope inside an SQS
 * message) to the matching Redis job record. On SUCCEEDED, fetches and
 * normalizes the full results; on FAILED, records the failure reason.
 * Unknown job IDs and malformed payloads are logged and ignored.
 * @param {Object} message - Raw SQS message whose Body wraps an SNS notification.
 */
async function handleTextractNotification(message) {
  const body = JSON.parse(message.Body);
  let snsMessage
  try {
    snsMessage = JSON.parse(body.Message);
  } catch (error) {
    // Malformed SNS payload — nothing to act on.
    logger.log("bill-ocr-handle-textract-error", "DEBUG", "api", null, { message: "Error parsing SNS message - invalid message format.", error: error.message, stack: error.stack, body });
    return;
  }
  const textractJobId = snsMessage.JobId;
  const status = snsMessage.Status;
  // Get job info from Redis
  const jobInfo = await getTextractJob({ redisPubClient, textractJobId });
  if (!jobInfo) {
    // Not tracked here (e.g. another environment's job, or the record expired).
    logger.log("bill-ocr-job-not-found", "DEBUG", "api", null, { message: `Job info not found in Redis for Textract job ID: ${textractJobId}`, textractJobId, snsMessage });
    return;
  }
  if (status === 'SUCCEEDED') {
    // Retrieve the results
    const { processedData, originalResponse } = await retrieveTextractResults(textractJobId);
    // Store the processed data - billForm will be generated on-demand in the status endpoint
    await setTextractJob(
      {
        redisPubClient,
        textractJobId,
        jobData: {
          ...jobInfo,
          status: 'COMPLETED',
          data: {
            ...processedData,
            originalTextractResponse: originalResponse
          },
          completedAt: new Date().toISOString()
        }
      }
    );
  } else if (status === 'FAILED') {
    // Record the failure so the status endpoint can report it.
    await setTextractJob(
      {
        redisPubClient,
        textractJobId,
        jobData: {
          ...jobInfo,
          status: 'FAILED',
          error: snsMessage.StatusMessage || 'Textract job failed',
          completedAt: new Date().toISOString()
        }
      }
    );
  }
}
/**
 * Fetch all result pages for a completed Textract expense-analysis job and
 * normalize them into scan data.
 * @param {string} textractJobId - The Textract job ID.
 * @returns {Promise<{processedData: Object, originalResponse: Object}>}
 */
async function retrieveTextractResults(textractJobId) {
  // Textract paginates results via NextToken; collect every page.
  const allExpenseDocuments = [];
  let nextToken;
  do {
    const pageResult = await textractClient.send(new GetExpenseAnalysisCommand({
      JobId: textractJobId,
      NextToken: nextToken
    }));
    if (pageResult.ExpenseDocuments) {
      allExpenseDocuments.push(...pageResult.ExpenseDocuments);
    }
    nextToken = pageResult.NextToken;
  } while (nextToken);
  // Reassemble a single response spanning every page.
  const fullTextractResponse = { ExpenseDocuments: allExpenseDocuments };
  return {
    processedData: processScanData(extractInvoiceData(fullTextractResponse)),
    originalResponse: fullTextractResponse
  };
}
// Start SQS polling (call this when server starts)
/**
 * Begin periodic SQS polling for Textract notifications.
 * @returns {Object} The interval handle; the caller owns it (clearInterval to stop).
 */
function startSQSPolling() {
  const POLL_INTERVAL_MS = 10000; // Poll every 10 seconds
  return setInterval(() => {
    // Swallow nothing silently: polling errors are logged each cycle.
    processSQSMessages().catch(error => {
      logger.log("bill-ocr-sqs-poll-error", "ERROR", "api", null, { message: error.message, stack: error.stack });
    });
  }, POLL_INTERVAL_MS);
}
// Public surface of the bill-OCR entry module: initialization, the two HTTP
// handlers, and the SQS poller starter. Internal helpers stay private.
module.exports = {
  initializeBillOcr,
  handleBillOcr,
  handleBillOcrStatus,
  startSQSPolling
};