API Reference

HandlerPool

A pool for managing protobuf message handlers.

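The HandlerPool creates one message handler per message type on first use and caches it, so repeated conversions of the same type reuse the handler. Handlers convert between serialized protobuf messages and Arrow RecordBatches. Imports of the given file descriptors are resolved automatically.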

Parameters:

  • file_descriptors (list[FileDescriptor], required): A list of protobuf FileDescriptor objects. All dependencies are resolved automatically.
  • config (PtarsConfig | None, default None): Optional configuration for Arrow type mappings. Applies to all handlers created from this pool.

Example
from ptars import HandlerPool, PtarsConfig

# YourMessage is a generated protobuf class; messages is a list of its instances
pool = HandlerPool([YourMessage.DESCRIPTOR.file])
handler = pool.get_for_message(YourMessage.DESCRIPTOR)
payloads = [m.SerializeToString() for m in messages]
record_batch = handler.list_to_record_batch(payloads)

# With custom config
config = PtarsConfig(list_value_name="element")
pool = HandlerPool([YourMessage.DESCRIPTOR.file], config=config)
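The constructor delegates dependency resolution to a helper, `_get_dependencies`, whose implementation is not shown on this page. As a rough illustration of what resolving dependencies involves, here is a minimal sketch, assuming the goal is a de-duplicated list in which every file appears after the files it imports. `FakeFile` and `collect_with_dependencies` are hypothetical; `FakeFile` only mimics the `name` and `dependencies` attributes that protobuf's `FileDescriptor` exposes. This is not ptars's code.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FakeFile:
    """Hypothetical stand-in for protobuf's FileDescriptor (name + imports)."""

    name: str
    dependencies: list[FakeFile] = field(default_factory=list)


def collect_with_dependencies(roots: list[FakeFile]) -> list[FakeFile]:
    """Return roots and their transitive imports, imports first, no duplicates."""
    ordered: list[FakeFile] = []
    seen: set[str] = set()  # .proto file names are unique within a pool

    def visit(file: FakeFile) -> None:
        if file.name in seen:
            return
        seen.add(file.name)
        # Post-order depth-first walk: a file is emitted only after its imports.
        for dep in file.dependencies:
            visit(dep)
        ordered.append(file)

    for root in roots:
        visit(root)
    return ordered
```

Passing both a file and one of its imports (as in `[orders, common]`) still yields each file once, with `google/protobuf/timestamp.proto` ahead of the files that import it.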
Source code in ptars/internal.py
class HandlerPool:
    """A pool for managing protobuf message handlers.

    The HandlerPool caches message handlers for efficient conversion between
    protobuf messages and Arrow RecordBatches. It automatically resolves
    protobuf file descriptor dependencies.

    Args:
        file_descriptors: A list of protobuf FileDescriptor objects.
            All dependencies will be automatically resolved.
        config: Optional configuration for Arrow type mappings.
            Applies to all handlers created from this pool.

    Example:
        ```python
        from ptars import HandlerPool, PtarsConfig

        pool = HandlerPool([YourMessage.DESCRIPTOR.file])
        handler = pool.get_for_message(YourMessage.DESCRIPTOR)
        record_batch = handler.list_to_record_batch(payloads)

        # With custom config
        config = PtarsConfig(list_value_name="element")
        pool = HandlerPool([YourMessage.DESCRIPTOR.file], config=config)
        ```
    """

    def __init__(
        self,
        file_descriptors: list[FileDescriptor],
        config: PtarsConfig | None = None,
    ):
        all_descriptors = []
        for file_descriptor in file_descriptors:
            if not isinstance(file_descriptor, FileDescriptor):
                raise TypeError(f"Expecting {FileDescriptor.__name__}")

            if file_descriptor not in all_descriptors:
                new_descriptors = _get_dependencies(file_descriptor)
                for new_descriptor in new_descriptors:
                    if new_descriptor not in all_descriptors:
                        all_descriptors.append(new_descriptor)

        payloads = [_file_descriptor_to_bytes(d) for d in all_descriptors]

        self._proto_registry = ProtoRegistry(payloads)
        self._config = config
        self._pool: dict[str, MessageHandler] = {}

    def get_for_message(self, descriptor: Descriptor) -> MessageHandler:
        """Get a message handler for the given protobuf descriptor.

        Args:
            descriptor: A protobuf message Descriptor.

        Returns:
            A MessageHandler that can convert messages of this type
            to/from Arrow format.

        Raises:
            TypeError: If descriptor is not a valid protobuf Descriptor.
        """
        if isinstance(descriptor, google._upb._message.MessageMeta):
            warnings.warn(
                f"Received {google._upb._message.MessageMeta.__name__}"
                f" instead of {Descriptor.__name__}"
            )
            descriptor = descriptor.DESCRIPTOR
        if not isinstance(
            descriptor,
            (google.protobuf.descriptor.Descriptor, google._upb._message.Descriptor),
        ):
            raise TypeError("Expecting Descriptor")

        try:
            return self._pool[descriptor.full_name]
        except KeyError:
            result = self._proto_registry.create_for_message(
                descriptor.full_name, self._config
            )
            self._pool[descriptor.full_name] = result
            return result

    def messages_to_record_batch(
        self, messages: list[Message], descriptor: Descriptor
    ) -> pa.RecordBatch:
        """Convert a list of protobuf messages to an Arrow RecordBatch.

        This is a convenience method that handles serialization internally.

        Args:
            messages: A list of protobuf message instances.
            descriptor: The protobuf Descriptor for the message type.

        Returns:
            A pyarrow.RecordBatch with one column per protobuf field.

        Example:
            ```python
            pool = HandlerPool([MyMessage.DESCRIPTOR.file])
            messages = [MyMessage(field="value")]
            record_batch = pool.messages_to_record_batch(
                messages, MyMessage.DESCRIPTOR
            )
            ```
        """
        handler = self.get_for_message(descriptor)
        return handler.list_to_record_batch([m.SerializeToString() for m in messages])

    def record_batch_to_messages(
        self, record_batch: pa.RecordBatch, descriptor: Descriptor
    ) -> list[Message]:
        """Convert an Arrow RecordBatch back to protobuf messages.

        This is a convenience method that handles deserialization internally.

        Args:
            record_batch: A pyarrow.RecordBatch with the same schema
                as produced by messages_to_record_batch.
            descriptor: The protobuf Descriptor for the message type.

        Returns:
            A list of protobuf message instances.

        Example:
            ```python
            pool = HandlerPool([MyMessage.DESCRIPTOR.file])
            messages = pool.record_batch_to_messages(
                record_batch, MyMessage.DESCRIPTOR
            )
            ```
        """
        handler = self.get_for_message(descriptor)
        array = handler.record_batch_to_array(record_batch)
        return [descriptor._concrete_class.FromString(s.as_py()) for s in array]  # type: ignore[unresolved-attribute]

    def read_size_delimited_file(
        self, path: str | os.PathLike, descriptor: Descriptor
    ) -> pa.RecordBatch:
        """Read size-delimited protobuf messages from a file to a RecordBatch.

        Each message in the file should be preceded by its size encoded as a varint.
        This method uses a fast Rust implementation for reading and parsing.

        Args:
            path: Path to the file containing size-delimited messages.
            descriptor: The protobuf Descriptor for the message type.

        Returns:
            A pyarrow.RecordBatch with one column per protobuf field.

        Example:
            ```python
            pool = HandlerPool([MyMessage.DESCRIPTOR.file])
            record_batch = pool.read_size_delimited_file(
                "messages.bin", MyMessage.DESCRIPTOR
            )
            ```
        """
        handler = self.get_for_message(descriptor)
        return handler.read_size_delimited_file(str(path))

get_for_message(descriptor)

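Get a message handler for the given protobuf descriptor. The handler is created on first use and cached under the message's full name. If a generated message class is passed instead of its DESCRIPTOR, a warning is issued and the class's DESCRIPTOR is used.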

Parameters:

  • descriptor (Descriptor, required): A protobuf message Descriptor.

Returns:

  • MessageHandler: A MessageHandler that can convert messages of this type to and from Arrow format.

Raises:

  • TypeError: If descriptor is not a valid protobuf Descriptor.

messages_to_record_batch(messages, descriptor)

Convert a list of protobuf messages to an Arrow RecordBatch.

This is a convenience method that handles serialization internally.

Parameters:

  • messages (list[Message], required): A list of protobuf message instances.
  • descriptor (Descriptor, required): The protobuf Descriptor for the message type.

Returns:

  • RecordBatch: A pyarrow.RecordBatch with one column per protobuf field.

Example
pool = HandlerPool([MyMessage.DESCRIPTOR.file])
messages = [MyMessage(field="value")]
record_batch = pool.messages_to_record_batch(
    messages, MyMessage.DESCRIPTOR
)

record_batch_to_messages(record_batch, descriptor)

Convert an Arrow RecordBatch back to protobuf messages.

This is a convenience method that handles deserialization internally.

Parameters:

  • record_batch (RecordBatch, required): A pyarrow.RecordBatch with the same schema as produced by messages_to_record_batch.
  • descriptor (Descriptor, required): The protobuf Descriptor for the message type.

Returns:

  • list[Message]: A list of protobuf message instances.

Example
pool = HandlerPool([MyMessage.DESCRIPTOR.file])
messages = pool.record_batch_to_messages(
    record_batch, MyMessage.DESCRIPTOR
)

read_size_delimited_file(path, descriptor)

Read size-delimited protobuf messages from a file to a RecordBatch.

Each message in the file must be preceded by its length in bytes, encoded as a varint. Reading and parsing are implemented in Rust for speed.

Parameters:

  • path (str | os.PathLike, required): Path to the file containing size-delimited messages.
  • descriptor (Descriptor, required): The protobuf Descriptor for the message type.

Returns:

  • RecordBatch: A pyarrow.RecordBatch with one column per protobuf field.

Example
pool = HandlerPool([MyMessage.DESCRIPTOR.file])
record_batch = pool.read_size_delimited_file(
    "messages.bin", MyMessage.DESCRIPTOR
)

MessageHandler

The MessageHandler class is returned by HandlerPool.get_for_message() and provides low-level conversion methods.

Methods

list_to_record_batch(payloads: list[bytes]) -> pyarrow.RecordBatch

Convert a list of serialized protobuf messages to an Arrow RecordBatch.

Parameters:

  • payloads: A list of bytes, where each element is a serialized protobuf message.

Returns:

  • A pyarrow.RecordBatch with one column per field in the protobuf message.

Example:

handler = pool.get_for_message(SearchRequest.DESCRIPTOR)
payloads = [msg.SerializeToString() for msg in messages]
record_batch = handler.list_to_record_batch(payloads)

record_batch_to_array(record_batch: pyarrow.RecordBatch) -> pyarrow.BinaryArray

Convert an Arrow RecordBatch back to serialized protobuf messages.

Parameters:

  • record_batch: A pyarrow.RecordBatch with the same schema as produced by list_to_record_batch.

Returns:

  • A pyarrow.BinaryArray where each element is a serialized protobuf message.

Example:

handler = pool.get_for_message(SearchRequest.DESCRIPTOR)
binary_array = handler.record_batch_to_array(record_batch)
messages = [SearchRequest.FromString(s.as_py()) for s in binary_array]

array_to_record_batch(array: pyarrow.BinaryArray) -> pyarrow.RecordBatch

Convert a binary array of serialized protobuf messages to a RecordBatch.

Parameters:

  • array: A pyarrow.BinaryArray where each element is a serialized protobuf message.

Returns:

  • A pyarrow.RecordBatch with one column per field in the protobuf message.

Example:

import pyarrow as pa

handler = pool.get_for_message(SearchRequest.DESCRIPTOR)
binary_array = pa.array([msg.SerializeToString() for msg in messages], type=pa.binary())
record_batch = handler.array_to_record_batch(binary_array)

PtarsConfig

PtarsConfig dataclass

Configuration for protobuf to Arrow conversions.

Attributes:

  • timestamp_tz (str | None): Timezone for timestamp values. Use None for timezone-naive timestamps.
  • timestamp_unit (TimeUnitLiteral): Time unit for timestamps ("s", "ms", "us", "ns").
  • time_unit (TimeUnitLiteral): Time unit for time of day ("s", "ms", "us", "ns").
  • duration_unit (TimeUnitLiteral): Time unit for durations ("s", "ms", "us", "ns").
  • list_value_name (str): Field name for list items in the Arrow schema.
  • list_nullable (bool): Whether list fields can be null.
  • map_nullable (bool): Whether map fields can be null.
  • list_value_nullable (bool): Whether list elements can be null.
  • map_value_nullable (bool): Whether map values can be null.
  • use_large_string (bool): Whether to use LargeUtf8 instead of Utf8 for strings.
  • use_large_binary (bool): Whether to use LargeBinary instead of Binary for bytes.
  • use_large_list (bool): Whether to use LargeList instead of List for repeated fields.
  • enum_repr (EnumReprLiteral): How to represent enum fields ("int32", "string", "binary").

Note

The Rust API also supports map_value_name for customizing the field name of map values in the Arrow schema. This option is not exposed in Python because Arrow's MapType uses a fixed "value" field name when constructed via PyArrow's C data interface. The default "value" name is always used for map values in Python.

Source code in ptars/config.py
@dataclass(frozen=True)
class PtarsConfig:
    """Configuration for protobuf to Arrow conversions.

    Attributes:
        timestamp_tz: Timezone for timestamp values. Use None for timezone-naive.
        timestamp_unit: Time unit for timestamps ("s", "ms", "us", "ns").
        time_unit: Time unit for time of day ("s", "ms", "us", "ns").
        duration_unit: Time unit for durations ("s", "ms", "us", "ns").
        list_value_name: Field name for list items in Arrow schema.
        list_nullable: Whether list fields can be null.
        map_nullable: Whether map fields can be null.
        list_value_nullable: Whether list elements can be null.
        map_value_nullable: Whether map values can be null.
        use_large_string: Whether to use LargeUtf8 instead of Utf8 for strings.
        use_large_binary: Whether to use LargeBinary instead of Binary for bytes.
        use_large_list: Whether to use LargeList instead of List for repeated fields.
        enum_repr: How to represent enum fields ("int32", "string", "binary").

    Note:
        The Rust API also supports `map_value_name` for customizing the field name
        of map values in the Arrow schema. This option is not exposed in Python
        because Arrow's MapType uses a fixed "value" field name when constructed
        via PyArrow's C data interface. The default "value" name is always used
        for map values in Python.
    """

    timestamp_tz: str | None = "UTC"
    timestamp_unit: TimeUnitLiteral = "ns"
    time_unit: TimeUnitLiteral = "ns"
    duration_unit: TimeUnitLiteral = "ns"
    list_value_name: str = "item"
    list_nullable: bool = False
    map_nullable: bool = False
    list_value_nullable: bool = False
    map_value_nullable: bool = False
    use_large_string: bool = False
    use_large_binary: bool = False
    use_large_list: bool = False
    enum_repr: EnumReprLiteral = "int32"

    def __post_init__(self) -> None:
        """Validate configuration values."""
        if self.timestamp_tz is not None:
            _check_type(self.timestamp_tz, str, "timestamp_tz")
        _check_time_unit(self.timestamp_unit, "timestamp_unit")
        _check_time_unit(self.time_unit, "time_unit")
        _check_time_unit(self.duration_unit, "duration_unit")
        _check_type(self.list_value_name, str, "list_value_name")
        _check_type(self.list_nullable, bool, "list_nullable")
        _check_type(self.map_nullable, bool, "map_nullable")
        _check_type(self.list_value_nullable, bool, "list_value_nullable")
        _check_type(self.map_value_nullable, bool, "map_value_nullable")
        _check_type(self.use_large_string, bool, "use_large_string")
        _check_type(self.use_large_binary, bool, "use_large_binary")
        _check_type(self.use_large_list, bool, "use_large_list")
        _check_type(self.enum_repr, str, "enum_repr")
        if self.enum_repr not in _VALID_ENUM_REPRS:
            raise ValueError(
                f"enum_repr must be one of {_VALID_ENUM_REPRS}, got {self.enum_repr!r}"
            )


Configuration Options

  • timestamp_tz (str | None, default "UTC"): Timezone for google.protobuf.Timestamp. Use None for timezone-naive timestamps.
  • timestamp_unit (Literal["s", "ms", "us", "ns"], default "ns"): Time unit for timestamps.
  • time_unit (Literal["s", "ms", "us", "ns"], default "ns"): Time unit for google.type.TimeOfDay.
  • duration_unit (Literal["s", "ms", "us", "ns"], default "ns"): Time unit for google.protobuf.Duration.
  • list_value_name (str, default "item"): Field name for list items in the Arrow schema.
  • list_nullable (bool, default False): Whether list fields can be null.
  • map_nullable (bool, default False): Whether map fields can be null.
  • list_value_nullable (bool, default False): Whether list elements can be null.
  • map_value_nullable (bool, default False): Whether map values can be null.
  • use_large_string (bool, default False): Use large_utf8 instead of utf8 for string fields.
  • use_large_binary (bool, default False): Use large_binary instead of binary for bytes fields.
  • use_large_list (bool, default False): Use large_list instead of list for repeated fields.
  • enum_repr (Literal["int32", "string", "binary"], default "int32"): How to represent enum fields.
  • confluent_wire_policy (Literal["raw", "standard", "protobuf"], default "raw"): Confluent Schema Registry wire format stripping policy.

Precision Loss with Coarser Time Units

When converting timestamps, time of day, or duration values to coarser time units (e.g., "s" instead of "ns"), sub-unit precision is truncated (not rounded). For example:

  • A timestamp at 1.999 seconds with timestamp_unit="s" becomes 1 second
  • A time of day 01:02:03.500 with time_unit="s" becomes 01:02:03

Choose the appropriate unit based on your precision requirements.
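To make the truncation concrete, here is a minimal sketch of the arithmetic, assuming a google.protobuf.Timestamp (seconds plus nanos) is reduced to a single integer count in the target unit. The helper `timestamp_to_unit` is hypothetical and not part of ptars; it only shows why 1.999 seconds becomes 1 second with `timestamp_unit="s"`. It accepts non-negative values only, because this page does not say how values before 1970 are handled.

```python
# Nanoseconds per unit for the time units accepted by PtarsConfig.
NANOS_PER_UNIT = {"s": 1_000_000_000, "ms": 1_000_000, "us": 1_000, "ns": 1}


def timestamp_to_unit(seconds: int, nanos: int, unit: str) -> int:
    """Convert a (seconds, nanos) pair to an integer count of `unit`.

    Hypothetical helper: integer division drops the sub-unit remainder,
    which equals truncation for non-negative inputs.
    """
    if seconds < 0 or nanos < 0:
        raise ValueError("this sketch only handles non-negative values")
    total_nanos = seconds * 1_000_000_000 + nanos
    return total_nanos // NANOS_PER_UNIT[unit]
```

For example, `timestamp_to_unit(1, 999_000_000, "s")` gives 1, while the same value at `"ms"` keeps all 1999 milliseconds.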

Confluent Schema Registry Wire Format

The confluent_wire_policy option controls how ptars handles the prefix that Confluent Schema Registry serializers add to Kafka messages. The prefix is stripped before the protobuf payload is decoded.

  • "raw": nothing is stripped; the bytes are plain protobuf wire format. Use with direct protobuf serialization.
  • "standard": strips 5 bytes, a magic byte (0x00) followed by a 4-byte schema ID. Use with Avro and JSON Schema framing.
  • "protobuf": strips the 5-byte header plus the varint-encoded message index array. Use with Protobuf.

The Protobuf wire format includes additional bytes after the 5-byte header: a varint-encoded count of message indexes followed by the indexes themselves. This identifies which message type in the .proto file schema is being used. The "protobuf" policy parses past these varints automatically.
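The sketch below shows, with the standard library only, what the "standard" and "protobuf" policies have to skip. It assumes Confluent's documented framing: a 0x00 magic byte, a 4-byte big-endian schema ID and, for Protobuf, an array length followed by the message indexes, all written as zig-zag varints, where a length of 0 is shorthand for the index list [0]. `strip_confluent_prefix` and `read_zigzag_varint` are hypothetical helpers, not part of ptars.

```python
from __future__ import annotations


def read_zigzag_varint(data: bytes, pos: int) -> tuple[int, int]:
    """Decode one zig-zag varint starting at `pos`; return (value, new_pos)."""
    raw, shift = 0, 0
    while True:
        if pos >= len(data):
            raise ValueError("truncated varint")
        byte = data[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift  # low 7 bits carry data
        if not byte & 0x80:  # high bit clear: last byte of this varint
            break
        shift += 7
    # Undo zig-zag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
    return (raw >> 1) ^ -(raw & 1), pos


def strip_confluent_prefix(message: bytes, policy: str) -> tuple[int | None, list[int], bytes]:
    """Hypothetical helper: split a Kafka value into (schema_id, indexes, payload)."""
    if policy == "raw":
        return None, [], message
    if len(message) < 5 or message[0] != 0x00:
        raise ValueError("missing Confluent magic byte and schema ID")
    schema_id = int.from_bytes(message[1:5], "big")
    if policy == "standard":
        return schema_id, [], message[5:]
    if policy != "protobuf":
        raise ValueError(f"unknown policy {policy!r}")
    count, pos = read_zigzag_varint(message, 5)
    if count < 0:
        raise ValueError("negative message index count")
    if count == 0:
        return schema_id, [0], message[pos:]  # shorthand for the first message type
    indexes = []
    for _ in range(count):
        index, pos = read_zigzag_varint(message, pos)
        indexes.append(index)
    return schema_id, indexes, message[pos:]
```

Whatever remains after the prefix is ordinary protobuf wire format, the same input the "raw" policy expects.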

Example

from ptars import HandlerPool, PtarsConfig

# Microsecond timestamps in New York time, with "element" as the list item name
config = PtarsConfig(
    timestamp_unit="us",
    timestamp_tz="America/New_York",
    list_value_name="element",
)

pool = HandlerPool([MyMessage.DESCRIPTOR.file], config=config)

MessageHandler.read_size_delimited_file(path: str) -> pyarrow.RecordBatch

Read size-delimited protobuf messages from a file and convert them to a RecordBatch.

In the size-delimited format, each message is preceded by its length in bytes, encoded as a varint. This is a common format for storing multiple protobuf messages in a single file.

Parameters:

  • path: Path to the file containing size-delimited protobuf messages.

Returns:

  • A pyarrow.RecordBatch with one column per field in the protobuf message.

Example:

handler = pool.get_for_message(SearchRequest.DESCRIPTOR)
record_batch = handler.read_size_delimited_file("messages.bin")
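A file in this format takes only a few lines of standard-library code to produce, which is handy for tests. This is a minimal sketch, assuming the usual protobuf framing (each payload prefixed by its byte length as an unsigned base-128 varint); `encode_varint`, `frame_size_delimited`, `split_size_delimited` and `write_size_delimited` are hypothetical names, not part of ptars.

```python
from __future__ import annotations

import os
from pathlib import Path
from typing import Iterable


def encode_varint(value: int) -> bytes:
    """Encode a non-negative int as an unsigned base-128 varint."""
    if value < 0:
        raise ValueError("length prefixes must be non-negative")
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def frame_size_delimited(payloads: Iterable[bytes]) -> bytes:
    """Concatenate payloads, each preceded by its byte length as a varint."""
    return b"".join(encode_varint(len(p)) + p for p in payloads)


def split_size_delimited(data: bytes) -> list[bytes]:
    """Inverse of frame_size_delimited, useful for checking a file's contents."""
    payloads, pos = [], 0
    while pos < len(data):
        length, shift = 0, 0
        while True:
            if pos >= len(data):
                raise ValueError("truncated length prefix")
            byte = data[pos]
            pos += 1
            length |= (byte & 0x7F) << shift
            if not byte & 0x80:
                break
            shift += 7
        if pos + length > len(data):
            raise ValueError("truncated message")
        payloads.append(data[pos : pos + length])
        pos += length
    return payloads


def write_size_delimited(path: str | os.PathLike, payloads: Iterable[bytes]) -> None:
    """Write the framed payloads to `path`, replacing any existing file."""
    Path(path).write_bytes(frame_size_delimited(payloads))
```

With generated classes, `write_size_delimited("messages.bin", [m.SerializeToString() for m in messages])` should produce a file that `handler.read_size_delimited_file("messages.bin")` can read.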

Type Mappings

ptars converts protobuf types to Arrow types as follows:

Scalar Types

Protobuf Type Arrow Type
double float64
float float32
int32 int32
int64 int64
uint32 uint32
uint64 uint64
sint32 int32
sint64 int64
fixed32 uint32
fixed64 uint64
sfixed32 int32
sfixed64 int64
bool bool
string utf8 or large_utf8
bytes binary or large_binary
enum int32 by default (configurable via enum_repr)

Large String/Binary Types

By default, string fields map to utf8 and bytes fields map to binary. Set use_large_string=True or use_large_binary=True in PtarsConfig to use large_utf8 or large_binary instead. The large types use 64-bit offsets, so a single array can hold more than 2 GB of data.
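The default utf8 type stores 32-bit signed offsets into one data buffer, so a single array can address at most 2**31 - 1 bytes of string data. A minimal sketch for deciding whether a batch needs `use_large_string=True`, assuming the values are Python strings that will land in one array; `needs_large_string` is a hypothetical helper, not part of ptars.

```python
# Largest total byte length addressable with Arrow's 32-bit signed offsets.
MAX_UTF8_ARRAY_BYTES = 2**31 - 1


def needs_large_string(values, limit: int = MAX_UTF8_ARRAY_BYTES) -> bool:
    """Return True if the UTF-8 bytes of all non-null values exceed `limit`."""
    total = 0
    for value in values:
        if value is None:
            continue  # null slots usually occupy no bytes in the data buffer
        total += len(value.encode("utf-8"))
        if total > limit:
            return True  # stop early once the limit is crossed
    return False
```

The same reasoning applies to bytes fields and `use_large_binary`, counting `len(value)` directly.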

Composite Types

Protobuf Type Arrow Type
message struct
repeated T list<T> (large_list<T> with use_large_list=True)
map<K, V> map<K, V>

Well-Known Types

  • google.protobuf.Timestamp: timestamp[unit, tz]. Unit and timezone are configurable via PtarsConfig.
  • google.type.Date: date32.
  • google.type.TimeOfDay: time32[s], time32[ms], time64[us], or time64[ns]. The unit is configurable via PtarsConfig (see below).
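Arrow's time32 and time64 types store one integer per value: the number of units since midnight. A minimal sketch of how a google.type.TimeOfDay (hours, minutes, seconds, nanos) could map onto that integer for each time_unit, assuming the truncation described under Precision Loss; `time_of_day_to_arrow` is a hypothetical helper, not ptars's implementation.

```python
# Arrow type name and nanoseconds per unit for each PtarsConfig time_unit.
TIME_TYPES = {
    "s": ("time32[s]", 1_000_000_000),
    "ms": ("time32[ms]", 1_000_000),
    "us": ("time64[us]", 1_000),
    "ns": ("time64[ns]", 1),
}


def time_of_day_to_arrow(
    hours: int, minutes: int, seconds: int, nanos: int, unit: str
) -> tuple[str, int]:
    """Return (arrow_type, value), where value counts `unit`s since midnight."""
    arrow_type, nanos_per_unit = TIME_TYPES[unit]
    total_nanos = ((hours * 60 + minutes) * 60 + seconds) * 1_000_000_000 + nanos
    # Integer division drops sub-unit precision (e.g. the .500 at unit "s").
    return arrow_type, total_nanos // nanos_per_unit
```

For 01:02:03.500, `time_of_day_to_arrow(1, 2, 3, 500_000_000, "s")` returns `("time32[s]", 3723)`; with `"ms"` the half second survives as 3723500.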

TimeOfDay type mapping by unit:

time_unit Arrow Type
"s" time32[s]
"ms" time32[ms]
"us" time64[us]
"ns" time64[ns]

Wrapper Types

Wrapper types are converted to the Arrow type of the value they wrap, marked as nullable, so an unset wrapper field becomes null. This makes them useful for representing optional scalars in proto3.

Protobuf Type Arrow Type
google.protobuf.DoubleValue float64 (nullable)
google.protobuf.FloatValue float32 (nullable)
google.protobuf.Int64Value int64 (nullable)
google.protobuf.UInt64Value uint64 (nullable)
google.protobuf.Int32Value int32 (nullable)
google.protobuf.UInt32Value uint32 (nullable)
google.protobuf.BoolValue bool (nullable)
google.protobuf.StringValue utf8 or large_utf8 (nullable)
google.protobuf.BytesValue binary or large_binary (nullable)