Skip to content

Commit 2c7e071

Browse files
feat(llm): add async document and receipt extraction pipelines
- Introduce async_document_extraction_pipeline.py for generic document extraction with parallel processing. - Implement async_extract_receipts_pipeline.py for receipt extraction, leveraging async capabilities for improved performance. - Update README to reflect new async pipelines, highlighting performance benefits and usage examples. - Include receipt-specific schemas and data transformations for structured extraction. Benefits: - Significant speed improvements for batch processing through concurrent LLM calls and parallel image processing. - Enhanced user experience with clear examples and documentation for both sync and async usage.
1 parent 73b9cfd commit 2c7e071

File tree

3 files changed

+690
-10
lines changed

3 files changed

+690
-10
lines changed

llm/smart_data_extraction_llamaindex/README.md

Lines changed: 198 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,48 @@ A flexible, schema-driven pipeline for extracting structured data from any type
1313

1414
```
1515
llm/smart_data_extraction_llamaindex/
16-
├── document_extraction_pipeline.py # Generic pipeline (reusable)
17-
├── extract_receipts_pipeline.py # Receipt-specific (schema + logic + example)
18-
└── README.md # This file
16+
├── document_extraction_pipeline.py # Generic pipeline (synchronous)
17+
├── async_document_extraction_pipeline.py # Generic pipeline (async, parallel)
18+
├── extract_receipts_pipeline.py # Receipt extraction (sync + async functions)
19+
├── async_extract_receipts_pipeline.py # Receipt extraction (pure async)
20+
└── README.md # This file
1921
```
2022

23+
## Performance: Sync vs Async
24+
25+
The pipeline is available in two versions:
26+
27+
- **Synchronous** (`document_extraction_pipeline.py`): Simple, sequential processing
28+
- **Asynchronous** (`async_document_extraction_pipeline.py`): Parallel processing with 3-10x speedup
29+
30+
For batch processing of multiple documents, the async version provides significant performance improvements through concurrent LLM calls and parallel image processing.
31+
2132
## Quick Start
2233

2334
### Option 1: Use the Receipt Pipeline
2435

25-
Run the ready-to-use receipt extraction pipeline:
36+
**Async Version (Recommended for batch processing):**
37+
38+
```bash
39+
uv run async_extract_receipts_pipeline.py
40+
```
41+
42+
**Sync Version (Simple, sequential processing):**
2643

2744
```bash
2845
uv run extract_receipts_pipeline.py
2946
```
3047

31-
The receipt pipeline includes:
48+
Both receipt pipelines include:
3249
- `Receipt` and `ReceiptItem` Pydantic schemas
33-
- Receipt-specific data transformations
50+
- Receipt-specific data transformations (uppercase company, parse dates)
3451
- Pre-configured extraction prompt
52+
- Image scaling for better OCR
3553
- Example usage in `__main__` block
3654

37-
### Option 2: Create Your Own Pipeline
55+
The async version processes 4 receipts in parallel by default and includes progress indicators.
56+
57+
### Option 2: Create Your Own Pipeline (Synchronous)
3858

3959
Import the generic pipeline and create a custom extractor:
4060

@@ -89,6 +109,72 @@ if __name__ == "__main__":
89109
print(result_df)
90110
```
91111

112+
### Option 3: Create Your Own Pipeline (Async - Recommended for Batch Processing)
113+
114+
Import the async pipeline for better performance with multiple documents:
115+
116+
```python
117+
import asyncio
118+
from datetime import date
119+
from pathlib import Path
120+
from typing import Optional
121+
122+
import pandas as pd
123+
from pydantic import BaseModel, Field
124+
from async_document_extraction_pipeline import extract_structured_data_async
125+
126+
127+
# 1. Define your schema
128+
class Invoice(BaseModel):
129+
invoice_number: str = Field(description="Invoice number")
130+
vendor_name: str = Field(description="Vendor name")
131+
invoice_date: Optional[date] = Field(default=None)
132+
total_amount: float = Field(description="Total amount")
133+
134+
135+
# 2. Optional: Define data transformer
136+
def transform_invoice_data(df: pd.DataFrame) -> pd.DataFrame:
137+
"""Transform extracted invoice data."""
138+
df = df.copy()
139+
df["vendor_name"] = df["vendor_name"].str.upper()
140+
df["total_amount"] = pd.to_numeric(df["total_amount"], errors="coerce")
141+
return df
142+
143+
144+
# 3. Define extraction prompt
145+
INVOICE_PROMPT = """
146+
Extract invoice data from the following document.
147+
If a field is missing, return null.
148+
149+
{context_str}
150+
"""
151+
152+
153+
# 4. Run extraction with async
154+
async def main():
155+
invoice_paths = ["invoice1.pdf", "invoice2.pdf", "invoice3.pdf"]
156+
157+
result_df = await extract_structured_data_async(
158+
image_paths=invoice_paths,
159+
output_cls=Invoice,
160+
prompt=INVOICE_PROMPT,
161+
id_column="invoice_id",
162+
data_transformer=transform_invoice_data,
163+
num_workers=4, # Process 4 documents in parallel
164+
)
165+
166+
print(result_df)
167+
168+
169+
if __name__ == "__main__":
170+
asyncio.run(main())
171+
```
172+
173+
**Performance Benefits:**
174+
- **Sync version**: Processes documents sequentially (1 at a time)
175+
- **Async version**: Processes documents in parallel (4+ at a time)
176+
- **Expected speedup**: 3-10x faster depending on `num_workers` and API rate limits
177+
92178
## API Reference
93179

94180
### `extract_structured_data()` Function
@@ -121,6 +207,34 @@ def extract_structured_data(
121207
**Returns:**
122208
- `pd.DataFrame`: Extracted data
123209

210+
### `extract_structured_data_async()` Function
211+
212+
```python
213+
async def extract_structured_data_async(
214+
image_paths: List[str],
215+
output_cls: Type[BaseModel],
216+
prompt: str,
217+
id_column: str = "document_id",
218+
fields: Optional[List[str]] = None,
219+
image_transform_fn: Optional[Callable[[Image.Image], Image.Image]] = None,
220+
image_output_dir: Optional[Path] = None,
221+
data_transformer: Optional[Callable[[pd.DataFrame], pd.DataFrame]] = None,
222+
num_workers: int = 4,
223+
) -> pd.DataFrame
224+
```
225+
226+
**Additional Parameters:**
227+
- `num_workers`: Number of parallel workers for processing (default: 4). Higher values increase concurrency but may hit API rate limits.
228+
229+
**Performance Characteristics:**
230+
- Uses `asyncio` and `run_jobs()` for parallel processing
231+
- Concurrent LLM extraction calls (limited by `num_workers`)
232+
- Parallel image preprocessing
233+
- Non-blocking I/O operations
234+
235+
**Usage:**
236+
Must be called with `await` inside an async function or using `asyncio.run()`.
237+
124238
## Usage Examples
125239

126240
### Basic Extraction
@@ -177,6 +291,78 @@ result = extract_structured_data(
177291
)
178292
```
179293

294+
### Async Batch Processing
295+
296+
```python
297+
import asyncio
298+
from async_document_extraction_pipeline import extract_structured_data_async
299+
300+
async def process_large_batch():
301+
"""Process 100+ documents efficiently with parallel workers."""
302+
document_paths = [f"doc_{i}.pdf" for i in range(100)]
303+
304+
result = await extract_structured_data_async(
305+
image_paths=document_paths,
306+
output_cls=MySchema,
307+
prompt="Extract: {context_str}",
308+
num_workers=6, # Process 6 documents concurrently
309+
)
310+
311+
return result
312+
313+
# Run the async function
314+
result_df = asyncio.run(process_large_batch())
315+
```
316+
317+
**When to use async:**
318+
- Processing 10+ documents
319+
- Time-sensitive batch operations
320+
- High-volume document extraction
321+
- When you need maximum throughput
322+
323+
**When to use sync:**
324+
- Single document extraction
325+
- Simple scripts
326+
- Learning/prototyping
327+
- When simplicity is preferred over performance
328+
329+
### Import and Use Receipt Extractors
330+
331+
**Async receipt extractor:**
332+
333+
```python
334+
import asyncio
335+
from pathlib import Path
336+
from async_extract_receipts_pipeline import extract_receipts
337+
338+
async def process_my_receipts():
339+
paths = [Path("receipt1.jpg"), Path("receipt2.jpg")]
340+
result = await extract_receipts(
341+
receipt_paths=paths,
342+
output_dir=Path("processed"),
343+
num_workers=6 # Higher parallelism for more receipts
344+
)
345+
return result
346+
347+
# Run it
348+
df = asyncio.run(process_my_receipts())
349+
print(df)
350+
```
351+
352+
**Sync receipt extractor:**
353+
354+
```python
355+
from pathlib import Path
356+
from extract_receipts_pipeline import extract_receipts_sync
357+
358+
paths = [Path("receipt1.jpg"), Path("receipt2.jpg")]
359+
result = extract_receipts_sync(
360+
receipt_paths=paths,
361+
output_dir=Path("processed")
362+
)
363+
print(result)
364+
```
365+
180366
## Creating New Document Extractors
181367

182368
To create a new document extractor (like the receipt pipeline):
@@ -199,12 +385,14 @@ def rotate_and_scale(img: Image.Image) -> Image.Image:
199385
return rotated.resize(new_size, Image.Resampling.LANCZOS)
200386
```
201387

202-
See [extract_receipts_pipeline.py](extract_receipts_pipeline.py) for a complete example.
388+
**Examples:**
389+
- **Sync**: [extract_receipts_pipeline.py](extract_receipts_pipeline.py) - includes both sync and async functions
390+
- **Async**: [async_extract_receipts_pipeline.py](async_extract_receipts_pipeline.py) - pure async implementation with progress tracking
203391

204392
## Dependencies
205393

206-
### Generic Pipeline
207-
Required packages (in `document_extraction_pipeline.py`):
394+
### Generic Pipeline (Both Sync and Async)
395+
Required packages (in `document_extraction_pipeline.py` and `async_document_extraction_pipeline.py`):
208396
- llama-index
209397
- llama-index-program-openai
210398
- llama-parse

0 commit comments

Comments
 (0)