API Reference
HandlerPool
A pool for managing protobuf message handlers.
The HandlerPool caches message handlers for efficient conversion between protobuf messages and Arrow RecordBatches. It automatically resolves protobuf file descriptor dependencies.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_descriptors | list[FileDescriptor] | A list of protobuf FileDescriptor objects. All dependencies will be automatically resolved. | required |
| config | PtarsConfig \| None | Optional configuration for Arrow type mappings. Applies to all handlers created from this pool. | None |
Example
from ptars import HandlerPool, PtarsConfig

# Build a pool from the file descriptor of a generated message class
pool = HandlerPool([YourMessage.DESCRIPTOR.file])
handler = pool.get_for_message(YourMessage.DESCRIPTOR)
# payloads is a list of serialized messages (bytes)
record_batch = handler.list_to_record_batch(payloads)

# With custom config
config = PtarsConfig(list_value_name="element")
pool = HandlerPool([YourMessage.DESCRIPTOR.file], config=config)
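The statement that dependencies are resolved automatically can be pictured as a walk over the file descriptor graph. The following is a minimal sketch of such a walk, not ptars's actual implementation: FakeFile is a hypothetical stand-in that exposes only the name and dependencies attributes that protobuf's FileDescriptor also provides, and collect_with_dependencies is an illustrative helper name.

```python
from dataclasses import dataclass, field


@dataclass
class FakeFile:
    """Hypothetical stand-in exposing the two attributes used below."""
    name: str
    dependencies: list = field(default_factory=list)


def collect_with_dependencies(files):
    """Return the given files plus all transitive dependencies, dependencies first."""
    ordered, seen = [], set()

    def visit(fd):
        if fd.name in seen:
            return  # already handled, also guards against revisiting shared imports
        seen.add(fd.name)
        for dep in fd.dependencies:
            visit(dep)
        ordered.append(fd)

    for fd in files:
        visit(fd)
    return ordered


# A small import graph: search.proto imports common.proto and timestamp.proto.
timestamp = FakeFile("google/protobuf/timestamp.proto")
common = FakeFile("common.proto", [timestamp])
search = FakeFile("search.proto", [common, timestamp])

names = [fd.name for fd in collect_with_dependencies([search])]
```

Passing only search.proto yields all three files, with each dependency listed before the file that imports it.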
Source code in ptars/internal.py
__init__(file_descriptors, config=None)
get_for_message(descriptor)
Get a message handler for the given protobuf descriptor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| descriptor | Descriptor | A protobuf message Descriptor. | required |
Returns:
| Type | Description |
|---|---|
| MessageHandler | A MessageHandler that can convert messages of this type to/from Arrow format. |
Raises:
| Type | Description |
|---|---|
| TypeError | If descriptor is not a valid protobuf Descriptor. |
messages_to_record_batch(messages, descriptor)
Convert a list of protobuf messages to an Arrow RecordBatch.
This is a convenience method that handles serialization internally.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| messages | list[Message] | A list of protobuf message instances. | required |
| descriptor | Descriptor | The protobuf Descriptor for the message type. | required |
Returns:
| Type | Description |
|---|---|
| RecordBatch | A pyarrow.RecordBatch with one column per protobuf field. |
record_batch_to_messages(record_batch, descriptor)
Convert an Arrow RecordBatch back to protobuf messages.
This is a convenience method that handles deserialization internally.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| record_batch | RecordBatch | A pyarrow.RecordBatch with the same schema as produced by messages_to_record_batch. | required |
| descriptor | Descriptor | The protobuf Descriptor for the message type. | required |
Returns:
| Type | Description |
|---|---|
| list[Message] | A list of protobuf message instances. |
read_size_delimited_file(path, descriptor)
Read size-delimited protobuf messages from a file to a RecordBatch.
Each message in the file should be preceded by its size encoded as a varint. This method uses a fast Rust implementation for reading and parsing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| PathLike | Path to the file containing size-delimited messages. | required |
| descriptor | Descriptor | The protobuf Descriptor for the message type. | required |
Returns:
| Type | Description |
|---|---|
| RecordBatch | A pyarrow.RecordBatch with one column per protobuf field. |
MessageHandler
The MessageHandler class is returned by HandlerPool.get_for_message() and provides low-level conversion methods.
Methods
list_to_record_batch(payloads: list[bytes]) -> pyarrow.RecordBatch
Convert a list of serialized protobuf messages to an Arrow RecordBatch.
Parameters:
- payloads: A list of bytes, where each element is a serialized protobuf message.
Returns:
- A pyarrow.RecordBatch with one column per field in the protobuf message.
Example:
handler = pool.get_for_message(SearchRequest.DESCRIPTOR)
payloads = [msg.SerializeToString() for msg in messages]
record_batch = handler.list_to_record_batch(payloads)
record_batch_to_array(record_batch: pyarrow.RecordBatch) -> pyarrow.BinaryArray
Convert an Arrow RecordBatch back to serialized protobuf messages.
Parameters:
- record_batch: A pyarrow.RecordBatch with the same schema as produced by list_to_record_batch.
Returns:
- A pyarrow.BinaryArray where each element is a serialized protobuf message.
Example:
handler = pool.get_for_message(SearchRequest.DESCRIPTOR)
binary_array = handler.record_batch_to_array(record_batch)
messages = [SearchRequest.FromString(s.as_py()) for s in binary_array]
array_to_record_batch(array: pyarrow.BinaryArray) -> pyarrow.RecordBatch
Convert a binary array of serialized protobuf messages to a RecordBatch.
Parameters:
- array: A pyarrow.BinaryArray where each element is a serialized protobuf message.
Returns:
- A pyarrow.RecordBatch with one column per field in the protobuf message.
Example:
import pyarrow as pa
handler = pool.get_for_message(SearchRequest.DESCRIPTOR)
binary_array = pa.array([msg.SerializeToString() for msg in messages], type=pa.binary())
record_batch = handler.array_to_record_batch(binary_array)
PtarsConfig
PtarsConfig (dataclass)
Configuration for protobuf to Arrow conversions.
Attributes:
| Name | Type | Description |
|---|---|---|
| timestamp_tz | str \| None | Timezone for timestamp values. Use None for timezone-naive. |
| timestamp_unit | TimeUnitLiteral | Time unit for timestamps ("s", "ms", "us", "ns"). |
| time_unit | TimeUnitLiteral | Time unit for time of day ("s", "ms", "us", "ns"). |
| duration_unit | TimeUnitLiteral | Time unit for durations ("s", "ms", "us", "ns"). |
| list_value_name | str | Field name for list items in Arrow schema. |
| list_nullable | bool | Whether list fields can be null. |
| map_nullable | bool | Whether map fields can be null. |
| list_value_nullable | bool | Whether list elements can be null. |
| map_value_nullable | bool | Whether map values can be null. |
| use_large_string | bool | Whether to use LargeUtf8 instead of Utf8 for strings. |
| use_large_binary | bool | Whether to use LargeBinary instead of Binary for bytes. |
| use_large_list | bool | Whether to use LargeList instead of List for repeated fields. |
| enum_repr | EnumReprLiteral | How to represent enum fields ("int32", "string", "binary"). |
Note
The Rust API also supports map_value_name for customizing the field name
of map values in the Arrow schema. This option is not exposed in Python
because Arrow's MapType uses a fixed "value" field name when constructed
via PyArrow's C data interface. The default "value" name is always used
for map values in Python.
Source code in ptars/config.py
timestamp_tz = 'UTC'
timestamp_unit = 'ns'
time_unit = 'ns'
duration_unit = 'ns'
list_value_name = 'item'
list_nullable = False
map_nullable = False
list_value_nullable = False
map_value_nullable = False
use_large_string = False
use_large_binary = False
Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
| timestamp_tz | str \| None | "UTC" | Timezone for google.protobuf.Timestamp. Use None for timezone-naive. |
| timestamp_unit | Literal["s", "ms", "us", "ns"] | "ns" | Time unit for timestamps. |
| time_unit | Literal["s", "ms", "us", "ns"] | "ns" | Time unit for google.type.TimeOfDay. |
| duration_unit | Literal["s", "ms", "us", "ns"] | "ns" | Time unit for google.protobuf.Duration. |
| list_value_name | str | "item" | Field name for list items in Arrow schema. |
| list_nullable | bool | False | Whether list fields can be null. |
| map_nullable | bool | False | Whether map fields can be null. |
| list_value_nullable | bool | False | Whether list elements can be null. |
| map_value_nullable | bool | False | Whether map values can be null. |
| use_large_string | bool | False | Use large_utf8 instead of utf8 for string fields. |
| use_large_binary | bool | False | Use large_binary instead of binary for bytes fields. |
| confluent_wire_policy | Literal["raw", "standard", "protobuf"] | "raw" | Confluent Schema Registry wire format stripping policy. |
Map Value Field Name
The Rust API supports map_value_name for customizing the field name of map values
in the Arrow schema. This option is not available in Python because Arrow's MapType
uses a fixed "value" field name when constructed via PyArrow's C data interface.
Precision Loss with Coarser Time Units
When converting timestamps, time of day, or duration values to coarser time units
(e.g., "s" instead of "ns"), sub-unit precision is truncated (not rounded).
For example:
- A timestamp at 1.999 seconds with timestamp_unit="s" becomes 1 second
- A time of day 01:02:03.500 with time_unit="s" becomes 01:02:03
Choose the appropriate unit based on your precision requirements.
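The truncation behaviour can be illustrated with plain integer division. This is a minimal sketch that assumes values are held as non-negative nanosecond counts; truncate_nanos and NANOS_PER_UNIT are hypothetical names used only for illustration and are not part of ptars.

```python
# Nanoseconds per unit, for the four units PtarsConfig accepts.
NANOS_PER_UNIT = {"s": 1_000_000_000, "ms": 1_000_000, "us": 1_000, "ns": 1}


def truncate_nanos(nanos: int, unit: str) -> int:
    """Convert a non-negative nanosecond count to `unit`, dropping the remainder."""
    return nanos // NANOS_PER_UNIT[unit]


# A timestamp 1.999 s after the epoch, stored as nanoseconds.
ts_seconds = truncate_nanos(1_999_000_000, "s")   # truncated, not rounded up
ts_millis = truncate_nanos(1_999_000_000, "ms")   # no loss at millisecond precision

# Time of day 01:02:03.500 as nanoseconds since midnight.
tod_nanos = ((1 * 60 + 2) * 60 + 3) * 1_000_000_000 + 500_000_000
tod_seconds = truncate_nanos(tod_nanos, "s")
hours, rest = divmod(tod_seconds, 3600)
minutes, seconds = divmod(rest, 60)
```

Here ts_seconds is 1 rather than 2, and the time of day comes back as 1 hour, 2 minutes, 3 seconds with the half second dropped.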
Confluent Schema Registry Wire Format
The confluent_wire_policy option controls how ptars handles the prefix that the Confluent Schema Registry adds to Kafka messages. The prefix is stripped before the protobuf payload is decoded.
| Policy | Prefix stripped | Use with |
|---|---|---|
| "raw" | None (bytes are raw protobuf wire format) | Direct protobuf serialization |
| "standard" | 5 bytes: 1 magic byte (0x00) + 4-byte schema ID | Avro, JSON Schema |
| "protobuf" | 5 bytes + varint-encoded message index array | Protobuf |
The Protobuf wire format includes additional bytes after the 5-byte header: a
varint-encoded count of message indexes followed by the indexes themselves. This
identifies which message type in the .proto file schema is being used. The
"protobuf" policy parses past these varints automatically.
Example
from ptars import HandlerPool, PtarsConfig

# Use microsecond precision for timestamps
config = PtarsConfig(
    timestamp_unit="us",
    timestamp_tz="America/New_York",
    list_value_name="element",
)
pool = HandlerPool([MyMessage.DESCRIPTOR.file], config=config)
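The prefix layouts listed under Confluent Schema Registry Wire Format can be made concrete with a short parser. This is a sketch under stated assumptions, not ptars's Rust implementation: strip_confluent_prefix and read_zigzag_varint are hypothetical helper names, and the zigzag encoding of the count and indexes follows Confluent's own description of the format rather than anything stated on this page.

```python
def read_zigzag_varint(buf: bytes, pos: int) -> tuple[int, int]:
    """Decode one zigzag varint starting at pos; return (value, next_pos)."""
    raw = shift = 0
    while True:
        byte = buf[pos]  # raises IndexError if the buffer is truncated
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def strip_confluent_prefix(data: bytes, policy: str = "raw") -> bytes:
    """Return the payload that follows the prefix implied by policy."""
    if policy == "raw":
        return data
    if policy not in ("standard", "protobuf"):
        raise ValueError(f"unknown policy: {policy!r}")
    if len(data) < 5 or data[0] != 0:
        raise ValueError("missing magic byte 0x00 or truncated header")
    pos = 5  # 1 magic byte + 4-byte schema ID
    if policy == "protobuf":
        # Skip the message index array: a count followed by that many indexes.
        count, pos = read_zigzag_varint(data, pos)
        for _ in range(count):
            _, pos = read_zigzag_varint(data, pos)
    return data[pos:]


payload = b"\x08\x01"
header = b"\x00" + (42).to_bytes(4, "big")         # magic byte + schema ID 42
standard = header + payload
first_message = header + b"\x00" + payload         # count 0: no indexes follow
nested = header + b"\x04\x02\x00" + payload        # count 2, indexes [1, 0]
```

All three framed variants reduce to the same two-byte payload once the matching policy is applied.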
read_size_delimited_file(path: str) -> pyarrow.RecordBatch
Read size-delimited protobuf messages from a file and convert them to a RecordBatch. This method is called on a MessageHandler, so it takes no descriptor argument.
Size-delimited format means each message is preceded by its size encoded as a varint. This is a common format for storing multiple protobuf messages in a single file.
Parameters:
- path: Path to the file containing size-delimited protobuf messages.
Returns:
- A pyarrow.RecordBatch with one column per field in the protobuf message.
Example:
handler = pool.get_for_message(SearchRequest.DESCRIPTOR)
record_batch = handler.read_size_delimited_file("messages.bin")
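To make the size-delimited layout concrete, the sketch below writes and reads such a stream using only the standard library. It assumes the usual protobuf base-128 varint for the length prefix; encode_varint, write_size_delimited, and read_size_delimited are hypothetical helpers for illustration and do not reflect how ptars reads files internally. In practice each payload would come from msg.SerializeToString().

```python
import io


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def write_size_delimited(stream, payloads):
    """Write each payload preceded by its length as a varint."""
    for payload in payloads:
        stream.write(encode_varint(len(payload)))
        stream.write(payload)


def read_size_delimited(stream):
    """Yield payloads from a stream written by write_size_delimited."""
    while True:
        size = shift = 0
        first = True
        while True:
            chunk = stream.read(1)
            if not chunk:
                if first:
                    return  # clean end of stream between messages
                raise EOFError("stream ended inside a length prefix")
            first = False
            size |= (chunk[0] & 0x7F) << shift
            if not chunk[0] & 0x80:
                break
            shift += 7
        payload = stream.read(size)
        if len(payload) != size:
            raise EOFError("stream ended inside a message")
        yield payload


payloads = [b"", b"\x08\x01", b"x" * 300]
buffer = io.BytesIO()
write_size_delimited(buffer, payloads)
buffer.seek(0)
decoded = list(read_size_delimited(buffer))
```

The 300-byte payload needs a two-byte length prefix, which shows why the prefix has to be parsed as a varint rather than read as a fixed-width integer.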
Type Mappings
ptars converts protobuf types to Arrow types as follows:
Scalar Types
| Protobuf Type | Arrow Type |
|---|---|
| double | float64 |
| float | float32 |
| int32 | int32 |
| int64 | int64 |
| uint32 | uint32 |
| uint64 | uint64 |
| sint32 | int32 |
| sint64 | int64 |
| fixed32 | uint32 |
| fixed64 | uint64 |
| sfixed32 | int32 |
| sfixed64 | int64 |
| bool | bool |
| string | utf8 or large_utf8 |
| bytes | binary or large_binary |
| enum | int32 |
Large String/Binary Types
By default, string fields map to utf8 and bytes fields map to binary.
Set use_large_string=True or use_large_binary=True in PtarsConfig to use
large_utf8 or large_binary instead. The large types use 64-bit offsets, so a single array can hold more than 2 GB of string or binary data.
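The scalar table and the two large-type flags can be summarised as a small lookup. This is only an illustrative sketch of the documented mapping: arrow_type_name and SCALAR_ARROW_TYPES are hypothetical names, the function returns type names as strings rather than pyarrow objects, and the enum entry reflects the int32 row of the table only.

```python
# Fixed rows of the scalar table: protobuf type name -> Arrow type name.
SCALAR_ARROW_TYPES = {
    "double": "float64", "float": "float32",
    "int32": "int32", "sint32": "int32", "sfixed32": "int32",
    "int64": "int64", "sint64": "int64", "sfixed64": "int64",
    "uint32": "uint32", "fixed32": "uint32",
    "uint64": "uint64", "fixed64": "uint64",
    "bool": "bool", "enum": "int32",
}


def arrow_type_name(proto_type, use_large_string=False, use_large_binary=False):
    """Return the Arrow type name the tables give for a protobuf scalar type."""
    if proto_type == "string":
        return "large_utf8" if use_large_string else "utf8"
    if proto_type == "bytes":
        return "large_binary" if use_large_binary else "binary"
    return SCALAR_ARROW_TYPES[proto_type]


default_string = arrow_type_name("string")
large_string = arrow_type_name("string", use_large_string=True)
zigzag_int = arrow_type_name("sint64")
```

Only string and bytes depend on configuration in this sketch; every other scalar has a single fixed Arrow type.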
Composite Types
| Protobuf Type | Arrow Type |
|---|---|
| message | struct |
| repeated T | list<T>, or large_list<T> when use_large_list=True |
| map<K, V> | map<K, V> |
Well-Known Types
| Protobuf Type | Arrow Type | Notes |
|---|---|---|
| google.protobuf.Timestamp | timestamp[unit, tz] | Unit and timezone configurable via PtarsConfig |
| google.type.Date | date32 | |
| google.type.TimeOfDay | time32[s], time32[ms], time64[us], or time64[ns] | Unit configurable via PtarsConfig (see below) |
TimeOfDay type mapping by unit:
| time_unit | Arrow Type |
|---|---|
| "s" | time32[s] |
| "ms" | time32[ms] |
| "us" | time64[us] |
| "ns" | time64[ns] |
Wrapper Types
Wrapper types are converted to the Arrow type of the wrapped scalar and marked nullable. They are useful for representing nullable scalars in proto3.
| Protobuf Type | Arrow Type |
|---|---|
| google.protobuf.DoubleValue | float64 (nullable) |
| google.protobuf.FloatValue | float32 (nullable) |
| google.protobuf.Int64Value | int64 (nullable) |
| google.protobuf.UInt64Value | uint64 (nullable) |
| google.protobuf.Int32Value | int32 (nullable) |
| google.protobuf.UInt32Value | uint32 (nullable) |
| google.protobuf.BoolValue | bool (nullable) |
| google.protobuf.StringValue | utf8 or large_utf8 (nullable) |
| google.protobuf.BytesValue | binary or large_binary (nullable) |