Memory-Efficient PyArrow Datasets

After spending months figuring out how to handle datasets in a memory-efficient way, I decided to collect everything in one place. Hopefully, this will save someone else time.


What Is a Dataset?

If you work with Parquet files, you know each file carries metadata in its footer: the row groups it contains and min/max statistics per column per row group.

This lets PyArrow quickly scan only the file footer and decide whether to read parts of the file or skip them altogether.

While this footer-only scan is cheap on local storage, fetching remote files is expensive enough that you’d rather decide up front, per file, whether to download the whole thing or skip it entirely.

This is where datasets come in: they centralize metadata into a separate file, letting you index multiple Parquet files. Put differently, the metadata from the end of each file is collected into one file, and that single file is all PyArrow needs to decide which data files have to be scanned.
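To see that footer metadata for yourself, pyarrow.parquet exposes it directly. A minimal sketch, assuming you already have some Parquet file on disk (the path below is the first file created later in this post):

import pyarrow.parquet as pq

# Footer only: no row data is read here.
md = pq.ParquetFile("/tmp/0001.parquet").metadata
print(md.num_row_groups, md.num_rows)

# Per-column statistics of the first row group, used for pruning.
stats = md.row_group(0).column(0).statistics
print(stats.min, stats.max)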


Setup

For this example, we set up a simple schema and some data:

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc
import pyarrow.dataset as ds

schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("kind", pa.string()),
])

def generate_data(i: int):
    tab = pa.Table.from_pylist([
        {"id": 1 + i * 10, "kind": "a"},
        {"id": 2 + i * 10, "kind": "b"},
        {"id": 3 + i * 10, "kind": "c"},
        {"id": 4 + i * 10, "kind": "d"},
        {"id": 5 + i * 10, "kind": "e"},
    ], schema=schema)
    return tab.to_batches(3)
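As a quick sanity check, each call should yield two batches, since to_batches(3) caps each batch at three rows:

print([b.num_rows for b in generate_data(1)])  # expected: [3, 2]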

Creating Parquet Files and Metadata

When working with big datasets, you can’t load entire tables into memory. You need batches. Here’s an example that creates 4 data files and the metadata index:

# Combined FileMetaData for all files, merged into a single index.
meta = None

for i in [1, 2, 3, 4]:
    filename = f"000{i}.parquet"
    collector = []

    # The writer appends this file's FileMetaData to `collector` on close.
    with pq.ParquetWriter(
        f"/tmp/{filename}", schema, metadata_collector=collector
    ) as w:
        for b in generate_data(i):
            print(f"writing {b.num_rows} rows to {filename}")
            w.write_batch(b)

    # Record the data file's path relative to the metadata file.
    collector[0].set_file_path(filename)

    if meta is None:
        meta = collector[0]
    else:
        meta.append_row_groups(collector[0])

# Write the combined index; it holds only the schema and row-group metadata.
pq.write_metadata(schema, "/tmp/meta.parquet", metadata_collector=[meta])

This creates 4 files (each with 2 row groups) and a meta.parquet that indexes them.


Verifying the Metadata

Use the parquet CLI to inspect the index:

parquet meta /tmp/meta.parquet

This shows the row groups and relative paths to the data files.

Inspecting a single data file:

parquet meta /tmp/0001.parquet

confirms it has the expected row groups and row counts.
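If you don’t have the parquet CLI at hand, the same check works from Python with pq.read_metadata; the expected numbers below assume the example above ran as-is:

meta_md = pq.read_metadata("/tmp/meta.parquet")
print(meta_md.num_row_groups)                    # 8 row groups: 4 files x 2 each
print(meta_md.row_group(0).column(0).file_path)  # relative path, e.g. 0001.parquet

file_md = pq.read_metadata("/tmp/0001.parquet")
print(file_md.num_row_groups, file_md.num_rows)  # 2 row groups, 5 rows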


Efficient Scanning

You can scan meta.parquet directly and PyArrow will load only the files it needs. Sometimes, though, you want to know which files are involved before scanning, for example to parallelize the work or paginate results. Here’s how:

def find(expression):
    d = ds.parquet_dataset("/tmp/meta.parquet")
    out = {}
    for f in d.get_fragments(expression):
        # Split by row group to filter more precisely
        row_groups = f.split_by_row_group(expression, d.schema)
        for rg in row_groups:
            out.setdefault(rg.path, []).append(rg)
    return out

Note: Even though get_fragments() accepts the expression, it still returns all fragments. Only split_by_row_group() filters properly.
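You can check that claim on the example data by comparing how many fragments come back against how many row groups survive the split; a small self-contained sketch:

d = ds.parquet_dataset("/tmp/meta.parquet")
exp = pc.field("id") < 33

fragments = list(d.get_fragments(exp))
kept = [rg for f in fragments for rg in f.split_by_row_group(exp, d.schema)]
# With the example data this should report all 4 fragments but only 5 row groups.
print(len(fragments), len(kept))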

Example usage:

exp = pc.field("id") < 33
rgs = find(exp)

print(f"scanning for {exp}:")
for k, v in rgs.items():
    print(f"  {k}: {len(v)}")
    for pff in v:
        print(f"    {pff.row_groups}")
        s = pff.scanner(columns=None, filter=exp)
        for b in s.to_batches():
            for row in b.to_pylist():
                print(f"      {row}")

Output:

scanning for (id < 33):
  /tmp/0001.parquet: 2
    [RowGroupInfo(0)]
      {'id': 11, 'kind': 'a'}
      ...
  /tmp/0002.parquet: 2
    ...
  /tmp/0003.parquet: 1
    ...

You can see that only the relevant files and row groups are accessed.
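If you don’t need the per-file breakdown, the same pruning happens automatically when you read through the dataset with a filter:

d = ds.parquet_dataset("/tmp/meta.parquet")
tab = d.to_table(filter=pc.field("id") < 33)
print(tab.num_rows)  # expected: 12 (all of 0001 and 0002, plus ids 31 and 32 from 0003)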


Streaming Batches Efficiently

One poorly documented area is efficient streaming of RecordBatch data: you don’t want to convert everything to JSON just to send it over the network.

pyarrow.ipc solves this via new_stream() and open_stream(). Here’s a minimal example using an in-memory buffer:

class Buffer:
    """Toy in-memory FIFO that stands in for a socket or pipe."""
    def __init__(self):
        self.buf = b""
    def write(self, data: bytes):
        self.buf += data
    def read(self, size=-1):
        if size < 0:
            ret, self.buf = self.buf, b""
            return ret
        ret = self.buf[:size]
        self.buf = self.buf[size:]
        return ret
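The buffer is just a FIFO byte queue standing in for a socket or pipe; a quick check of its behavior:

buf = Buffer()
buf.write(b"hello")
assert buf.read(2) == b"he"
assert buf.read() == b"llo"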

Create some data:

tab = pa.Table.from_pylist(
    [{"id": i, "kind": chr(96+i)} for i in range(1,6)],
    schema=pa.schema([
        pa.field("id", pa.int32()),
        pa.field("kind", pa.string()),
    ])
)

Create the writer, send an empty batch that carries only custom metadata (a cheap handshake), then open the reader and check it arrived:

buf = Buffer()
w = pa.ipc.new_stream(
    buf, tab.schema,
    options=pa.ipc.IpcWriteOptions(compression="zstd")
)

w.write_batch(
    pa.RecordBatch.from_pylist([], tab.schema),
    custom_metadata={b'x': b'INIT'}
)

r = pa.ipc.open_stream(buf)

bm = r.read_next_batch_with_custom_metadata()
assert bm.batch.num_rows == 0
assert bm.custom_metadata.get(b'x') == b'INIT'

Now send the actual data batches and read them back:

for b in tab.to_batches(max_chunksize=2):
    w.write_batch(b)
    print(f"sent {b.num_rows} rows...")

    batch = r.read_next_batch()
    for row in batch.to_pylist():
        print(f"recv {row}")

This shows efficient streaming without ever materializing full tables in memory or converting to other formats.
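When the producer is done, close the writer so the consumer sees an explicit end of stream; a small sketch continuing with the w, r and buf objects from above:

w.close()  # writes the end-of-stream marker into the buffer

try:
    r.read_next_batch()
except StopIteration:
    print("stream finished")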


Closing Thoughts

If you’re dealing with large datasets in PyArrow, invest time in understanding:

  • Metadata indexing (write_metadata)

  • Fragment filtering (split_by_row_group)

  • IPC streaming (pyarrow.ipc)

These tools save memory and make your pipelines faster and more robust.
