DEV Community

Alex Towell

Originally published at metafunctor.com

JAF: Streaming Boolean Algebra Over Nested JSON

JAF (Just Another Flow) is a streaming data processing system for JSON/JSONL data. It implements boolean algebra over nested JSON structures with lazy evaluation, composable operations, and a fluent API. JAF is the production version of the concepts I explored in dotsuite.

The Relationship to Dotsuite

The short version:

  • dotsuite: "This is how it works." Pedagogical, simple, learn-by-building.
  • JAF: "This is what you use." Feature-complete, lazy, handles real data.

JAF implements the highest level of dotsuite's architecture: boolean algebra over collections of nested documents. Where dotsuite teaches the concepts through isolated simple tools, JAF combines them into a unified streaming framework.

The Boolean Algebra Branch

In dotsuite's three-pillar architecture (Depth, Truth, Shape), JAF focuses on the collections layer, specifically the boolean wing that provides filtering operations with full boolean algebra:

\[
\text{filter}: (\mathcal{D} \to \mathbb{B}) \to (C \to C)
\]

Here \(\mathcal{D}\) is the document space, \(\mathbb{B}\) is the set of boolean values, and \(C\) is a collection of documents.

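JAF lifts boolean operations to streams: AND is the intersection of filtered streams, OR is their union, and NOT is the complement. Chaining filters composes predicates, and because filtering is a homomorphism over collections (see Theoretical Foundation below), a stream can be filtered in pieces with the same result.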

Core Innovation: Lazy Streaming

The Problem

Traditional data processing loads entire datasets into memory:

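import json

# Eager evaluation - loads everything into memory first
with open("huge_file.jsonl") as f:
    all_data = [json.loads(line) for line in f]
filtered = [d for d in all_data if d['age'] > 25]
mapped = [transform(d) for d in filtered]  # transform() is your own function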

This fails once the dataset is larger than available memory, and it wastes work when you only need the first 10 results.

JAF's Solution

from jaf import stream

# Lazy evaluation - nothing executes yet
pipeline = stream("huge_file.jsonl") \
    .filter(["gt?", "@age", 25]) \
    .map(transform) \
    .take(10)

# Reads only until 10 matching items have been produced
for item in pipeline.evaluate():
    process(item)

This gives you constant memory (one item is processed at a time), early termination (evaluation stops once take(10) is satisfied), composability (complex pipelines are built declaratively), and support for infinite streams.
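The laziness claim can be illustrated without JAF at all. The sketch below is plain Python, not JAF's implementation: generators stand in for the pipeline stages, `itertools.islice` plays the role of `take(10)`, and an in-memory `io.StringIO` of 1,000 synthetic records stands in for a JSONL file. The helper `read_jsonl` and its parse counter are hypothetical, added only to make the early termination visible.

```python
import io
import itertools
import json


def read_jsonl(lines, counter):
    """Lazily parse one JSON document per line, counting parsed lines."""
    for line in lines:
        counter["parsed"] += 1
        yield json.loads(line)


# 1,000 synthetic records: odd ids have age 30, even ids have age 20
raw = io.StringIO("".join(
    json.dumps({"id": i, "age": 20 + (i % 2) * 10}) + "\n" for i in range(1000)
))

counter = {"parsed": 0}
docs = read_jsonl(raw, counter)               # nothing is parsed yet
adults = (d for d in docs if d["age"] > 25)   # lazy filter
ids = (d["id"] for d in adults)               # lazy map
first_ten = list(itertools.islice(ids, 10))   # pulls only what it needs

print(first_ten)          # [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
print(counter["parsed"])  # 20
```

Only 20 of the 1,000 lines are parsed, because each stage pulls one item at a time and `islice` stops asking for more after the tenth match.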

Three Query Syntaxes

JAF supports multiple query syntaxes that all compile to the same internal representation.

S-Expression Syntax (Lisp-like)

# Simple comparisons
(eq? @status "active")
(gt? @age 25)
(contains? @tags "python")

# Boolean logic
(and
    (gte? @age 18)
    (eq? @verified true))

# Nested expressions
(or (eq? @role "admin")
    (and (eq? @role "user")
         (gt? @score 100)))

Why S-expressions? Parsing is unambiguous (there are no precedence rules), they are easy to serialize, they are homoiconic (code is data), and their ASTs compose naturally.
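Because an S-expression is just a nested list, reading one takes little code. Here is a hypothetical minimal reader (not JAF's actual parser) that turns the S-expression syntax into the JSON-array form, assuming only strings, integers and floats, `true`/`false`/`null`, and bare symbols such as `eq?` or `@age`; comments are not supported. The names `tokenize`, `atom`, `parse`, and `read_sexpr` are illustrative.

```python
import json
import re

# A token is "(", ")", a double-quoted string, or a run of other non-space chars
TOKEN = re.compile(r'\s*(\(|\)|"(?:[^"\\]|\\.)*"|[^\s()]+)')
NUMBER = re.compile(r"-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?")


def tokenize(text):
    text = text.strip()
    pos, tokens = 0, []
    while pos < len(text):
        match = TOKEN.match(text, pos)
        if match is None:
            raise SyntaxError(f"unexpected input at position {pos}")
        tokens.append(match.group(1))
        pos = match.end()
    return tokens


def atom(token):
    if token.startswith('"'):
        return json.loads(token)            # string literal (JSON escapes)
    if token in ("true", "false"):
        return token == "true"
    if token == "null":
        return None
    if NUMBER.fullmatch(token):
        return float(token) if any(c in token for c in ".eE") else int(token)
    return token                            # symbol, e.g. eq? or @age


def parse(tokens, i):
    """Parse one expression at tokens[i]; return (expression, next index)."""
    token = tokens[i]
    if token == ")":
        raise SyntaxError("unexpected ')'")
    if token != "(":
        return atom(token), i + 1
    expr, i = [], i + 1
    while tokens[i] != ")":
        item, i = parse(tokens, i)
        expr.append(item)
    return expr, i + 1


def read_sexpr(text):
    tokens = tokenize(text)
    try:
        expr, end = parse(tokens, 0)
    except IndexError:
        raise SyntaxError("unbalanced parentheses or empty input") from None
    if end != len(tokens):
        raise SyntaxError("trailing tokens after the expression")
    return expr


print(read_sexpr('(and (gte? @age 18) (eq? @verified true))'))
# ['and', ['gte?', '@age', 18], ['eq?', '@verified', True]]
```

The output is ordinary lists, so it can be inspected, rewritten, or serialized with `json.dumps` like any other data.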

JSON Array Syntax

# Same queries in JSON
["eq?", "@status", "active"]
["gt?", "@age", 25]

["and",
    ["gte?", "@age", 18],
    ["eq?", "@verified", true]
]

The JSON array form is easy to generate programmatically, is standard JSON, and can be sent over the network as-is.
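To show how the JSON-array form can be executed directly, here is a hypothetical minimal evaluator. It is a sketch, not JAF's engine: it supports only the operators used in this post (`and`, `or`, `not`, `eq?`, `gt?`, `gte?`, `lt?`, `contains?`, `exists?`), resolves only plain dotted paths, and assumes that a missing field makes a comparison false rather than raising an error.

```python
import operator

MISSING = object()  # sentinel: path not present in the document


def get_path(doc, path):
    """Resolve a dotted path such as '@user.profile.name'."""
    current = doc
    for key in path[1:].split("."):
        if isinstance(current, dict) and key in current:
            current = current[key]
        elif isinstance(current, list) and key.isdigit() and int(key) < len(current):
            current = current[int(key)]
        else:
            return MISSING
    return current


def operand(doc, arg):
    """Strings starting with '@' are paths; anything else is a literal."""
    if isinstance(arg, str) and arg.startswith("@"):
        value = get_path(doc, arg)
        return None if value is MISSING else value
    return arg


COMPARISONS = {
    "eq?": operator.eq,
    "gt?": operator.gt,
    "gte?": operator.ge,
    "lt?": operator.lt,
    "contains?": lambda container, item: item in container,
}


def evaluate(query, doc):
    op, *args = query
    if op == "and":
        return all(evaluate(arg, doc) for arg in args)
    if op == "or":
        return any(evaluate(arg, doc) for arg in args)
    if op == "not":
        return not evaluate(args[0], doc)
    if op == "exists?":
        return get_path(doc, args[0]) is not MISSING
    left, right = (operand(doc, arg) for arg in args)
    try:
        return bool(COMPARISONS[op](left, right))
    except TypeError:  # e.g. a missing field (None) compared with a number
        return False


users = [{"role": "admin", "score": 5}, {"role": "user", "score": 150},
         {"role": "user", "score": 50}, {"role": "guest"}]
query = ["or", ["eq?", "@role", "admin"],
         ["and", ["eq?", "@role", "user"], ["gt?", "@score", 100]]]
print([u["role"] for u in users if evaluate(query, u)])  # ['admin', 'user']
```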

Infix DSL Syntax

# Natural infix notation
@status == "active"
@age > 25 and @verified == true
@role == "admin" or (@role == "user" and @score > 100)

The infix form is human-readable and familiar, which makes it a good fit for the command line. All three syntaxes compile to the same AST.

Advanced Path System

JAF extends dotsuite's simple dot notation with more powerful features:

Exact Paths

stream(data).filter(["eq?", "@user.profile.name", "Alice"])

Wildcards

# Match any array element
stream(data).map("@users.*.email")

# Recursive descent
stream(data).map("@**.id")
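One way to resolve these wildcard paths is a recursive walk over the document. The sketch below is an assumption about the semantics, not JAF's code: `*` matches exactly one level (any key or list index) and `**` matches zero or more levels, as in shell globbing. The functions `children`, `walk`, and `select` are hypothetical.

```python
def children(node):
    """Direct child values of a dict or list; scalars have none."""
    if isinstance(node, dict):
        return list(node.values())
    if isinstance(node, list):
        return list(node)
    return []


def walk(node, parts):
    """Yield every value reachable from node by following the path parts.

    '*' matches exactly one level (any key or index); '**' matches zero or more.
    """
    if not parts:
        yield node
        return
    head, rest = parts[0], parts[1:]
    if head == "**":
        yield from walk(node, rest)            # zero levels
        for child in children(node):           # one or more levels
            yield from walk(child, parts)
    elif head == "*":
        for child in children(node):
            yield from walk(child, rest)
    elif isinstance(node, dict) and head in node:
        yield from walk(node[head], rest)
    elif isinstance(node, list) and head.isdigit() and int(head) < len(node):
        yield from walk(node[int(head)], rest)


def select(doc, path):
    return list(walk(doc, path[1:].split(".")))  # drop the leading '@'


doc = {"id": 1, "users": [{"id": 2, "email": "a@example.com"},
                          {"id": 3, "email": "b@example.com", "meta": {"id": 4}}]}
print(select(doc, "@users.*.email"))  # ['a@example.com', 'b@example.com']
print(select(doc, "@**.id"))          # [1, 2, 3, 4]
```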

Regex Key Matching

# Match keys by pattern
["regex_key", "^error_\\d+$"]
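A sketch of what regex key matching could look like in plain Python, assuming the pattern is tested with `re.search` against top-level keys only (how JAF scopes the match is not specified here). The helper name `regex_key` is hypothetical.

```python
import re


def regex_key(doc, pattern):
    """Return the top-level entries whose key matches pattern (re.search)."""
    key_pattern = re.compile(pattern)
    return {key: value for key, value in doc.items() if key_pattern.search(key)}


doc = {"error_1": "timeout", "error_22": "refused", "error_x": "?", "status": "ok"}
print(regex_key(doc, "^error_\\d+$"))  # {'error_1': 'timeout', 'error_22': 'refused'}
```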

Fuzzy Matching

# Tolerate typos
["fuzzy_key", "username", 0.8]  # 80% similarity
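The post does not say which similarity measure JAF uses for the 0.8 threshold. As a stand-in, this sketch uses the standard library's `difflib.SequenceMatcher.ratio()`; the helper `fuzzy_key` and its pick-the-best-match rule are hypothetical.

```python
from difflib import SequenceMatcher


def fuzzy_key(doc, target, threshold=0.8):
    """Return the value under the key most similar to target, or None.

    Similarity is difflib's ratio(): 2*M/T, where M is the number of matching
    characters and T is the total length of both strings.
    """
    best_key, best_score = None, threshold
    for key in doc:
        score = SequenceMatcher(None, key.lower(), target.lower()).ratio()
        if score >= best_score:
            best_key, best_score = key, score
    return None if best_key is None else doc[best_key]


record = {"user_name": "alice", "email": "alice@example.com"}
print(fuzzy_key(record, "username"))  # alice  ("user_name" scores 16/17, about 0.94)
print(fuzzy_key(record, "phone"))     # None
```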

Array Operations

# Array slicing
"@items[0:5]"
"@items[-3:]"

# Predicate filtering
"@users[?(@.age > 25)]"

Streaming Operations

Filter (Boolean Algebra)

# AND: Intersection
active_verified = stream("users.jsonl") \
    .filter(["eq?", "@status", "active"]) \
    .filter(["eq?", "@verified", true])

# OR: Union (using boolean logic in query)
admins_or_mods = stream("users.jsonl") \
    .filter(["or",
        ["eq?", "@role", "admin"],
        ["eq?", "@role", "moderator"]
    ])

# NOT: Complement
inactive = stream("users.jsonl") \
    .filter(["not", ["eq?", "@status", "active"]])
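The set interpretation of these three filters can be checked directly in plain Python. In this sketch the built-in `filter` stands in for JAF's `.filter()`, and document ids make the results comparable as sets; the toy `users` records are invented for illustration.

```python
users = [
    {"id": 1, "status": "active", "verified": True, "role": "admin"},
    {"id": 2, "status": "active", "verified": False, "role": "moderator"},
    {"id": 3, "status": "inactive", "verified": True, "role": "user"},
    {"id": 4, "status": "inactive", "verified": False, "role": "moderator"},
]


def ids(docs):
    return {doc["id"] for doc in docs}


def is_active(doc):
    return doc["status"] == "active"


def is_verified(doc):
    return doc["verified"]


def is_admin(doc):
    return doc["role"] == "admin"


def is_moderator(doc):
    return doc["role"] == "moderator"


everyone = ids(users)

# AND: chaining two filters equals intersecting their separate results
chained = ids(filter(is_verified, filter(is_active, users)))
assert chained == ids(filter(is_active, users)) & ids(filter(is_verified, users)) == {1}

# OR: one filter with an "or" predicate equals the union of two filters
either = ids(filter(lambda d: is_admin(d) or is_moderator(d), users))
assert either == ids(filter(is_admin, users)) | ids(filter(is_moderator, users)) == {1, 2, 4}

# NOT: filtering with a negated predicate equals the complement
inactive = ids(filter(lambda d: not is_active(d), users))
assert inactive == everyone - ids(filter(is_active, users)) == {3, 4}
```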

Map (Transformations)

# Extract fields
emails = stream("users.jsonl").map("@email")

# Reshape documents
simplified = stream("users.jsonl").map(["dict",
    "name", "@firstName",
    "age", "@profile.age",
    "verified", "@verified"
])

# Conditional transformations
categorized = stream("sales.jsonl").map(["dict",
    "date", "@timestamp",
    "amount", "@amount",
    "category", ["if", ["gt?", "@amount", 1000], "high", "low"]
])

Take/Skip (Pagination)

# First 100 items
stream("data.jsonl").take(100)

# Skip first 100, take next 100
stream("data.jsonl").skip(100).take(100)
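Assuming `skip` and `take` follow the usual stream semantics, they map onto `itertools.islice`, so `skip(100).take(100)` yields items 100 through 199. The helpers below are hypothetical stand-ins, not JAF's methods.

```python
from itertools import islice


def skip(items, n):
    """Lazily drop the first n items."""
    return islice(items, n, None)


def take(items, n):
    """Lazily yield at most n items, then stop pulling from the source."""
    return islice(items, n)


page_two = list(take(skip(range(1000), 100), 100))
print(page_two[0], page_two[-1], len(page_two))  # 100 199 100
```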

Batch (Chunking)

# Process in groups of 1000
for batch in stream("data.jsonl").batch(1000).evaluate():
    bulk_insert(batch)
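A lazy `batch` can be written with `islice` in a few lines. This is a sketch of the general technique, not JAF's implementation, and it assumes the final batch may be shorter than the batch size; on Python 3.12+ the standard library's `itertools.batched` does the same thing but yields tuples.

```python
from itertools import islice


def batch(items, size):
    """Lazily group items into lists of `size`; the last list may be shorter."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while chunk := list(islice(iterator, size)):
        yield chunk


print([len(chunk) for chunk in batch(range(2500), 1000)])  # [1000, 1000, 500]
```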

Windowed Operations for Memory Efficiency

JAF supports windowed operations that trade accuracy for bounded memory:

Distinct

# Exact distinct (potentially unbounded memory)
stream("data.jsonl").distinct(window_size=float('inf'))

# Windowed distinct (bounded memory, approximate)
stream("data.jsonl").distinct(window_size=1000)
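A windowed distinct can be built from a set for fast membership tests plus a deque that remembers insertion order, so the oldest key is evicted once the window is full. This sketch assumes the window counts the most recent distinct keys kept; JAF may instead define it over the last `window_size` input items. `windowed_distinct` is a hypothetical name.

```python
import json
from collections import deque


def windowed_distinct(items, window_size, key=lambda item: item):
    """Drop an item if its key is among the last `window_size` distinct keys kept.

    window_size=float('inf') never evicts, so the result is exactly distinct.
    """
    recent = deque()   # kept keys, oldest first
    seen = set()       # the same keys, for O(1) membership tests
    for item in items:
        k = key(item)
        if k in seen:
            continue
        yield item
        recent.append(k)
        seen.add(k)
        if len(recent) > window_size:
            seen.discard(recent.popleft())


values = [1, 2, 1, 3, 4, 1]
print(list(windowed_distinct(values, float("inf"))))  # [1, 2, 3, 4]
print(list(windowed_distinct(values, 2)))             # [1, 2, 3, 4, 1]

# Dicts are unhashable, so key JSON documents by a canonical serialization
docs = [{"a": 1}, {"a": 1}]
print(list(windowed_distinct(docs, 100, key=lambda d: json.dumps(d, sort_keys=True))))
```

With an infinite window the output is exact; with a window of 2, the final `1` gets through because it had already been evicted by the time it reappeared, which is the approximation the text describes.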

Groupby

# Tumbling window groupby
stream("logs.jsonl").groupby(
    key="@level",
    window_size=100  # Group every 100 items
)
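A tumbling window cuts the stream into consecutive, non-overlapping chunks and groups each chunk on its own, so memory is bounded by the window size. The sketch below is hypothetical (not JAF's `groupby`) and takes a Python key function instead of a path string such as `@level`.

```python
from collections import defaultdict
from itertools import islice


def tumbling_groupby(items, key, window_size):
    """Cut the stream into consecutive, non-overlapping windows of
    `window_size` items and yield one {key: [items]} dict per window."""
    iterator = iter(items)
    while window := list(islice(iterator, window_size)):
        groups = defaultdict(list)
        for item in window:
            groups[key(item)].append(item)
        yield dict(groups)


logs = [{"level": level} for level in ["INFO", "ERROR", "INFO", "WARN", "ERROR"]]
for groups in tumbling_groupby(logs, key=lambda d: d["level"], window_size=3):
    print({level: len(entries) for level, entries in groups.items()})
# {'INFO': 2, 'ERROR': 1}
# {'WARN': 1, 'ERROR': 1}
```

A key that appears in two windows (here `ERROR`) shows up in both outputs; a global groupby would have to keep every group until the stream ends.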

Set Operations

# Exact intersection
stream1.intersect(stream2, window_size=float('inf'))

# Windowed intersection (memory-efficient)
stream1.intersect(stream2, window_size=10000)

The trade-off: finite windows bound memory, but they can miss matches (or let duplicates through) when related items are farther apart in the stream than the window size. Choose the window size based on your data distribution.

Real-World Examples

Log Analysis

from jaf import stream

# Find critical errors in authentication service
critical_auth_errors = stream("app.log.jsonl") \
    .filter(["and",
        ["eq?", "@level", "ERROR"],
        ["eq?", "@service", "auth"],
        ["contains?", "@message", "critical"]
    ]) \
    .map(["dict",
        "timestamp", "@timestamp",
        "user", "@context.user_id",
        "message", "@message"
    ]) \
    .evaluate()

for error in critical_auth_errors:
    alert_oncall(error)

ETL Pipeline

# Transform sales data
pipeline = stream("raw_sales.jsonl") \
    .filter(["eq?", "@status", "completed"]) \
    .map(["dict",
        "date", ["date", "@timestamp"],
        "customer_id", "@customer.id",
        "amount", "@total",
        "category", ["if",
            ["gt?", "@total", 1000],
            "enterprise",
            "standard"
        ]
    ]) \
    .batch(1000)

for batch in pipeline.evaluate():
    warehouse.bulk_insert(batch)

Data Validation

# Find invalid user records
invalid_users = stream("users.jsonl") \
    .filter(["or",
        ["not", ["exists?", "@email"]],
        ["not", ["regex-match?", "@email", r"^[^@]+@[^@]+\.[^@]+$"]],
        ["lt?", "@age", 0],
        ["gt?", "@age", 150]
    ]) \
    .map(["dict",
        "id", "@id",
        "reason", ["if",
            ["not", ["exists?", "@email"]],
            "missing_email",
            ["if",
                ["not", ["regex-match?", "@email", r"^[^@]+@[^@]+\.[^@]+$"]],
                "invalid_email",
                "invalid_age"
            ]
        ]
    ]) \
    .evaluate()

Command-Line Usage

JAF works well in Unix-style pipelines:

# Filter logs
jaf filter app.log.jsonl '(eq? @level "ERROR")' --eval

# Chain operations
jaf filter users.jsonl '(eq? @status "active")' | \
jaf map - "@email" | \
jaf eval -

# Complex queries
jaf filter logs.jsonl \
  '(and (eq? @level "ERROR") (gt? @timestamp "2024-01-01"))' \
  --eval

# Integrate with other tools
jaf filter data.jsonl '(exists? @metadata)' --eval | \
jq '.metadata' | \
sort | uniq -c

# Batch processing
jaf filter orders.jsonl '(gt? @amount 100)' --eval | \
ja groupby customer_id --aggregate 'total:amount:sum'

Mapping Dotsuite Concepts to JAF

| Dotsuite Tool | JAF Feature | Notes |
|---|---|---|
| dotget | Path system (@path) | JAF adds regex, fuzzy, wildcards |
| dotstar | Wildcard paths | @users.*.name |
| dotfilter | .filter() method | S-expression queries |
| dotquery | Query language | More powerful, S-expressions |
| dotmod | .map() transformations | Immutable by default |
| dotpipe | Method chaining | Fluent API |
| dotpluck | Reshaping with ["dict", ...] | Transform documents |
| dotrelate | Planned (join/union) | Future feature |

The key difference: JAF is lazy by default, while dotsuite is mostly eager. This is what lets JAF handle datasets that don't fit in memory.

Performance Characteristics

| Operation | Memory | Time | Notes |
|---|---|---|---|
| Filter | O(1) | O(n) | Streams one item at a time |
| Map | O(1) | O(n) | Zero-copy transformations |
| Take | O(k) | O(k) | Early termination; k = number of items taken |
| Distinct | O(w) | O(n) | w = window_size |
| Groupby | O(w*k) | O(n) | k = unique keys in window |
| Batch | O(b) | O(n) | b = batch_size |

Basic operations (filter, map, take) run in constant memory, windowed operations run in bounded memory, take stops reading as soon as it has enough items, and streaming JSON parsing means there is zero serialization overhead.

Future Work: Probabilistic Data Structures

I plan to add probabilistic data structures for massive-scale operations:

  • Bloom filters for memory-efficient approximate distinct.
  • Count-Min Sketch for heavy-hitter detection.
  • HyperLogLog for cardinality estimation.

These provide controllable accuracy/memory trade-offs with theoretical guarantees, enabling billion-item operations on commodity hardware.
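For reference, this is the classic Bloom filter applied to approximate distinct. It is a generic sketch, not JAF's planned design: the sizing formulas are the standard ones for a target false-positive rate, bit positions come from double hashing of a SHA-256 digest, and items must be strings (JSON documents could be keyed with `json.dumps(doc, sort_keys=True)`). `BloomFilter` and `approx_distinct` are hypothetical names.

```python
import hashlib
import math


class BloomFilter:
    """Set membership with possible false positives but no false negatives."""

    def __init__(self, capacity, error_rate=0.01):
        # Standard sizing for n items and false-positive rate p:
        #   m = -n * ln(p) / (ln 2)^2 bits,  k = (m / n) * ln 2 hash functions
        self.size = math.ceil(-capacity * math.log(error_rate) / math.log(2) ** 2)
        self.num_hashes = max(1, round(self.size / capacity * math.log(2)))
        self.bits = bytearray((self.size + 7) // 8)

    def _positions(self, item):
        # Double hashing: derive k positions from two 64-bit halves of one digest
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1  # odd step, never zero
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.size

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item):
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))


def approx_distinct(items, capacity, error_rate=0.01):
    """Yield items not (apparently) seen before. Never yields a duplicate, but a
    false positive can occasionally drop a genuinely new item."""
    seen = BloomFilter(capacity, error_rate)
    for item in items:
        if item not in seen:
            seen.add(item)
            yield item


keys = [str(i % 50) for i in range(200)]
unique = list(approx_distinct(keys, capacity=1_000))
print(len(unique) <= 50, len(unique) == len(set(unique)))  # True True
```

Memory is fixed by `capacity` and `error_rate` up front (about 9,600 bits for 1,000 items at 1%), regardless of how long the stream runs.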

Theoretical Foundation

JAF implements boolean algebra over document collections.

Predicates as Sets

Each predicate \(p: \mathcal{D} \to \mathbb{B}\) defines a subset:

\[
S_p = \lbrace d \in \mathcal{D} \mid p(d) = \text{true}\rbrace
\]

Boolean Operations

\[
S_{\text{AND}(p_1, p_2)} = S_{p_1} \cap S_{p_2}
\]
\[
S_{\text{OR}(p_1, p_2)} = S_{p_1} \cup S_{p_2}
\]
\[
S_{\text{NOT}(p)} = \mathcal{D} \setminus S_p
\]

Homomorphism

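For a fixed predicate \(p\), filtering is a monoid homomorphism on collections, where the monoid operation is union and the identity is the empty collection (filtering an empty collection yields an empty collection):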

\[
\text{filter}(C_1 \cup C_2, p) = \text{filter}(C_1, p) \cup \text{filter}(C_2, p)
\]

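This is what makes filtering parallelizable (split the collection, filter the parts independently, and combine the results) and streaming (each item can be processed incrementally as it arrives). Sequential filters also compose and commute: filtering by \(p_1\) and then by \(p_2\) gives the same result as filtering once by \(\text{AND}(p_1, p_2)\), in either order.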
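These properties can be checked on a concrete collection. In this sketch Python lists with concatenation stand in for collections with union (an ordered view of the same idea), and each chunk could be handed to a separate worker; the data and helper names are invented for illustration.

```python
def filter_docs(docs, predicate):
    return [doc for doc in docs if predicate(doc)]


def older_than_25(doc):
    return doc["age"] > 25


def is_verified(doc):
    return doc["verified"]


docs = [{"id": i, "age": 15 + i % 30, "verified": i % 3 == 0} for i in range(1000)]

# Homomorphism: filter each chunk independently, then concatenate the results
chunks = [docs[i:i + 250] for i in range(0, len(docs), 250)]
per_chunk = [doc for chunk in chunks for doc in filter_docs(chunk, older_than_25)]
assert per_chunk == filter_docs(docs, older_than_25)

# Identity element: filtering the empty collection gives the empty collection
assert filter_docs([], older_than_25) == []

# Composition: sequential filters equal one AND filter, in either order
both = filter_docs(docs, lambda d: older_than_25(d) and is_verified(d))
assert filter_docs(filter_docs(docs, older_than_25), is_verified) == both
assert filter_docs(filter_docs(docs, is_verified), older_than_25) == both
```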

Quick Start

# Install
pip install jaf

# Basic usage
jaf filter data.jsonl '(gt? @age 25)' --eval

Python API:

from jaf import stream

# Build lazy pipeline
pipeline = stream("users.jsonl") \
    .filter(["eq?", "@status", "active"]) \
    .map("@email") \
    .take(100)

# Execute when ready
for email in pipeline.evaluate():
    send_newsletter(email)

When to Use JAF vs Dotsuite

Use JAF for processing large datasets, when you need lazy evaluation, for production systems, when you need advanced path features (regex, fuzzy), and for windowed operations.

Use dotsuite for learning data processing concepts, teaching and prototyping, small datasets that fit in memory, when you want simple copyable code, and for building custom tools.

Or use both: prototype with dotsuite concepts, productionize with JAF.

Resources

License

MIT
