Streaming Large Files¶
Jetliner is designed for streaming large Avro files with minimal memory overhead. This guide covers memory-efficient processing techniques.
Architecture¶
Jetliner reads Avro files block-by-block rather than loading entire files into memory:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Avro File │ ──► │ Buffer │ ──► │ DataFrame │
│ (blocks) │ │ (prefetch) │ │ (batches) │
└─────────────┘ └─────────────┘ └─────────────┘
This streaming architecture enables processing files larger than available RAM.
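To see this in practice, you can watch process memory stay roughly flat while iterating. The sketch below samples resident set size with the third-party psutil package (the file name is a placeholder):

import psutil

import jetliner

proc = psutil.Process()

with jetliner.open("large_file.avro") as reader:
    for i, batch in enumerate(reader):
        if i % 100 == 0:
            # RSS should stay roughly flat regardless of file size
            rss_mb = proc.memory_info().rss / 1024**2
            print(f"batch {i}: RSS ~ {rss_mb:.0f} MB")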
Using open() for Streaming Control¶
The open() API gives you direct control over batch processing:
import jetliner

with jetliner.open("large_file.avro") as reader:
    for batch in reader:
        # Process each batch individually
        # Memory is released after each iteration
        process(batch)
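As the method calls in this guide suggest (height, filter, write_parquet), each batch is a Polars DataFrame, so the full Polars API is available for inspecting it:

import jetliner

with jetliner.open("large_file.avro") as reader:
    first = next(iter(reader))
    print(first.height, first.columns)  # row count and column names
    print(first.schema)                 # Polars dtype of each column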
Processing Without Accumulation¶
For true streaming (constant memory usage):
import jetliner

total_amount = 0
row_count = 0

with jetliner.open("huge_file.avro") as reader:
    for batch in reader:
        # Aggregate without keeping data in memory
        total_amount += batch["amount"].sum()
        row_count += batch.height

print(f"Total: {total_amount}, Rows: {row_count}")
Writing Results Incrementally¶
Stream results to disk without accumulating in memory:
import jetliner

with jetliner.open("input.avro") as reader:
    for i, batch in enumerate(reader):
        # Process and write each batch
        processed = batch.filter(batch["amount"] > 0)
        processed.write_parquet(f"output/part_{i:04d}.parquet")
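The parts can later be read back as one logical dataset; with Polars, for example, a glob pattern scans them lazily:

import polars as pl

# Scan all parts written above as a single logical table
lf = pl.scan_parquet("output/part_*.parquet")
print(lf.select(pl.len()).collect())  # total rows across all parts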
Buffer Configuration¶
Jetliner uses prefetching to overlap I/O with processing. Configure buffers based on your environment:
Parameters¶
| Parameter | Default | Description |
|---|---|---|
| `buffer_blocks` | 4 | Number of Avro blocks to prefetch |
| `buffer_bytes` | 64 MB | Maximum bytes to buffer |
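Read together, the two limits suggest that prefetch memory stays near the smaller of buffer_blocks × average block size and buffer_bytes. A rough back-of-envelope, assuming 32 MB Avro blocks:

buffer_blocks = 4        # default
avg_block_mb = 32        # depends on how the file was written
buffer_bytes_mb = 64     # default

prefetch_mb = min(buffer_blocks * avg_block_mb, buffer_bytes_mb)
print(prefetch_mb)  # 64 -> buffer_bytes is the binding limit here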
High-Throughput Settings¶
For maximum speed when memory is available:
import jetliner

# More prefetching, larger buffer
df = jetliner.scan_avro(
    "data.avro",
    buffer_blocks=8,
    buffer_bytes=128 * 1024 * 1024,  # 128MB
).collect()
Memory-Constrained Settings¶
For environments with limited memory (Lambda, containers):
import jetliner

# Less prefetching, smaller buffer
with jetliner.open(
    "data.avro",
    buffer_blocks=2,
    buffer_bytes=16 * 1024 * 1024,  # 16MB
) as reader:
    for batch in reader:
        process(batch)
Batch Size Control¶
Control the number of records per batch:
import jetliner

# Smaller batches for fine-grained control
with jetliner.open("data.avro", batch_size=10_000) as reader:
    for batch in reader:
        assert batch.height <= 10_000
        process(batch)

# Larger batches for better throughput
with jetliner.open("data.avro", batch_size=500_000) as reader:
    for batch in reader:
        process(batch)
Progress Tracking¶
Track progress during streaming:
import jetliner

with jetliner.open("large_file.avro") as reader:
    total_rows = 0
    batch_count = 0
    for batch in reader:
        batch_count += 1
        total_rows += batch.height
        if batch_count % 10 == 0:
            print(f"Processed {batch_count} batches, {total_rows:,} rows")
        process(batch)

print(f"Complete: {batch_count} batches, {total_rows:,} rows")
With tqdm¶
import jetliner
from tqdm import tqdm

with jetliner.open("large_file.avro") as reader:
    for batch in tqdm(reader, desc="Processing"):
        process(batch)
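To surface a running row count in the bar, tqdm's set_postfix works well:

import jetliner
from tqdm import tqdm

total_rows = 0
with jetliner.open("large_file.avro") as reader:
    with tqdm(reader, desc="Processing", unit="batch") as pbar:
        for batch in pbar:
            total_rows += batch.height
            pbar.set_postfix(rows=f"{total_rows:,}")
            process(batch)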
Memory Estimation¶
Estimate memory requirements for your data:
import jetliner

# Check the schema to estimate row size
with jetliner.open("data.avro") as reader:
    schema = reader.schema_dict

    # Use the first batch to estimate memory per row
    first_batch = next(iter(reader))
    bytes_per_row = first_batch.estimated_size() / first_batch.height
    print(f"Estimated bytes per row: {bytes_per_row:.0f}")
    print(f"For 1M rows: {bytes_per_row * 1_000_000 / 1024**2:.0f} MB")
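One way to use this estimate is to derive a batch_size from a per-batch memory budget (the 64 MB target below is arbitrary):

import jetliner

TARGET_BATCH_MB = 64  # arbitrary per-batch budget

with jetliner.open("data.avro") as reader:
    first_batch = next(iter(reader))
    bytes_per_row = first_batch.estimated_size() / first_batch.height

batch_size = max(1, int(TARGET_BATCH_MB * 1024**2 / bytes_per_row))

# Re-open with the derived batch size
with jetliner.open("data.avro", batch_size=batch_size) as reader:
    for batch in reader:
        process(batch)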
Streaming with scan_avro()¶
The scan_avro() API also streams internally, but collects results at the end:
import jetliner
import polars as pl
# Streaming happens internally, but collect() accumulates results
df = jetliner.scan_avro("large_file.avro").collect()
For truly large results, use open() or write results incrementally:
import jetliner
# Stream and write without full accumulation
lf = jetliner.scan_avro("large_file.avro")
lf.sink_parquet("output.parquet") # Polars streaming sink
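Lazy transformations compose with the sink, so you can filter or project before anything is materialized (the column names below are placeholders):

import jetliner
import polars as pl

(
    jetliner.scan_avro("large_file.avro")
    .filter(pl.col("amount") > 0)
    .select(["amount", "category"])
    .sink_parquet("filtered.parquet")
)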
AWS Lambda Considerations¶
Lambda functions run with limited memory (configurable from 128 MB to 10 GB). Optimize for Lambda:
import jetliner
import polars as pl

def lambda_handler(event, context):
    # Conservative settings for Lambda
    with jetliner.open(
        event["s3_uri"],
        storage_options={"region": "us-east-1"},
        buffer_blocks=2,
        buffer_bytes=32 * 1024 * 1024,  # 32MB
        batch_size=50_000,
    ) as reader:
        results = []
        for batch in reader:
            # Process and aggregate, don't accumulate raw data
            summary = batch.group_by("category").agg(
                pl.col("amount").sum()
            )
            results.append(summary)
        return pl.concat(results).to_dicts()
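If you prefer the buffer to scale with the function's configured memory, the Lambda context object exposes memory_limit_in_mb; a minimal sketch, assuming the same open() parameters as above:

import jetliner

def lambda_handler(event, context):
    # Size the prefetch buffer at ~25% of configured function memory
    limit_mb = int(context.memory_limit_in_mb)
    buffer_bytes = max(16, limit_mb // 4) * 1024 * 1024

    with jetliner.open(
        event["s3_uri"],
        storage_options={"region": "us-east-1"},
        buffer_blocks=2,
        buffer_bytes=buffer_bytes,
        batch_size=50_000,
    ) as reader:
        total = sum(batch["amount"].sum() for batch in reader)
    return {"total": total}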
Parallel Processing¶
Process batches in parallel (when order doesn't matter):
import jetliner
from concurrent.futures import ThreadPoolExecutor

def process_batch(batch):
    # CPU-bound processing
    return batch.filter(batch["amount"] > 0).height

with jetliner.open("data.avro") as reader:
    batches = list(reader)

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_batch, batches))

print(f"Total matching rows: {sum(results)}")
Memory note: Collecting all batches into a list defeats the streaming benefits. Use this pattern only when all batches fit in memory at once.
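A bounded variant keeps the streaming behavior: submit batches to the pool as they arrive, but cap the number of in-flight futures so only a few batches are alive at once. A sketch:

import collections
from concurrent.futures import ThreadPoolExecutor

import jetliner

MAX_IN_FLIGHT = 4

def process_batch(batch):
    return batch.filter(batch["amount"] > 0).height

total = 0
with jetliner.open("data.avro") as reader, \
        ThreadPoolExecutor(max_workers=MAX_IN_FLIGHT) as executor:
    in_flight = collections.deque()
    for batch in reader:
        in_flight.append(executor.submit(process_batch, batch))
        if len(in_flight) >= MAX_IN_FLIGHT:
            total += in_flight.popleft().result()  # wait on the oldest
    while in_flight:
        total += in_flight.popleft().result()

print(f"Total matching rows: {total}")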
Best Practices¶
- Use open() for large files: When you can't fit results in memory
- Process incrementally: Aggregate or write results as you go
- Tune buffer settings: Match your memory constraints
- Monitor memory: Use tools like psutil to track usage
- Combine with projection: Select only needed columns to reduce memory (see the sketch after this list)
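For the projection point, a minimal sketch, assuming scan_avro participates in Polars projection pushdown (see the Query Optimization guide; the column names are placeholders):

import jetliner

# Only the selected columns need to be read and decoded
df = (
    jetliner.scan_avro("data.avro")
    .select(["amount", "category"])
    .collect()
)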
Troubleshooting¶
Out of Memory¶
- Reduce buffer_blocks and buffer_bytes
- Use a smaller batch_size
- Process batches without accumulating
- Select fewer columns with projection pushdown
Slow Performance¶
- Increase buffer_blocks for more prefetching
- Increase batch_size for fewer Python iterations
- Use projection pushdown to read fewer columns
Next Steps¶
- Query Optimization - Reduce data read
- S3 Access - Stream from cloud storage
- Error Handling - Handle failures gracefully