API Reference¶
Complete API documentation for Jetliner.
Functions¶
scan_avro¶
```python
def scan_avro(
    source: str | Path | Sequence[str] | Sequence[Path],
    *,
    n_rows: int | None = None,
    row_index_name: str | None = None,
    row_index_offset: int = 0,
    glob: bool = True,
    include_file_paths: str | None = None,
    ignore_errors: bool = False,
    storage_options: dict[str, str] | None = None,
    buffer_blocks: int = 4,
    buffer_bytes: int = 64 * 1024 * 1024,
    read_chunk_size: int | None = None,
    batch_size: int = 100_000,
) -> pl.LazyFrame
```
Scan Avro file(s), returning a LazyFrame with query optimization support.
This function uses Polars' IO plugin system to enable query optimizations:
- Projection pushdown: Only read columns that are actually used in the query
- Predicate pushdown: Apply filters during reading, not after
- Early stopping: Stop reading after the requested number of rows
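To see these optimizations at work, you can print the optimized query plan with Polars' `LazyFrame.explain()`. A minimal sketch (the column names are illustrative, and the exact plan text varies by Polars version):

```python
import jetliner
import polars as pl

lf = (
    jetliner.scan_avro("data.avro")
    .select(["user_id", "amount"])   # projection pushdown: only these columns are decoded
    .filter(pl.col("amount") > 100)  # predicate pushdown: rows are filtered while reading
    .head(1000)                      # early stopping: reading halts after 1000 rows
)

# Inspect the optimized plan before collecting.
print(lf.explain())
```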
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `source` | `str` | required | Path to Avro file(s). Supports local paths, S3 URIs, glob patterns, or lists |
| `n_rows` | `int` | `None` | Maximum number of rows to read |
| `row_index_name` | `str` | `None` | Name for row index column (inserted as first column) |
| `row_index_offset` | `int` | `0` | Starting value for row index |
| `glob` | `bool` | `True` | Whether to expand glob patterns in paths |
| `include_file_paths` | `str` | `None` | Column name for source file paths |
| `ignore_errors` | `bool` | `False` | If True, skip bad records. If False, fail on first error |
| `storage_options` | `dict` | `None` | Configuration for S3 connections |
| `buffer_blocks` | `int` | `4` | Number of blocks to prefetch for better I/O performance |
| `buffer_bytes` | `int` | `64MB` | Maximum bytes to buffer during prefetching |
| `read_chunk_size` | `int` | `None` | I/O read chunk size in bytes (auto-detect if None) |
| `batch_size` | `int` | `100_000` | Target number of rows per DataFrame batch |
Returns:
pl.LazyFrame - A LazyFrame that can be used with Polars query operations.
Raises:
- `FileNotFoundError` - If the file does not exist
- `PermissionError` - If access is denied
- `ParseError` - If the file is not a valid Avro file
- `SchemaError` - If the schema is invalid or cannot be converted
- `SourceError` - For S3 or filesystem errors
Examples:
```python
import jetliner
import polars as pl

# Basic scan
lf = jetliner.scan_avro("data.avro")
df = lf.collect()

# S3 with credentials
df = jetliner.scan_avro(
    "s3://bucket/file.avro",
    storage_options={
        "endpoint": "http://localhost:9000",
        "aws_access_key_id": "minioadmin",
        "aws_secret_access_key": "minioadmin",
    },
).collect()

# Query with optimization
result = (
    jetliner.scan_avro("file.avro")
    .select(["col1", "col2"])
    .filter(pl.col("amount") > 100)
    .head(1000)
    .collect()
)

# Multiple files with glob pattern
df = jetliner.scan_avro("data/*.avro").collect()

# With row index
df = jetliner.scan_avro("file.avro", row_index_name="idx").collect()

# With file path tracking
df = jetliner.scan_avro("data/*.avro", include_file_paths="source_file").collect()
```
read_avro¶
```python
def read_avro(
    source: str | Path | Sequence[str] | Sequence[Path],
    *,
    columns: Sequence[str] | Sequence[int] | None = None,
    n_rows: int | None = None,
    row_index_name: str | None = None,
    row_index_offset: int = 0,
    glob: bool = True,
    include_file_paths: str | None = None,
    ignore_errors: bool = False,
    storage_options: dict[str, str] | None = None,
    buffer_blocks: int = 4,
    buffer_bytes: int = 64 * 1024 * 1024,
    read_chunk_size: int | None = None,
    batch_size: int = 100_000,
) -> pl.DataFrame
```
Read Avro file(s) into a DataFrame with optional column selection.
This function is equivalent to scan_avro(...).collect() with eager column selection.
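For example, the following two calls should produce the same frame (a sketch with illustrative column names, assuming a recent Polars with `DataFrame.equals()`):

```python
import jetliner

# Eager convenience form with column selection...
df_eager = jetliner.read_avro("data.avro", columns=["user_id", "amount"])

# ...is equivalent to a lazy scan, projection, and collect.
df_lazy = jetliner.scan_avro("data.avro").select(["user_id", "amount"]).collect()

assert df_eager.equals(df_lazy)
```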
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `source` | `str` | required | Path to Avro file(s). Supports local paths, S3 URIs, glob patterns, or lists |
| `columns` | `list` | `None` | Columns to read (by name or index). None reads all columns |
| `n_rows` | `int` | `None` | Maximum number of rows to read |
| `row_index_name` | `str` | `None` | Name for row index column (inserted as first column) |
| `row_index_offset` | `int` | `0` | Starting value for row index |
| `glob` | `bool` | `True` | Whether to expand glob patterns in paths |
| `include_file_paths` | `str` | `None` | Column name for source file paths |
| `ignore_errors` | `bool` | `False` | If True, skip bad records. If False, fail on first error |
| `storage_options` | `dict` | `None` | Configuration for S3 connections |
| `buffer_blocks` | `int` | `4` | Number of blocks to prefetch |
| `buffer_bytes` | `int` | `64MB` | Maximum bytes to buffer |
| `read_chunk_size` | `int` | `None` | I/O read chunk size in bytes (auto-detect if None) |
| `batch_size` | `int` | `100_000` | Target number of rows per batch |
Returns:
pl.DataFrame - A DataFrame containing the Avro data.
Examples:
```python
import jetliner

# Read entire file
df = jetliner.read_avro("data.avro")

# Read specific columns by name
df = jetliner.read_avro("data.avro", columns=["user_id", "amount"])

# Read specific columns by index
df = jetliner.read_avro("data.avro", columns=[0, 2, 5])

# Limit rows
df = jetliner.read_avro("data.avro", n_rows=1000)

# Multiple files
df = jetliner.read_avro(["file1.avro", "file2.avro"])

# Glob pattern
df = jetliner.read_avro("data/*.avro")
```
open¶
```python
def open(
    path: str,
    *,
    batch_size: int = 100_000,
    buffer_blocks: int = 4,
    buffer_bytes: int = 64 * 1024 * 1024,
    strict: bool = False,
    storage_options: dict[str, str] | None = None,
) -> AvroReader
```
Open an Avro file for streaming iteration.
Returns a context manager that yields DataFrame batches. Use this API when you need fine-grained control over batch processing, progress tracking, or memory management.
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `path` | `str` | required | Path to Avro file. Supports local paths and S3 URIs |
| `batch_size` | `int` | `100_000` | Maximum records per batch |
| `buffer_blocks` | `int` | `4` | Number of blocks to prefetch |
| `buffer_bytes` | `int` | `64MB` | Maximum bytes to buffer |
| `strict` | `bool` | `False` | If True, fail on first error |
| `storage_options` | `dict` | `None` | Configuration for S3 connections |
Returns:
AvroReader - A context manager that iterates over DataFrame batches.
Examples:
```python
import jetliner

# Basic iteration
with jetliner.open("data.avro") as reader:
    for batch in reader:
        print(f"Batch: {batch.height} rows")

# Access schema
with jetliner.open("data.avro") as reader:
    print(reader.schema)
    print(reader.schema_dict)

# Check errors in skip mode
with jetliner.open("data.avro", strict=False) as reader:
    batches = list(reader)
    if reader.error_count > 0:
        print(f"Skipped {reader.error_count} records")
```
read_avro_schema¶
```python
def read_avro_schema(
    source: str | Path | Sequence[str] | Sequence[Path],
    *,
    storage_options: dict[str, str] | None = None,
) -> dict[str, pl.DataType]
```
Read an Avro file's schema and return the equivalent Polars schema.
Only reads the file header, making it fast even for large files.
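That makes it cheap to validate inputs before committing to a full read; a sketch with a hypothetical set of required columns:

```python
import jetliner

required = {"user_id", "amount"}  # hypothetical required columns
schema = jetliner.read_avro_schema("data.avro")

missing = required - schema.keys()
if missing:
    raise ValueError(f"Avro file is missing columns: {sorted(missing)}")
```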
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `source` | `str` | required | Path to Avro file |
| `storage_options` | `dict` | `None` | Configuration for S3 connections |
Returns:
dict[str, pl.DataType] - Dictionary mapping column names to Polars data types.
Examples:
```python
import jetliner

schema = jetliner.read_avro_schema("data.avro")
print(schema)
# {'user_id': Int64, 'name': String, 'amount': Float64}

# From S3
schema = jetliner.read_avro_schema(
    "s3://bucket/data.avro",
    storage_options={"region": "us-east-1"},
)
```
parse_avro_schema (deprecated)¶
```python
def parse_avro_schema(
    path: str,
    *,
    storage_options: dict[str, str] | None = None,
) -> dict[str, pl.DataType]
```
**Deprecated:** Use `read_avro_schema()` instead. This function will be removed in a future version.
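For plain string paths the replacement is a drop-in rename:

```python
import jetliner

# Before (deprecated)
schema = jetliner.parse_avro_schema("data.avro")

# After
schema = jetliner.read_avro_schema("data.avro")
```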
Classes¶
AvroReader¶
Context manager returned by open(). Provides iteration over DataFrame batches and schema access.
Properties:

| Property | Type | Description |
|---|---|---|
| `schema` | `str` | Avro schema as JSON string |
| `schema_dict` | `dict` | Avro schema as Python dictionary |
| `error_count` | `int` | Number of records skipped (skip mode only) |
| `errors` | `list[str]` | List of error messages (skip mode only) |
Usage:
```python
import jetliner

with jetliner.open("data.avro") as reader:
    # Access schema before reading
    print(f"Schema: {reader.schema}")

    # Iterate over batches
    for batch in reader:
        process(batch)

    # Check for errors (skip mode)
    if reader.error_count > 0:
        for error in reader.errors:
            print(f"Error: {error}")
```
AvroReaderCore¶
Low-level reader class used internally by scan_avro() and open(). Most users should use those functions instead.
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `path` | `str` | required | Path to Avro file |
| `batch_size` | `int` | `100_000` | Records per batch |
| `buffer_blocks` | `int` | `4` | Blocks to prefetch |
| `buffer_bytes` | `int` | `64MB` | Max buffer size |
| `strict` | `bool` | `False` | Fail on first error |
| `projected_columns` | `list[str]` | `None` | Columns to read (projection) |
| `storage_options` | `dict` | `None` | S3 configuration |
Exceptions¶
All Jetliner exceptions inherit from JetlinerError:
```python
import jetliner

try:
    df = jetliner.scan_avro("data.avro").collect()
except jetliner.JetlinerError as e:
    print(f"Jetliner error: {e}")
```
Exception Hierarchy¶
```text
JetlinerError (base class)
├── ParseError   # Invalid Avro file format
├── SchemaError  # Invalid or unsupported schema
├── CodecError   # Decompression failure
├── DecodeError  # Record decoding failure
└── SourceError  # File/S3 access errors
```
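Since the specific exceptions all inherit from `JetlinerError`, you can handle the failures you care about first and fall back to the base class; a sketch:

```python
import jetliner

try:
    df = jetliner.read_avro("data.avro")
except jetliner.ParseError:
    print("File is not valid Avro")
except jetliner.SourceError as e:
    print(f"Could not read source: {e}")
except jetliner.JetlinerError as e:
    print(f"Other Jetliner error: {e}")
```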
Structured Exception Types¶
For programmatic error handling, Jetliner provides structured exception types with metadata attributes:
PyDecodeError¶
Raised when a record cannot be decoded.
Attributes:

| Attribute | Type | Description |
|---|---|---|
| `block_index` | `int` | Index of the block containing the error |
| `record_index` | `int` | Index of the record within the block |
| `offset` | `int` | Byte offset in the file |
| `message` | `str` | Error description |
```python
import jetliner

try:
    df = jetliner.read_avro("corrupted.avro")
except jetliner.PyDecodeError as e:
    print(f"Error at block {e.block_index}, record {e.record_index}")
    print(f"Offset: {e.offset}")
    print(f"Message: {e.message}")
```
PyParseError¶
Raised when the file format is invalid.
Attributes:

| Attribute | Type | Description |
|---|---|---|
| `offset` | `int` | Byte offset of the error |
| `message` | `str` | Error description |
PySourceError¶
Raised for file or S3 access errors.
Attributes:

| Attribute | Type | Description |
|---|---|---|
| `path` | `str` | Path that caused the error |
| `message` | `str` | Error description |
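The `path` attribute is handy when probing several inputs; a sketch with hypothetical file names:

```python
import jetliner

for p in ["a.avro", "b.avro"]:  # hypothetical inputs
    try:
        jetliner.read_avro_schema(p)
    except jetliner.PySourceError as e:
        print(f"Unreadable source {e.path}: {e.message}")
```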
PySchemaError¶
Raised when the schema is invalid.
Attributes:

| Attribute | Type | Description |
|---|---|---|
| `message` | `str` | Error description |
PyCodecError¶
Raised when decompression fails.
Attributes:

| Attribute | Type | Description |
|---|---|---|
| `message` | `str` | Error description |
JetlinerError¶
Base exception class for all Jetliner errors. Catch this to handle any library-specific error.
ParseError¶
Raised when the file is not a valid Avro file.
Common causes:
- Invalid magic bytes (file is not Avro format)
- Corrupted file header
- Truncated file
SchemaError¶
Raised when the Avro schema is invalid or cannot be converted to Polars types.
Common causes:
- Malformed schema JSON
- Unsupported schema features
- Invalid type definitions
CodecError¶
Raised when decompression fails.
Common causes:
- Corrupted compressed data
- Unsupported codec
- Invalid compression block
DecodeError¶
Raised when a record cannot be decoded. In strict mode, this stops reading. In skip mode, the record is skipped.
Common causes:
- Corrupted record data
- Schema mismatch
- Invalid field encoding
SourceError¶
Raised for file or S3 access errors.
Common causes:
- File not found
- Permission denied
- S3 authentication failure
- Network errors
Storage Options¶
The storage_options parameter configures S3 access:
| Key | Description | Example |
|---|---|---|
| `endpoint` | Custom S3 endpoint (MinIO, LocalStack, R2) | `http://localhost:9000` |
| `aws_access_key_id` | AWS access key | `AKIAIOSFODNN7EXAMPLE` |
| `aws_secret_access_key` | AWS secret key | `wJalrXUtnFEMI/...` |
| `region` | AWS region | `us-east-1` |
| `max_retries` | Maximum retry attempts for transient failures | `5` |
Explicit credentials take precedence over environment variables.
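Against AWS itself you typically set only `region` and let credentials come from the environment, optionally raising `max_retries` for flaky networks. A sketch with a hypothetical bucket; note that values are passed as strings to match `dict[str, str]`:

```python
import jetliner

lf = jetliner.scan_avro(
    "s3://my-bucket/events/*.avro",  # hypothetical bucket and prefix
    storage_options={
        "region": "us-east-1",
        "max_retries": "5",  # string value, per dict[str, str]
    },
)
```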
Type Mapping¶
Primitive Types¶
| Avro Type | Polars Type |
|---|---|
| `null` | `Null` |
| `boolean` | `Boolean` |
| `int` | `Int32` |
| `long` | `Int64` |
| `float` | `Float32` |
| `double` | `Float64` |
| `bytes` | `Binary` |
| `string` | `String` |
Logical Types¶
| Avro Logical Type | Polars Type |
|---|---|
| `date` | `Date` |
| `time-millis` | `Time` |
| `time-micros` | `Time` |
| `timestamp-millis` | `Datetime(ms)` |
| `timestamp-micros` | `Datetime(μs)` |
| `uuid` | `String` |
| `decimal` | `Decimal` |
Complex Types¶
| Avro Type | Polars Type |
|---|---|
| `array` | `List` |
| `map` | `Struct` |
| `record` | `Struct` |
| `enum` | `Categorical` |
| `fixed` | `Binary` |
Union Types¶
["null", T]→T(nullable)["null", T1, T2, ...]→Structwith type indicator[T1, T2, ...](no null) →Structwith type indicator
Recursive Types¶
Recursive types are serialized to JSON strings since Polars doesn't support recursive structures.
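In practice a recursive field therefore arrives as a String column of JSON documents that you can parse on demand; a sketch (the file and column names are hypothetical):

```python
import json
import jetliner

df = jetliner.read_avro("trees.avro")  # hypothetical file with a recursive "node" field
first = json.loads(df["node"][0])      # parse one serialized value back into Python objects
print(first)
```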