DEV Community

Nikita Almakov
Nikita Almakov

Posted on

Stop Writing Nested Loops for ETL. Compile Them with convtools

convtools is a tiny Python library (GitHub link) that turns declarative transforms into plain functions you can run, profile, and ship. It does group_by, join, and streams CSVs -- often ~1.2×–6× faster than hand‑written loops in many cases.


TL;DR

  • You describe a transform with a small Python DSL (c.item, c.group_by, c.join, etc.).

  • gen_converter() compiles it into a plain Python function, no per‑call runtime overhead.

  • It’s ideal when you’ll run the same transform many times or over a stream (think ETL jobs), and you want readability + speed.


The 30‑second payoff

Task: rename keys and reshape a list of dicts.

from convtools import conversion as c

config = {
    "key1": "old key1",
    "key2": "old key2",
    "key3": "old key3",
}
data = [{"old key1": 1, "old key2": 2, "old key3": 3}]

converter = c.list_comp(
    {k: c.item(v) for k, v in config.items()}
).gen_converter()
print(converter(data))
# [{"key1": 1, "key2": 2, "key3": 3}]

# plain Python version:
print([{k: item[v] for k, v in config.items()} for item in data])
# [{"key1": 1, "key2": 2, "key3": 3}]
Enter fullscreen mode Exit fullscreen mode

This compiles to a plain Python function, the mapping over config.items() is precomputed at compile time, so there’s no per‑row dict iteration over config.

In my own profiling, this yielded 195μs / 72.9μs ≈ 2.67× speedup vs straight loop (with a tiny one‑off compile cost: 52 μs, which pays off if you reuse it).

profiling code (click to expand)
# Python 3.10.15 (main, Nov 28 2024, 23:39:17) [Clang 16.0.0 (clang-1600.0.26.4)] on darwin
data = [{"old key1": 1, "old key2": 2, "old key3": 3} for i in range(1000)]

%%timeit
c.list_comp(
    {k: c.item(v) for k, v in config.items()}
).gen_converter()
# 51.5 μs ± 151 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

%timeit converter(data)
# 72.9 μs ± 487 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

%timeit [{k: item[v] for k, v in config.items()} for item in data]
# 195 μs ± 1.11 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Enter fullscreen mode Exit fullscreen mode

generated code (click to expand)
In [30]: c.list_comp(
    ...:     {k: c.item(v) for k, v in config.items()}
    ...: ).gen_converter(debug=True)
    ...:
def _converter(data_):
    try:
        return [
            {
                "key1": _i["old key1"],
                "key2": _i["old key2"],
                "key3": _i["old key3"],
            }
            for _i in data_
        ]
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise
Enter fullscreen mode Exit fullscreen mode


Where convtools shines

1) group_by without a DataFrame

Single pass, memory proportional to number of groups.
Outputs groups in order of first appearance.

from convtools import conversion as c

converter = (
    c.group_by(c.item("country"))
    .aggregate(
        {
            "country": c.item("country"),
            "total": c.ReduceFuncs.Sum(c.item("sales")),
            "other_total": c.ReduceFuncs.Sum(
                c.item("sales"),
                where=c.item("sales") < 10,
            ),
        }
    )
    .gen_converter()
)

rows = [
    {"country": "US", "sales": 10},
    {"country": "US", "sales": 7},
    {"country": "DE", "sales": 3},
]
print(converter(rows))
# [
#     {"country": "US", "total": 17, "other_total": 7},
#     {"country": "DE", "total": 3, "other_total": 3},
# ]
Enter fullscreen mode Exit fullscreen mode

c.group_by(...).aggregate(...) returns a list of results.

This compiles to a single‑pass aggregator that doesn’t keep the whole input in memory.

generated code (click to expand)
class AggData_:
    __slots__ = ["v0", "v1"]

    def __init__(self, _none=__none__):
        self.v0 = _none
        self.v1 = _none

def group_by_(_none, data_):
    signature_to_agg_data_ = defaultdict(AggData_)

    for row_ in data_:
        _r0_ = row_["sales"]
        agg_data_ = signature_to_agg_data_[row_["country"]]
        if _r0_ < 10:
            if agg_data_.v1 is _none:
                agg_data_.v1 = _r0_ or 0
            else:
                agg_data_.v1 += _r0_ or 0
        if agg_data_.v0 is _none:
            agg_data_.v0 = _r0_ or 0
        else:
            agg_data_.v0 += _r0_ or 0

    return [
        {
            "country": signature_,
            "total": ((0 if (agg_data_.v0 is _none) else agg_data_.v0)),
            "other_total": ((0 if (agg_data_.v1 is _none) else agg_data_.v1)),
        }
        for signature_, agg_data_ in signature_to_agg_data_.items()
    ]

def _converter(data_):
    global __none__
    _none = __none__
    try:
        return group_by_(_none, data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise
Enter fullscreen mode Exit fullscreen mode

2) join two sequences, declaratively

from convtools import conversion as c

left = [{"id": 1, "name": "Nick"}, {"id": 2, "name": "Joash"}]
right = [{"ID": "2", "age": 21}, {"ID": "1", "age": 18}]
input_data = (left, right)

converter = (
    c.join(
        c.item(0),
        c.item(1),
        c.LEFT.item("id") == c.RIGHT.item("ID").as_type(int),
        how="left",
    )
    .pipe(
        c.list_comp(
            {"id": c.item(0, "id"), "age": c.item(1, "age", default=None)}
        )
    )
    .gen_converter()
)
print(converter(input_data))
# [{"id": 1, "age": 18}, {"id": 2, "age": 21}]
Enter fullscreen mode Exit fullscreen mode

Joins on plain iterables/dicts—no heavy framework.
Equi‑joins build a hash map of the right side (memory O(len(right))).
Supports "inner", "left", "right", "outer" (aka "full") via how param.

3) Stream big CSV → aggregate in single pass

from convtools import conversion as c
from convtools.contrib.tables import Table

stream = Table.from_csv("big.csv", header=True).into_iter_rows(dict)
# or from iterable of dicts/tuples/lists as:
# Table.from_rows(input_data, header=True)

converter = (
    c.group_by(c.item("country"))
    .aggregate(
        {
            "country": c.item("country"),
            "total": c.ReduceFuncs.Sum(c.item("sales")),
        }
    )
    .gen_converter()
)
result = converter(stream)  # compute once
for row in result:
    ...

# or sink to csv file
# Table.from_rows(result).into_csv("out.csv")
Enter fullscreen mode Exit fullscreen mode

Table turns CSVs into row streams so you can aggregate without loading the whole file.


Performance

  • Datetime formatting/parsing: specialized converters for %% %A %a %B %H %I %M %S %Y %b %d %f %m %p %u %w %y strftime/strptime formats saw ~4.5× (format) and ~3.7× (parse) over the built‑ins, falling back to standard ones for non-optimized ones -- useful when the format is fixed or reused a lot. These are microbenchmarks; measure your case.

profiling code (click to expand)
# Python 3.10.15 (main, Nov 28 2024, 23:39:17) [Clang 16.0.0 (clang-1600.0.26.4)] on darwin
from datetime import datetime
from convtools import conversion as c

dt = datetime(2023, 8, 1)
assert dt.strftime("%b %Y") == "Aug 2023"

# In [2]: %timeit dt.strftime("%b %Y")
# 799 ns ± 5.06 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)

ad_hoc_func = c.format_dt("%b %Y").gen_converter()
assert ad_hoc_func(dt) == "Aug 2023"

# In [32]: %timeit ad_hoc_func(dt)
# 178 ns ± 2.04 ns per loop (mean ± std. dev. of 7 runs, 10,000,000 loops each)

# In [3]: %timeit datetime.strptime("12/31/2020 12:05:54 PM", "%m/%d/%Y %I:%M:%S %p")
# 3.02 μs ± 19.6 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)

ad_hoc_func = c.datetime_parse("%m/%d/%Y %I:%M:%S %p").gen_converter()
# In [44]: %timeit ad_hoc_func("12/31/2020 12:05:54 PM")
# 816 ns ± 6.2 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)
Enter fullscreen mode Exit fullscreen mode

  • Loop unrolling configs: the “rename keys” pattern above measured ~2.7× speedup.

Compile pays off when you’ll call the converter multiple times or process a stream/batch big enough that per‑row savings dominate the ~0.05-1ms compile.


What gets compiled (and how you debug)

convtools compiles to regular Python you can print (debug=True) and step through. On exceptions, tracebacks look normal (linecache is populated), so post‑mortem debugging works. This is critical for trust.


Why not just Pandas / Polars?

Because sometimes you want:

  • Iterables in / iterables out (APIs, services, small ETL steps) without pulling in a DataFrame runtime.

  • Single‑pass reducers and streaming CSV without holding everything in memory.

  • Ahead‑of‑time compiled, plain‑Python functions you can profile or embed anywhere.

If you need columnar ops, group‑join on huge data with vectorized math, or out‑of‑core analytics, use Pandas/Polars/DuckDB. convtools complements them.


Install & recipes

pip install convtools
Enter fullscreen mode Exit fullscreen mode

Docs & cheatsheet: convtools.readthedocs.io.

The “Basics” page demonstrates how primitives map to compiled functions; skim that next.


When not to use convtools

  • Your job runs once and finishes in <50 ms.
  • Your team hates code generation on principle.
  • You need heavy vectorized numerics.
  • Your data fits in memory and you prefer interactive analysis (e.g., Jupyter with Pandas).

Call to action

If this sounds useful, try one of the snippets above on your own data. Feedback and real‑world examples are gold -- issues or discussions welcome on GitHub. (Stars help others find it, too.)

Top comments (0)