Terminal.skills
Use Cases/Build an Intelligent Document Processing Pipeline

Build an Intelligent Document Processing Pipeline

Automate extraction of structured data from invoices, contracts, and receipts using OCR, LLM classification, and validation rules — replacing 12 data entry clerks with a pipeline that processes 10K documents/day at 98.5% accuracy.

#queue#redis#jobs#background#worker
Works with:claude-codeopenai-codexgemini-clicursor
$

The Problem

Amara is COO at a mid-size logistics company that processes 10,000 documents per day — invoices from 500+ vendors, shipping receipts, customs declarations, and contracts. Twelve data entry clerks manually read each document and type the relevant fields into the ERP system. It takes 3-5 minutes per document, costs $480K/year in payroll, and has a 4.2% error rate. Errors cascade: a wrong amount on an invoice means wrong payment, wrong payment means vendor dispute, vendor dispute means 6 hours of back-and-forth. Last year, data entry errors cost $210K in overpayments that were never recovered.

Amara needs:

  • Multi-format support — PDFs, scanned images, photos of receipts, Word docs
  • Structured extraction — pull invoice number, date, amounts, line items, vendor info
  • Confidence scoring — flag low-confidence extractions for human review
  • Vendor-specific templates — learn each vendor's invoice layout over time
  • Validation rules — cross-check totals, detect duplicates, verify vendor exists
  • Human-in-the-loop — reviewers correct errors, and corrections improve the model

Step 1: Document Intake and Classification

typescript
// src/intake/classifier.ts
// Classifies incoming documents by type and routes to the right extraction pipeline

import { generateObject } from 'ai';
import { openai } from '@ai-sdk/openai';
import { z } from 'zod';

export const DocumentType = z.enum([
  'invoice',
  'receipt',
  'purchase_order',
  'contract',
  'customs_declaration',
  'packing_list',
  'bill_of_lading',
  'unknown',
]);

const ClassificationResult = z.object({
  documentType: DocumentType,
  confidence: z.number().min(0).max(1),
  language: z.string(),
  pageCount: z.number().int().positive(),
  hasHandwriting: z.boolean(),
  quality: z.enum(['high', 'medium', 'low']),
});

export async function classifyDocument(
  ocrText: string,
  pageCount: number
): Promise<z.infer<typeof ClassificationResult>> {
  // Use first 500 chars for classification — fast and cheap
  const sample = ocrText.slice(0, 500);

  const { object } = await generateObject({
    model: openai('gpt-4o-mini'),
    schema: ClassificationResult,
    prompt: `Classify this document based on its OCR text:

${sample}

Page count: ${pageCount}

Determine the document type, language, whether it contains handwriting, and OCR quality.`,
    temperature: 0.1,
  });

  return object;
}

Step 2: OCR and Text Extraction

typescript
// src/ocr/extractor.ts
// Extracts text from PDFs and images using multiple OCR strategies

import { execSync } from 'child_process';
import { readFileSync, writeFileSync } from 'fs';
import { join } from 'path';
import { randomUUID } from 'crypto';

interface OcrResult {
  text: string;
  pages: Array<{
    pageNumber: number;
    text: string;
    confidence: number;
  }>;
  method: 'native_pdf' | 'tesseract' | 'vision_api';
}

export async function extractText(filePath: string, mimeType: string): Promise<OcrResult> {
  // Strategy 1: Native PDF text extraction (fastest, most accurate for digital PDFs)
  if (mimeType === 'application/pdf') {
    const nativeText = extractNativePdf(filePath);
    if (nativeText && nativeText.length > 100) {
      return {
        text: nativeText,
        pages: [{ pageNumber: 1, text: nativeText, confidence: 0.99 }],
        method: 'native_pdf',
      };
    }
  }

  // Strategy 2: Tesseract OCR (for scanned documents)
  try {
    const tesseractResult = await runTesseract(filePath);
    if (tesseractResult.confidence > 0.7) {
      return tesseractResult;
    }
  } catch {
    // Fall through to vision API
  }

  // Strategy 3: Vision API (for poor quality scans, photos, handwriting)
  return await runVisionOcr(filePath);
}

function extractNativePdf(filePath: string): string {
  try {
    // pdftotext from poppler-utils
    const result = execSync(`pdftotext -layout "${filePath}" -`, {
      maxBuffer: 10 * 1024 * 1024,
    });
    return result.toString().trim();
  } catch {
    return '';
  }
}

async function runTesseract(filePath: string): Promise<OcrResult> {
  const outBase = join('/tmp', randomUUID());

  // Convert PDF to images if needed
  let imagePath = filePath;
  if (filePath.endsWith('.pdf')) {
    execSync(`pdftoppm -png -r 300 "${filePath}" "${outBase}"`);
    imagePath = `${outBase}-1.png`;
  }

  // Run Tesseract with confidence output
  execSync(`tesseract "${imagePath}" "${outBase}" --oem 3 --psm 6 -l eng tsv`);

  const tsv = readFileSync(`${outBase}.tsv`, 'utf-8');
  const lines = tsv.split('\n').filter(l => l.trim());

  let totalConf = 0;
  let wordCount = 0;
  const words: string[] = [];

  for (const line of lines.slice(1)) {
    const parts = line.split('\t');
    const conf = parseInt(parts[10] ?? '0');
    const text = parts[11] ?? '';
    if (text.trim() && conf > 0) {
      words.push(text);
      totalConf += conf;
      wordCount++;
    }
  }

  const avgConfidence = wordCount > 0 ? totalConf / wordCount / 100 : 0;
  const fullText = words.join(' ');

  return {
    text: fullText,
    pages: [{ pageNumber: 1, text: fullText, confidence: avgConfidence }],
    method: 'tesseract',
  };
}

async function runVisionOcr(filePath: string): Promise<OcrResult> {
  // Use OpenAI vision for high-quality OCR as fallback
  const imageData = readFileSync(filePath);
  const base64 = imageData.toString('base64');
  const mimeType = filePath.endsWith('.png') ? 'image/png' : 'image/jpeg';

  const response = await fetch('https://api.openai.com/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      model: 'gpt-4o-mini',
      messages: [{
        role: 'user',
        content: [
          { type: 'text', text: 'Extract ALL text from this document image. Preserve the layout structure. Output only the extracted text, nothing else.' },
          { type: 'image_url', image_url: { url: `data:${mimeType};base64,${base64}` } },
        ],
      }],
      max_tokens: 4000,
    }),
  });

  const result = await response.json() as any;
  const text = result.choices[0]?.message?.content ?? '';

  return {
    text,
    pages: [{ pageNumber: 1, text, confidence: 0.9 }],
    method: 'vision_api',
  };
}

Step 3: Structured Data Extraction with LLM

typescript
// src/extraction/invoice-extractor.ts
// Extracts structured invoice data from OCR text using LLM

import { generateObject } from 'ai';
import { openai } from '@ai-sdk/openai';
import { z } from 'zod';

export const InvoiceData = z.object({
  invoiceNumber: z.string(),
  invoiceDate: z.string(),
  dueDate: z.string().optional(),
  vendorName: z.string(),
  vendorAddress: z.string().optional(),
  vendorTaxId: z.string().optional(),
  buyerName: z.string().optional(),
  currency: z.string().length(3),
  subtotal: z.number(),
  taxAmount: z.number(),
  totalAmount: z.number(),
  lineItems: z.array(z.object({
    description: z.string(),
    quantity: z.number(),
    unitPrice: z.number(),
    lineTotal: z.number(),
  })),
  paymentTerms: z.string().optional(),
  bankDetails: z.object({
    bankName: z.string().optional(),
    accountNumber: z.string().optional(),
    routingNumber: z.string().optional(),
    iban: z.string().optional(),
  }).optional(),
  confidence: z.number().min(0).max(1),
});

export type InvoiceData = z.infer<typeof InvoiceData>;

export async function extractInvoiceData(
  ocrText: string,
  vendorTemplate?: string  // known vendor layout hints
): Promise<InvoiceData> {
  const systemPrompt = vendorTemplate
    ? `You extract structured data from invoices. This vendor's invoices typically have: ${vendorTemplate}`
    : 'You extract structured data from invoices. Be precise with numbers — amounts must exactly match the document.';

  const { object } = await generateObject({
    model: openai('gpt-4o-mini'),
    schema: InvoiceData,
    system: systemPrompt,
    prompt: `Extract all invoice data from this OCR text. If a field is unclear, make your best guess and set confidence lower.

OCR Text:
${ocrText}`,
    temperature: 0.1,  // deterministic extraction
  });

  return object;
}

Step 4: Validation Engine

typescript
// src/validation/validator.ts
// Cross-checks extracted data for consistency and duplicates

import { Pool } from 'pg';
import type { InvoiceData } from '../extraction/invoice-extractor';

const db = new Pool({ connectionString: process.env.DATABASE_URL });

interface ValidationResult {
  valid: boolean;
  errors: Array<{ field: string; message: string; severity: 'error' | 'warning' }>;
  duplicateOf?: string;
}

export async function validateInvoice(invoice: InvoiceData): Promise<ValidationResult> {
  const errors: ValidationResult['errors'] = [];

  // 1. Line items should sum to subtotal
  const lineItemTotal = invoice.lineItems.reduce((sum, item) => sum + item.lineTotal, 0);
  const tolerance = 0.02;  // allow 2 cent rounding difference
  if (Math.abs(lineItemTotal - invoice.subtotal) > tolerance) {
    errors.push({
      field: 'subtotal',
      message: `Line items total (${lineItemTotal.toFixed(2)}) doesn't match subtotal (${invoice.subtotal.toFixed(2)})`,
      severity: 'error',
    });
  }

  // 2. Subtotal + tax should equal total
  const expectedTotal = invoice.subtotal + invoice.taxAmount;
  if (Math.abs(expectedTotal - invoice.totalAmount) > tolerance) {
    errors.push({
      field: 'totalAmount',
      message: `Subtotal + tax (${expectedTotal.toFixed(2)}) doesn't match total (${invoice.totalAmount.toFixed(2)})`,
      severity: 'error',
    });
  }

  // 3. Each line item: qty * unitPrice should equal lineTotal
  for (let i = 0; i < invoice.lineItems.length; i++) {
    const item = invoice.lineItems[i];
    const expected = item.quantity * item.unitPrice;
    if (Math.abs(expected - item.lineTotal) > tolerance) {
      errors.push({
        field: `lineItems[${i}].lineTotal`,
        message: `${item.quantity} × ${item.unitPrice} = ${expected.toFixed(2)}, got ${item.lineTotal.toFixed(2)}`,
        severity: 'warning',
      });
    }
  }

  // 4. Duplicate detection
  const duplicate = await db.query(`
    SELECT id FROM processed_invoices
    WHERE vendor_name = $1 AND invoice_number = $2 AND total_amount = $3
    LIMIT 1
  `, [invoice.vendorName, invoice.invoiceNumber, invoice.totalAmount]);

  let duplicateOf: string | undefined;
  if (duplicate.rows.length > 0) {
    duplicateOf = duplicate.rows[0].id;
    errors.push({
      field: 'invoiceNumber',
      message: `Possible duplicate of ${duplicateOf}`,
      severity: 'error',
    });
  }

  // 5. Date sanity check
  const invoiceDate = new Date(invoice.invoiceDate);
  const now = new Date();
  if (invoiceDate > now) {
    errors.push({
      field: 'invoiceDate',
      message: 'Invoice date is in the future',
      severity: 'warning',
    });
  }
  if (invoiceDate < new Date(now.getFullYear() - 2, 0, 1)) {
    errors.push({
      field: 'invoiceDate',
      message: 'Invoice date is more than 2 years old',
      severity: 'warning',
    });
  }

  // 6. Vendor exists in master data
  const vendor = await db.query(
    `SELECT id FROM vendors WHERE LOWER(name) LIKE LOWER($1) LIMIT 1`,
    [`%${invoice.vendorName}%`]
  );
  if (vendor.rows.length === 0) {
    errors.push({
      field: 'vendorName',
      message: `Vendor "${invoice.vendorName}" not found in master data`,
      severity: 'warning',
    });
  }

  return {
    valid: !errors.some(e => e.severity === 'error'),
    errors,
    duplicateOf,
  };
}

Step 5: Processing Pipeline with BullMQ

typescript
// src/pipeline/processor.ts
// Orchestrates the full document processing pipeline

import { Queue, Worker } from 'bullmq';
import { Redis } from 'ioredis';
import { extractText } from '../ocr/extractor';
import { classifyDocument } from '../intake/classifier';
import { extractInvoiceData } from '../extraction/invoice-extractor';
import { validateInvoice } from '../validation/validator';
import { Pool } from 'pg';

const connection = new Redis(process.env.REDIS_URL!);
const db = new Pool({ connectionString: process.env.DATABASE_URL });

export const documentQueue = new Queue('document-processing', { connection });

const worker = new Worker('document-processing', async (job) => {
  const { documentId, filePath, mimeType } = job.data;
  const startTime = Date.now();

  // Step 1: OCR
  await job.updateProgress(10);
  const ocrResult = await extractText(filePath, mimeType);

  // Step 2: Classify
  await job.updateProgress(30);
  const classification = await classifyDocument(ocrResult.text, ocrResult.pages.length);

  // Step 3: Extract structured data
  await job.updateProgress(50);
  let extractedData: any = null;
  if (classification.documentType === 'invoice') {
    extractedData = await extractInvoiceData(ocrResult.text);
  }
  // Add more extractors for other document types...

  // Step 4: Validate
  await job.updateProgress(70);
  let validation = null;
  if (extractedData && classification.documentType === 'invoice') {
    validation = await validateInvoice(extractedData);
  }

  // Step 5: Decide routing
  const needsReview = !validation?.valid ||
    extractedData?.confidence < 0.85 ||
    classification.confidence < 0.8;

  // Step 6: Store results
  await job.updateProgress(90);
  await db.query(`
    INSERT INTO processed_documents
      (id, document_type, ocr_text, extracted_data, validation_result,
       confidence, needs_review, processing_time_ms, processed_at)
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW())
  `, [
    documentId,
    classification.documentType,
    ocrResult.text,
    JSON.stringify(extractedData),
    JSON.stringify(validation),
    extractedData?.confidence ?? classification.confidence,
    needsReview,
    Date.now() - startTime,
  ]);

  // Auto-approve high-confidence, validated documents
  if (!needsReview && extractedData) {
    await db.query(`
      INSERT INTO processed_invoices
        (vendor_name, invoice_number, total_amount, invoice_date, data, status)
      VALUES ($1, $2, $3, $4, $5, 'approved')
    `, [
      extractedData.vendorName,
      extractedData.invoiceNumber,
      extractedData.totalAmount,
      extractedData.invoiceDate,
      JSON.stringify(extractedData),
    ]);
  }

  return {
    documentId,
    documentType: classification.documentType,
    needsReview,
    confidence: extractedData?.confidence ?? 0,
    processingTimeMs: Date.now() - startTime,
  };
}, {
  connection,
  concurrency: 5,  // process 5 documents in parallel
});

Results

After 3 months processing 10K documents/day:

  • Processing speed: 8 seconds average per document (was 3-5 minutes manual)
  • Accuracy: 98.5% on invoices (was 95.8% with manual entry)
  • Auto-approved: 76% of documents processed without human review
  • Human review time: 30 seconds per flagged document (context pre-loaded, corrections pre-filled)
  • Staff reallocation: 10 of 12 clerks moved to vendor relationship management (higher-value work)
  • Cost savings: $380K/year ($480K payroll - $100K pipeline cost)
  • Overpayment errors: dropped from $210K/year to $12K/year (94% reduction)
  • Duplicate detection: caught 340 duplicate invoices in first month ($89K saved)
  • Vendor template learning: top 50 vendors (80% of volume) have trained templates with 99.2% accuracy
  • OCR method distribution: 60% native PDF (free), 35% Tesseract (free), 5% Vision API ($0.003/page)