Row Factories
Introduction
Row factories are a feature in pinot_connect that lets users customize the format of query results. By default, query
results are returned as tuples. The feature is heavily inspired by the feature of the same name in psycopg. pinot_connect
ships with a handful of useful row factories, and it is also easy to define your own. Row factories are typed with
generics and typevars so that static type checkers (such as mypy) can correctly check the return types.
Warning
Because Apache Pinot's query protocol is HTTP/HTTPS and the data is returned as JSON, the server serializes results as
a list of lists. Using the list_row factory gives the best performance because it does not modify the data. The
default, however, is tuple_row, to maintain DB-API compliance. The performance penalty is small, but it can add up on
large result sets.
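The cost described in this warning can be illustrated with the standard library alone. This is a minimal sketch, not pinot_connect's internals: the payload shape and the names `payload`, `raw_rows`, `list_rows` and `tuple_rows` are assumptions made for illustration. Decoding JSON already yields lists, so handing them back requires no per-row work, while tuples need one extra conversion per row.

```python
import json

# Hypothetical payload shaped like a list of lists; a real Pinot broker
# response contains more fields than shown here.
payload = '[[1, "AA"], [2, "DL"], [3, "UA"]]'

raw_rows = json.loads(payload)

# Returning lists: the decoded rows are handed back unchanged.
list_rows = raw_rows

# Returning tuples: every row is copied into a new tuple object.
tuple_rows = [tuple(row) for row in raw_rows]

print(type(list_rows[0]).__name__)   # list
print(type(tuple_rows[0]).__name__)  # tuple
```

The extra copy is cheap for a single row, which is why the difference only becomes noticeable on large result sets.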
How Row Factories Work
Internally, a row factory processes each row of the query result set and transforms it into the desired format before
it is returned to the user. The transformation is performed by a user-defined function or callable object that serves
as the "factory" for producing rows. Transformations are done lazily, and each row is processed exactly once.
Some common use cases include:
- Returning rows as dictionaries
- Returning rows as a user-defined object, such as a dataclass or pydantic model
- Passing the row as kwargs or args to a function
- Loading JSON strings returned from Pinot into Python objects (since Pinot always returns them as strings over the API)
This functionality ensures that users have full control over the structure and format of their query results.
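The two-stage shape described above can be sketched in plain Python. This is a simplified illustration under stated assumptions, not the library's implementation: `dict_row_sketch`, `iter_rows` and the type aliases are hypothetical names, and the outer function receives bare column names rather than the cursor description. The outer function runs once per query, the inner function runs once per row, and a generator keeps the work lazy.

```python
from typing import Any, Callable, Iterator, Sequence

# Hypothetical stand-ins for illustration; these are not pinot_connect types.
Row = Sequence[Any]
RowMaker = Callable[[Row], Any]
RowFactory = Callable[[Sequence[str]], RowMaker]


def dict_row_sketch(column_names: Sequence[str]) -> RowMaker:
    """Outer function: runs once per query and captures the column names."""
    names = list(column_names)

    def make_row(values: Row) -> dict:
        # Inner function: runs once for each row that is fetched.
        return dict(zip(names, values))

    return make_row


def iter_rows(raw_rows: Sequence[Row], column_names: Sequence[str],
              row_factory: RowFactory) -> Iterator[Any]:
    """Lazily transform rows; nothing is converted until it is consumed."""
    make_row = row_factory(column_names)
    for raw in raw_rows:
        yield make_row(raw)


rows_iter = iter_rows([[10, 1], [20, 2]], ["AirTime", "AirlineID"], dict_row_sketch)
first = next(rows_iter)  # only the first row has been transformed so far
print(first)  # {'AirTime': 10, 'AirlineID': 1}
```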
Examples
Built-in row factories
All examples assume the code below has been run first:
import dataclasses
from typing import Self

import pinot_connect
from pinot_connect import rows

connection = pinot_connect.connect(host="localhost")

# tuple_row is the default, so no row_factory needs to be passed
with connection.cursor() as cursor:
    cursor.execute("select * from airlineStats limit 10")
    print(type(cursor.fetchone()))  # tuple

# kwargs_row with a dataclass whose field names match the column names
@dataclasses.dataclass
class AirTime:
    AirTime: int
    AirlineID: int

with connection.cursor(row_factory=rows.kwargs_row(AirTime)) as cursor:
    cursor.execute("select AirTime, AirlineID from airlineStats limit 10")
    print(type(cursor.fetchone()))  # AirTime

# kwargs_row with a classmethod that maps column names to field names
@dataclasses.dataclass
class AirTime:
    air_time: int
    air_line_id: int

    @classmethod
    def from_row(cls, **row) -> Self:
        return cls(air_time=row['AirTime'], air_line_id=row['AirlineID'])

with connection.cursor(row_factory=rows.kwargs_row(AirTime.from_row)) as cursor:
    cursor.execute("select AirTime, AirlineID from airlineStats limit 10")
    print(type(cursor.fetchone()))  # AirTime

# args_row passes the row values positionally, so select only the
# columns the constructor accepts, in the same order
class AirTime:
    def __init__(self, air_time: int, air_line_id: int):
        self.air_time = air_time
        self.air_line_id = air_line_id

# rows.args_row -> RowFactory[AirTime]
with connection.cursor(row_factory=rows.args_row(AirTime)) as cursor:
    cursor.execute("select AirTime, AirlineID from airlineStats limit 10")
    print(type(cursor.fetchone()))  # AirTime
Info
Over the API, Pinot returns JSON columns as strings and also reports the column type as a string. Often you will want to deserialize the JSON columns into Python objects. This row factory does that efficiently and returns the fields as values in a dict row; in other row factories, the value will always be a string.
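The idea in this note can be sketched with the standard library. The helper below is hypothetical and is not the factory that ships with pinot_connect: the name `json_dict_row_sketch`, its signature, and the use of bare column names instead of the cursor description are all assumptions made for illustration. It decodes only the columns named by the caller and leaves every other value untouched.

```python
import json
from typing import Any, Callable, Sequence


def json_dict_row_sketch(*json_fields: str) -> Callable:
    """Hypothetical helper: build a factory that decodes the named columns."""
    fields = frozenset(json_fields)

    def factory(column_names: Sequence[str]) -> Callable[[Sequence[Any]], dict]:
        # Work out which positions need decoding once per query.
        names = list(column_names)
        json_positions = {i for i, name in enumerate(names) if name in fields}

        def make_row(values: Sequence[Any]) -> dict:
            # Decode JSON strings in the selected positions; pass the rest through.
            return {
                name: json.loads(value) if i in json_positions and value is not None else value
                for i, (name, value) in enumerate(zip(names, values))
            }

        return make_row

    return factory


make_row = json_dict_row_sketch("payload")(["id", "payload"])
row = make_row([7, '{"tags": ["a", "b"]}'])
print(row)  # {'id': 7, 'payload': {'tags': ['a', 'b']}}
```

Resolving the JSON positions in the outer function keeps the per-row work to a set lookup and a single `json.loads` call per JSON column.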
Writing your own row factories
It is simple to write your own row factories. When doing so, it is important to use the types defined in pinot_connect.rows
to ensure correct static typing:
- RowType: use this when the type is determined by something passed to an outer function, such as pinot_connect.rows.kwargs_row. When doing this, you should also make sure the outer function returns RowFactory[RowType].
- RowMaker: this should be returned from every RowFactory function.
Let's look at how you might write your own factory that always validates a row with pydantic. This example assumes that you already have pydantic installed.
from typing import Iterator

import pydantic

from pinot_connect import Column
from pinot_connect.rows import RowMaker


class AirlineStats(pydantic.BaseModel):
    air_time: int = pydantic.Field(..., alias="AirTime")
    air_line_id: int = pydantic.Field(..., alias="AirlineID")


def airline_stats_row(description: list[Column]) -> RowMaker[AirlineStats]:
    # important to do this at top level of the function so we only have to build column names once per query
    column_names = [i[0] for i in description]

    def airline_stats_row_(values: Iterator) -> AirlineStats:
        data = dict(zip(column_names, values))
        return AirlineStats(**data)

    return airline_stats_row_


with connection.cursor(row_factory=airline_stats_row) as cursor:
    cursor.execute("select * from airlineStats limit 10")
    print(type(cursor.fetchone()))  # AirlineStats
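When the row type is chosen by the caller rather than fixed in the factory, the same pattern gains one more layer: an outer function that accepts the class and returns RowFactory[RowType]. The sketch below shows that shape under stated assumptions. `model_row` is a hypothetical name, and RowType, RowMaker and RowFactory are defined locally as stand-ins so the example runs without pinot_connect installed; the real definitions in pinot_connect.rows may differ. The description entries are assumed to be sequences whose first item is the column name, and a dataclass is used in place of a pydantic model to keep the example dependency-free.

```python
import dataclasses
from typing import Any, Callable, Sequence, Type, TypeVar

# Local stand-ins so the sketch runs without pinot_connect installed.
# The real RowType, RowMaker and RowFactory live in pinot_connect.rows
# and may be defined differently.
RowType = TypeVar("RowType")
RowMaker = Callable[[Sequence[Any]], RowType]
RowFactory = Callable[[Sequence[Sequence[Any]]], RowMaker[RowType]]


def model_row(model: Type[RowType]) -> RowFactory[RowType]:
    """Hypothetical outer function: the row type is decided by `model`."""

    def factory(description: Sequence[Sequence[Any]]) -> RowMaker[RowType]:
        # Build the column names once per query.
        column_names = [column[0] for column in description]

        def make_row(values: Sequence[Any]) -> RowType:
            # Pass each value to the model under its column name.
            return model(**dict(zip(column_names, values)))

        return make_row

    return factory


@dataclasses.dataclass
class AirTime:
    AirTime: int
    AirlineID: int


# Hypothetical description entries: (column name, column type).
make_row = model_row(AirTime)([("AirTime", "INT"), ("AirlineID", "INT")])
print(make_row([120, 19805]))  # AirTime(AirTime=120, AirlineID=19805)
```

Because the return type is expressed through the type variable, a static type checker can infer that rows produced by `model_row(AirTime)` are `AirTime` instances.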