Join Data from Anywhere: The Streaming SQL Engine That Bridges Databases, APIs, and Files
Have you ever needed to join data from a MySQL database with a PostgreSQL database, a MongoDB collection, and a REST API all in one query? Traditional databases can't do this. That's why I built the Streaming SQL Engine.
The Problem: Data Lives Everywhere
Modern applications don't store all their data in one place. You might have:
- User data in PostgreSQL
- Order data in MySQL
- Product catalog in MongoDB
- Pricing information from a REST API
- Inventory data in CSV files
- Product feeds in XML files
The challenge: How do you join all this data together?
Traditional solutions require:
- Exporting data from each system
- Importing into a central database
- Writing complex ETL pipelines
- Maintaining data synchronization
There had to be a better way.
The Solution: Streaming SQL Engine
I built a lightweight Python library that lets you join data from any source using standard SQL syntax without exporting, importing, or setting up infrastructure.
Example:
from streaming_sql_engine import Engine
import psycopg2
import pymysql
from pymongo import MongoClient
import requests
import csv
engine = Engine()
# Register PostgreSQL source (iterator function)
def postgres_users():
    conn = psycopg2.connect(host="localhost", database="mydb", user="user", password="pass")
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, email FROM users")
    for row in cursor:
        yield {"id": row[0], "name": row[1], "email": row[2]}
    conn.close()
engine.register("postgres_users", postgres_users)
# Register MySQL source (iterator function)
def mysql_products():
    conn = pymysql.connect(host="localhost", database="mydb", user="user", password="pass")
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, price FROM products")
    for row in cursor:
        yield {"id": row[0], "name": row[1], "price": row[2]}
    conn.close()
engine.register("mysql_products", mysql_products)
# Register MongoDB source (iterator function)
def mongo_inventory():
    client = MongoClient("mongodb://localhost:27017")
    for doc in client.mydb.inventory.find():
        yield doc
engine.register("mongo_inventory", mongo_inventory)
# Register REST API source (iterator function)
def api_prices():
    response = requests.get("https://api.example.com/prices")
    for item in response.json():
        yield item
engine.register("api_prices", api_prices)
# Register CSV source (iterator function)
def csv_suppliers():
    with open("suppliers.csv") as f:
        for row in csv.DictReader(f):
            yield row
engine.register("csv_suppliers", csv_suppliers)
# Join them all in one SQL query!
query = """
SELECT
mysql_products.name,
postgres_users.email,
mongo_inventory.quantity,
api_prices.price,
csv_suppliers.supplier_name
FROM mysql_products
JOIN postgres_users ON mysql_products.user_id = postgres_users.id
JOIN mongo_inventory ON mysql_products.sku = mongo_inventory.sku
JOIN api_prices ON mysql_products.sku = api_prices.sku
JOIN csv_suppliers ON mysql_products.supplier_id = csv_suppliers.id
WHERE api_prices.price > 100
"""
for row in engine.query(query):
    process(row)
That's it. No clusters, no infrastructure, no data export - just pure Python and SQL.
Why I Built This: The Problem That Needed Solving
I was working on a data reconciliation project where I needed to join data from multiple sources:
- MySQL database (product catalog)
- PostgreSQL database (user data)
- MongoDB collection (inventory)
- REST API (pricing information)
- CSV files (supplier data)
The challenge: Traditional databases can't join across different systems. I had three options:
- Export everything to one database - Time-consuming, requires ETL pipelines, data becomes stale
- Write custom Python code - Complex, error-prone, hard to maintain
- Use existing tools - Spark/Flink require clusters, DuckDB requires data import, Presto needs infrastructure
None of these worked for my use case. I needed something that:
- Could join data from different systems without export
- Was simple to use (SQL syntax)
- Required zero infrastructure
- Worked with Python natively
- Processed data efficiently (streaming)
So I built the Streaming SQL Engine.
How I Built It: Architecture and Design Decisions
Core Design Philosophy
The engine follows a pipeline architecture inspired by database query execution engines (like PostgreSQL and SQLite), but implemented in pure Python using iterators.
Key insight: Python's iterator protocol is perfect for streaming data. Each operator in the pipeline is an iterator that processes rows one at a time.
Why Iterator-Based Architecture?
Key advantages:
- Memory efficiency: Only one row in memory at a time (except for join indexes)
- Lazy evaluation: Processing starts only when you iterate over results
- Composability: Operators can be chained arbitrarily
- Extensibility: Easy to add new operators
- Python-native: Uses standard Python iterator protocol
Trade-offs:
- Performance: Python iterators are slower than compiled code, but flexibility is worth it
- Memory: Join indexes require memory, but this is necessary for efficient joins
- Complexity: Iterator chains can be complex, but they're composable and testable
Expression Evaluation
How WHERE clauses are evaluated:
The engine uses recursive AST traversal: it walks the parsed expression tree and evaluates each node against the current row.
Why recursive: SQL expressions are trees. Recursive evaluation naturally handles nested expressions like (a > 10 AND b < 20) OR c = 5.
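A minimal sketch of what such a recursive evaluator can look like (the tuple-based node representation here is illustrative, not the engine's actual AST classes):
# Illustrative only: nodes are nested tuples, not the engine's real AST objects.
def evaluate(node, row):
    kind = node[0]
    if kind == "col":                      # ("col", "price")
        return row.get(node[1])
    if kind == "lit":                      # ("lit", 100)
        return node[1]
    if kind == "and":
        return evaluate(node[1], row) and evaluate(node[2], row)
    if kind == "or":
        return evaluate(node[1], row) or evaluate(node[2], row)
    if kind == ">":
        return evaluate(node[1], row) > evaluate(node[2], row)
    if kind == "=":
        return evaluate(node[1], row) == evaluate(node[2], row)
    raise ValueError(f"Unsupported node: {kind}")

# price > 100 AND status = 'active'
expr = ("and", (">", ("col", "price"), ("lit", 100)),
               ("=", ("col", "status"), ("lit", "active")))
evaluate(expr, {"price": 150, "status": "active"})  # True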
Join Algorithm Selection
How the engine chooses join algorithms:
The engine follows this priority order when selecting a join algorithm:
1. Check if both sides are sorted (ordered_by metadata) - if yes, use MergeJoinIterator (most memory-efficient for sorted data). Only used when use_polars=False.
2. Check if the right side is a file (filename metadata) - if yes, use MmapLookupJoinIterator (memory-mapped, 90-99% memory reduction). Only used when use_polars=False.
3. Check if Polars is available (use_polars flag) - if yes, use PolarsLookupJoinIterator (vectorized, SIMD-accelerated). Used when use_polars=True is explicitly set.
4. Default fallback - use LookupJoinIterator (Python hash-based, most compatible).
Note: When use_polars=False (default), Merge Join and MMAP Join are checked first before falling back to Python Lookup Join. When use_polars=True is explicitly set, the engine prioritizes Polars over MMAP and Merge Join.
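Put as code, the selection logic looks roughly like this (an illustrative sketch of the priority order described above, assuming hypothetical metadata fields on each registered source; it is not the engine's actual source code):
# Illustrative priority order only.
def choose_join_iterator(left, right, use_polars=False):
    if not use_polars:
        if left.ordered_by and right.ordered_by:   # both sides sorted by the join key
            return "MergeJoinIterator"
        if right.filename:                         # right side backed by a file on disk
            return "MmapLookupJoinIterator"
    if use_polars:                                 # explicitly requested vectorized path
        return "PolarsLookupJoinIterator"
    return "LookupJoinIterator"                    # default Python hash join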
Protocol-Based Optimization
The key innovation: Automatic optimization detection via function signature inspection.
How it works:
- Engine inspects the source function signature using inspect.signature()
- If the function accepts dynamic_where or dynamic_columns parameters, it supports optimizations
- Engine passes optimization parameters automatically
- Source function applies optimizations (filter pushdown, column pruning)
Why this approach:
- No flags needed: Detection is automatic
- Flexible: Any Python function can be a source
- Backward compatible: Simple sources still work (no optimizations)
- Extensible: Easy to add new optimization parameters
Example:
# Simple source (no optimizations)
def simple_source():
    return iter([{"id": 1, "name": "Alice"}])

# Optimized source (with protocol)
def optimized_source(dynamic_where=None, dynamic_columns=None):
    query = "SELECT "
    if dynamic_columns:
        query += ", ".join(dynamic_columns)
    else:
        query += "*"
    query += " FROM table"
    if dynamic_where:
        query += f" WHERE {dynamic_where}"
    return execute_query(query)
# Both work the same way:
engine.register("users", simple_source) # No optimizations
engine.register("products", optimized_source) # Optimizations apply automatically!
Memory Management
Key strategies:
- Streaming: Process one row at a time, never load full tables
- Join indexes: Only right side of joins is materialized (necessary for lookups)
- Memory-mapped files: For large JSONL files, use OS virtual memory
- Column pruning: Only extract needed columns
- Filter pushdown: Filter at source, reduce data transfer
Memory footprint:
- Left side of join: O(1) - one row at a time
- Right side of join: O(n) - hash index in memory
- Total: O(n) where n = size of right side
Why this works: In most queries, the right side is smaller (e.g., joining large product table with small category table). The engine is designed to put smaller tables on the right side.
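For example, with a large product table and a small category table, you would write the query so that the small table sits on the right side of the JOIN (table and column names here are illustrative):
engine.register("products", products_source)      # large table: streamed row-by-row (left side)
engine.register("categories", categories_source)  # small table: materialized as a hash index (right side)

query = """
SELECT products.name, categories.title
FROM products
JOIN categories ON products.category_id = categories.id
"""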
Performance Optimizations
1. Polars Vectorization
When Polars is available, the engine uses vectorized operations:
# Instead of row-by-row filtering:
for row in rows:
    if row['price'] > 100:
        yield row

# Use Polars batch filtering:
import polars as pl

df = pl.DataFrame(rows)
filtered_df = df.filter(pl.col('price') > 100)
for row in filtered_df.iter_rows(named=True):
    yield row
Why faster: Polars uses SIMD instructions and columnar processing, 10-200x faster than row-by-row Python loops.
2. Memory-Mapped Joins
For large JSONL files, use OS virtual memory instead of loading into RAM:
# Instead of loading the entire file:
with open('large_file.jsonl') as f:
    data = [json.loads(line) for line in f]  # 10GB in memory!

# Use a memory-mapped file:
import json
import mmap

with open('large_file.jsonl', 'rb') as f:
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    for line in iter(mm.readline, b""):
        record = json.loads(line)  # OS handles memory, can process files larger than RAM
Why this works: OS virtual memory allows accessing file data without loading it all into RAM. 90-99% memory reduction for large files.
3. First Match Only Optimization
For joins where only the first matching right-side row matters (see the sketch below).
Why useful: it prevents Cartesian-product blowups when the right side has duplicate keys, significantly reducing output size and processing time.
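A minimal sketch using the first_match_only flag that appears in the benchmark configurations later in this article (source names are illustrative):
# Each product is joined with at most one image row, even if several images share the product_id.
engine = Engine(first_match_only=True)
engine.register("products", products_source)
engine.register("images", images_source)

query = """
SELECT products.product_id, products.title, images.image
FROM products
LEFT JOIN images ON products.product_id = images.product_id
"""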
4. Column Pruning
Only extracts columns needed for the query, reducing I/O and memory:
# Query only requests 'name' and 'price'
query = "SELECT name, price FROM products"
# Engine automatically requests only these columns from source
def source(dynamic_columns=None):
    columns = ", ".join(dynamic_columns) if dynamic_columns else "*"  # e.g. ['name', 'price']
    query = f"SELECT {columns} FROM table"
    return execute_query(query)
5. Filter Pushdown
Pushes WHERE conditions to data sources when possible:
# Query has WHERE clause
query = "SELECT * FROM products WHERE price > 100"
# Engine automatically pushes filter to source
def source(dynamic_where=None):
    where = f" WHERE {dynamic_where}" if dynamic_where else ""
    query = f"SELECT * FROM table{where}"
    return execute_query(query)
6. Merge Joins
Efficient joins for pre-sorted data (O(n+m) time complexity):
# Register with ordered_by to enable merge join
engine.register("users", users_source, ordered_by="id")
# Engine uses merge join algorithm
# Both sides must be sorted by join key
Design Decisions and Trade-offs
1. Why Python iterators instead of compiled code?
Decision: Use Python iterators for flexibility
Trade-off: Slower than compiled code, but:
- Works with any Python data source
- Easy to extend and customize
- No compilation step needed
- Python-native integration
2. Why separate logical planning from execution?
Decision: Two-phase approach (plan then execute)
Trade-off: Extra step, but:
- Enables query optimization
- Easier to test and debug
- Can reuse planning logic
- Clear separation of concerns
3. Why protocol-based optimization instead of flags?
Decision: Automatic detection via function signature
Trade-off: Slightly more complex detection, but:
- No flags to remember
- Automatic optimization
- Backward compatible
- More Pythonic
4. Why materialize right side of joins?
Decision: Build hash index on right side
Trade-off: Memory usage, but:
- O(1) lookup time per left row
- Necessary for efficient joins
- Can use memory-mapped files for large files
- Standard database approach
5. Why limit SQL features (no GROUP BY, aggregations)?
Decision: Focus on joins and filtering
Trade-off: Less SQL support, but:
- Simpler implementation
- Faster execution
- Focuses on core use case (cross-system joins)
- Can add later if needed
The Result
A lightweight Python library that:
- Joins data from any source using SQL
- Processes data row-by-row (streaming)
- Requires zero infrastructure
- Automatically optimizes when possible
- Works with any Python iterator
The key insight: Python's iterator protocol is perfect for streaming SQL execution. By combining SQL parsing, logical planning, and iterator-based execution, I created a tool that solves a real problem: joining data from different systems without complex infrastructure.
How It Works: Streaming Architecture
The engine processes data row-by-row, never loading entire tables into memory:
SQL Query
|
Parser -> AST
|
Planner -> Logical Plan
|
Executor -> Iterator Pipeline
|
Results (Generator)
Iterator Pipeline:
ScanIterator -> FilterIterator -> JoinIterators -> ProjectIterator -> Results
Each iterator processes rows incrementally, enabling true streaming execution. This means:
- Low memory footprint
- Can process data larger than RAM
- Results yielded immediately
- No buffering required
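A heavily simplified sketch of such an iterator pipeline in plain Python generators (the real operators are iterator classes, but the streaming principle is the same):
def scan(rows):                          # ScanIterator: pull rows from the source
    for row in rows:
        yield row

def filter_rows(rows, predicate):        # FilterIterator: drop rows that fail the WHERE clause
    for row in rows:
        if predicate(row):
            yield row

def project(rows, columns):              # ProjectIterator: keep only the selected columns
    for row in rows:
        yield {col: row[col] for col in columns}

source = [{"name": "A", "price": 50}, {"name": "B", "price": 150}]
pipeline = project(filter_rows(scan(source), lambda r: r["price"] > 100), ["name", "price"])
for row in pipeline:   # nothing is computed until iteration starts
    print(row)         # {'name': 'B', 'price': 150}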
Supported Data Sources
The engine works with any Python iterator, making it incredibly flexible:
Databases
All databases are accessed via Python iterator functions. The engine doesn't use connectors - it works with any Python function that returns an iterator:
- PostgreSQL - Create iterator function that queries PostgreSQL and yields rows
- MySQL - Create iterator function that queries MySQL and yields rows
- MongoDB - Create iterator function that queries MongoDB and yields documents
Files
- CSV - Standard CSV files
- JSONL - JSON Lines format with memory-mapped joins
- JSON - Standard JSON files
- XML - XML parsing with ElementTree
APIs
- REST APIs - Any HTTP endpoint
- GraphQL - Via custom functions
- WebSockets - Streaming data sources
Custom Sources
- Any Python function that returns an iterator
- Generators - Perfect for streaming data
- Custom transformations - Apply Python logic between joins
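For example, any generator can act as a table, including one that transforms rows on the fly (fetch_orders here is a hypothetical upstream iterator):
def orders_with_tax():
    """Custom transformation source: any Python generator works as a table."""
    for order in fetch_orders():          # hypothetical upstream iterator (API, DB, queue, ...)
        order["total_with_tax"] = order["total"] * 1.23
        yield order

engine.register("orders", orders_with_tax)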
SQL Features
The engine supports standard SQL syntax:
Supported Features
SELECT - Column selection, aliasing, table-qualified columns
SELECT users.name, orders.total AS order_total
FROM users
JOIN orders ON users.id = orders.user_id
JOIN - INNER JOIN and LEFT JOIN with equality conditions
SELECT *
FROM table1 t1
JOIN table2 t2 ON t1.id = t2.id
LEFT JOIN table3 t3 ON t1.id = t3.id
WHERE - Comparisons, boolean logic, NULL checks, IN clauses
SELECT *
FROM products
WHERE price > 100
AND status IN ('active', 'pending')
AND description IS NOT NULL
Arithmetic - Addition, subtraction, multiplication, division, modulo
SELECT
price - discount AS final_price,
quantity * unit_price AS total
FROM orders
Not Supported
- GROUP BY and aggregations (COUNT, SUM, AVG)
- ORDER BY
- HAVING
- Subqueries
These limitations keep the engine focused on joins and filtering - its core strength.
Real-World Examples
Example 1: Microservices Data Integration
In a microservices architecture, data is distributed across services:
from streaming_sql_engine import Engine
import psycopg2
import pymysql
import requests
engine = Engine()
# Service 1: User service (PostgreSQL) - iterator function
def users_source():
    conn = psycopg2.connect(host="user-db", port=5432, user="user", password="pass", database="users_db")
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, email FROM users")
    for row in cursor:
        yield {"id": row[0], "name": row[1], "email": row[2]}
    conn.close()
engine.register("users", users_source)
# Service 2: Order service (MySQL) - iterator function
def orders_source():
    conn = pymysql.connect(host="order-db", port=3306, user="user", password="pass", database="orders_db")
    cursor = conn.cursor()
    cursor.execute("SELECT id, user_id, total FROM orders")
    for row in cursor:
        yield {"id": row[0], "user_id": row[1], "total": row[2]}
    conn.close()
engine.register("orders", orders_source)
# Service 3: Payment service (REST API) - iterator function
def payment_source():
    response = requests.get("https://payments.service/api/transactions")
    for item in response.json():
        yield item
engine.register("payments", payment_source)
# Join across services
query = """
SELECT users.name, orders.total, payments.status
FROM users
JOIN orders ON users.id = orders.user_id
JOIN payments ON orders.id = payments.order_id
"""
Why this matters: No need for a shared database or complex ETL pipelines. The engine accepts any Python function that returns an iterator, making it incredibly flexible.
Example 2: Real-Time Price Comparison
Compare prices from multiple XML feeds and match with MongoDB:
import xml.etree.ElementTree as ET

def parse_xml(filepath):
    tree = ET.parse(filepath)
    for product in tree.findall('.//product'):
        yield {
            'ean': product.find('ean').text,
            'price': float(product.find('price').text),
            'name': product.find('name').text
        }
engine.register("xml1", lambda: parse_xml("prices1.xml"))
engine.register("xml2", lambda: parse_xml("prices2.xml"))
engine.register("mongo", mongo_source)
query = """
SELECT
xml1.ean,
xml1.price AS price1,
xml2.price AS price2,
mongo.sf_sku
FROM xml1
JOIN xml2 ON xml1.ean = xml2.ean
JOIN mongo ON xml1.ean = mongo.ean
WHERE xml1.price != xml2.price
"""
Example 3: Python Processing Between Joins
Apply Python logic (ML models, custom functions) between joins:
def enriched_source():
    """Source that processes data with Python before joining"""
    import psycopg2
    conn = psycopg2.connect(host="localhost", database="mydb", user="user", password="pass")
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, category_id FROM products")
    for row in cursor:
        product = {"id": row[0], "name": row[1], "category_id": row[2]}
        # Apply Python logic (ml_model and custom_function are placeholders for your own code)
        product['ml_score'] = ml_model.predict(product)
        product['custom_field'] = custom_function(product)
        yield product
    conn.close()
def categories_source():
    import psycopg2
    conn = psycopg2.connect(host="localhost", database="mydb", user="user", password="pass")
    cursor = conn.cursor()
    cursor.execute("SELECT id, name FROM categories")
    for row in cursor:
        yield {"id": row[0], "category_name": row[1]}
    conn.close()
engine.register("enriched_products", enriched_source)
engine.register("categories", categories_source)
query = """
SELECT p.name, p.ml_score, c.category_name
FROM enriched_products p
JOIN categories c ON p.category_id = c.id
"""
Why this matters: Seamless integration with Python ecosystem - use any library, apply any logic.
Comparison with Alternatives
vs DuckDB
| Feature | Streaming SQL Engine | DuckDB |
|---|---|---|
| Cross-system joins | ✅ Direct | ⚠️ Requires import |
| API + Database join | ✅ Direct | ❌ Must export API |
| Real-time streaming | ✅ True streaming | ⚠️ Buffering needed |
| Python processing | ✅ Native | ⚠️ Export/import |
| GROUP BY / Aggregations | ❌ Not supported | ✅ Full support |
| Performance (same DB) | ⚠️ Moderate | ✅ Very fast |
Use Streaming SQL Engine when: You need to join data from different systems that can't be imported into DuckDB.
Use DuckDB when: All data can be imported and you need aggregations.
vs Pandas
| Feature | Streaming SQL Engine | Pandas |
|---|---|---|
| Cross-system joins | ✅ Direct | ❌ Must load all data |
| Memory efficiency | ✅ Streaming | ❌ Loads all in memory |
| SQL syntax | ✅ Standard SQL | ❌ DataFrame API |
| Large datasets | ✅ Can exceed RAM | ❌ Limited by RAM |
| Real-time data | ✅ Streaming | ❌ Batch only |
| File formats | ✅ Any Python iterator | ⚠️ Limited formats |
| Performance (large data) | ✅ Streaming | ⚠️ Slower for large data |
Use Streaming SQL Engine when: You need to join data from different systems, process data larger than RAM, or use SQL syntax.
Use Pandas when: All data fits in memory and you prefer DataFrame API.
Are There Other Tools Like This?
Short answer: Not exactly.
While there are many tools that do parts of what Streaming SQL Engine does, none combine all these characteristics:
What Makes Streaming SQL Engine Unique
- Zero Infrastructure + Cross-System Joins
- Most tools require clusters (Spark, Flink, Drill)
- Or require specific infrastructure (ksqlDB needs Kafka)
- Streaming SQL Engine: Just Python
- Any Python Iterator as Data Source
- Most tools require specific connectors
- Streaming SQL Engine: Any Python function works
- Direct API Joins
- Most tools can't join REST APIs directly
- Streaming SQL Engine: Native support
- Python-Native Architecture
- Most tools are Java/Rust with Python wrappers
- Streaming SQL Engine: Pure Python, seamless integration
Similar Tools (But Different)
Apache Drill - Similar cross-system capability, but requires cluster and Java
ksqlDB - Streaming SQL, but Kafka-only and requires infrastructure
Materialize - Streaming database, but requires database server
DataFusion - Fast SQL engine, but limited to Arrow/Parquet data
Polars SQL - Fast SQL, but requires loading data into DataFrames first
Presto/Trino - Cross-system SQL, but requires cluster infrastructure
None of these combine:
- Zero infrastructure
- Any Python iterator as source
- Direct API joins
- Pure Python implementation
- Simple deployment
That's what makes Streaming SQL Engine unique.
Start Here: The Safest Way
Step 1: Install and Basic Setup
pip install streaming-sql-engine
Two Stable Join Options
The engine provides two stable join algorithms that work reliably with any data:
Option 1: Lookup Join (Default)
What it is: Python hash-based join - the default when no special metadata is provided.
How to use:
engine = Engine() # use_polars=False (default)
engine.register("products", products_source)
engine.register("images", images_source)
# No special metadata - uses Lookup Join automatically
How it works:
- Builds hash index on right side (loads all right rows into memory)
- Streams left side row-by-row
- Looks up matches in hash index (O(1) per lookup)
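Conceptually, the build-and-probe pattern looks like this (a simplified sketch, not the engine's LookupJoinIterator itself):
def lookup_join(left_rows, right_rows, left_key, right_key):
    # Build phase: index the (ideally smaller) right side in memory
    index = {}
    for row in right_rows:
        index.setdefault(row[right_key], []).append(row)
    # Probe phase: stream the left side and emit matching pairs
    for left in left_rows:
        for right in index.get(left[left_key], []):
            yield {**left, **right}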
Performance (100K products, 300K images):
- Rows processed: 299,553
- Time: 39.63 seconds
- Memory overhead: 177.11 MB
- CPU: 2.8%
- Throughput: 7,559 rows/second
When to use:
- Default choice - works with any data
- Small to medium datasets (< 1M rows)
- When right side fits in memory
- When data is not sorted
Pros:
- ✅ Most compatible (works with any data types)
- ✅ No special requirements
- ✅ Reliable and stable
- ✅ Good performance for medium datasets
Cons:
- ⚠️ Loads entire right side into memory
- ⚠️ Not optimal for very large right tables
Option 2: Merge Join (Sorted Data)
What it is: Streaming merge join for pre-sorted data - most memory-efficient join algorithm.
How to use:
engine = Engine() # use_polars=False (required for Merge Join)
engine.register("products", products_source, ordered_by="product_id")
engine.register("images", images_source, ordered_by="product_id")
# Both sides must be sorted by join key
How it works:
- Both sides stream simultaneously (no index needed)
- Compares join keys and advances iterators
- When keys match, joins the rows
- Only buffers small groups of duplicate keys
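In simplified form, the algorithm looks like this (an illustrative sketch of a streaming merge join over two sorted inputs, not the engine's MergeJoinIterator itself):
def merge_join(left_rows, right_rows, key):
    # Both inputs must already be sorted by `key`.
    left_iter, right_iter = iter(left_rows), iter(right_rows)
    left, right = next(left_iter, None), next(right_iter, None)
    while left is not None and right is not None:
        if left[key] < right[key]:
            left = next(left_iter, None)
        elif left[key] > right[key]:
            right = next(right_iter, None)
        else:
            # Buffer only the small group of right rows sharing the current key
            current, group = right[key], []
            while right is not None and right[key] == current:
                group.append(right)
                right = next(right_iter, None)
            while left is not None and left[key] == current:
                for match in group:
                    yield {**left, **match}
                left = next(left_iter, None)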
Performance (100K products, 300K images, sorted):
- Rows processed: 179,871
- Time: 24.79 seconds
- Memory overhead: 0.87 MB (lowest!)
- CPU: 3.5%
- Throughput: 7,256 rows/second
When to use:
- Data is already sorted (or can be sorted once)
- Memory-constrained environments
- Very large datasets (> 1M rows)
- When you want maximum memory efficiency
Pros:
- ✅ Lowest memory usage (O(1) - only buffers current rows)
- ✅ Fast for sorted data (O(n+m) single pass)
- ✅ True streaming (both sides stream simultaneously)
- ✅ Can handle datasets larger than RAM
Cons:
- ⚠️ Requires pre-sorted data
- ⚠️ Must specify ordered_by metadata
- ⚠️ Requires use_polars=False
Example: Preparing Data for Merge Join
# Step 1: Sort your JSONL files
# Use the provided utility:
python examples/sort_jsonl.py products.jsonl products_sorted.jsonl product_id
python examples/sort_jsonl.py images.jsonl images_sorted.jsonl product_id
# Step 2: Use sorted files with Merge Join
engine = Engine(use_polars=False)
engine.register("products", lambda: load_jsonl("products_sorted.jsonl"), ordered_by="product_id")
engine.register("images", lambda: load_jsonl("images_sorted.jsonl"), ordered_by="product_id")
# Step 3: Query - Merge Join will be used automatically
query = """
SELECT products.product_id, products.title, images.image
FROM products
LEFT JOIN images ON products.product_id = images.product_id
"""
Sample Data Structure:
For Merge Join to work, your data must be sorted. Example:
products.jsonl (sorted by product_id):
{"product_id": 1, "title": "Product 1", "checked": 1}
{"product_id": 2, "title": "Product 2", "checked": 1}
{"product_id": 3, "title": "Product 3", "checked": 0}
{"product_id": 4, "title": "Product 4", "checked": 1}
images.jsonl (sorted by product_id):
{"product_id": 1, "image": "img1.jpg", "image_type": "main"}
{"product_id": 1, "image": "img1_alt.jpg", "image_type": "thumbnail"}
{"product_id": 2, "image": "img2.jpg", "image_type": "main"}
{"product_id": 4, "image": "img4_1.jpg", "image_type": "main"}
{"product_id": 4, "image": "img4_2.jpg", "image_type": "gallery"}
Result: Merge Join processes both files simultaneously, comparing product_id values and joining matching rows. Memory usage stays constant regardless of file size.
Recommendation 1: Use Merge Join if Data is Sorted (Fastest + Lowest Memory)
Best for: Pre-sorted data or data that can be sorted once
Configuration:
engine = Engine(use_polars=False) # Merge Join requires use_polars=False
engine.register("products", products_source, ordered_by="product_id")
engine.register("images", images_source, ordered_by="product_id")
Performance:
- Time: 159.51 seconds
- Memory overhead: 69.50 MB (lowest!)
- Speed: 941 rows/second
- CPU: 3.9%
Why it's best:
- ✅ Lowest memory overhead (69.50 MB vs 236+ MB for other options)
- ✅ True streaming (both sides stream simultaneously)
- ✅ Fast execution (comparable to other options)
- ✅ Can handle datasets larger than RAM
When to use:
- Data is already sorted by join key
- Memory is a concern
- You can sort data once (e.g., sort JSONL files before processing)
- Both sides of join are sorted
Example: Preparing Data for Merge Join
# Step 1: Sort your JSONL files (one-time operation)
# Use a utility script or database ORDER BY
python examples/sort_jsonl.py products.jsonl products_sorted.jsonl product_id
python examples/sort_jsonl.py images.jsonl images_sorted.jsonl product_id
# Step 2: Use sorted files with Merge Join
engine = Engine(use_polars=False)
engine.register("products",
lambda: load_jsonl("products_sorted.jsonl"),
ordered_by="product_id")
engine.register("images",
lambda: load_jsonl("images_sorted.jsonl"),
ordered_by="product_id")
# Step 3: Query - Merge Join will be used automatically
query = """
SELECT products.product_id, products.title, images.image
FROM products
LEFT JOIN images ON products.product_id = images.product_id
WHERE products.checked = 1
"""
Recommendation 2: Use MMAP Join if Data is NOT Sorted (Low Memory)
Best for: Unsorted data, large files, memory-constrained environments
Configuration:
engine = Engine(use_polars=False) # MMAP Join requires use_polars=False
engine.register("products", products_source, filename="products.jsonl")
engine.register("images", images_source, filename="images.jsonl")
Performance:
- Time: 165.74 seconds
- Memory overhead: 77.48 MB (very low!)
- Speed: 905 rows/second
- CPU: 5.0%
Why it's best:
- ✅ Low memory overhead (77.48 MB vs 236+ MB for Lookup Join)
- ✅ Works with unsorted data (no sorting required)
- ✅ OS-managed memory mapping (can handle files larger than RAM)
- ✅ 90-99% memory reduction compared to loading entire file
When to use:
- Data is NOT sorted (or sorting is expensive)
- Large files (> 100MB)
- Memory-constrained systems
- Files larger than available RAM
- You want low memory without sorting
Example:
engine = Engine(use_polars=False)
import json

def jsonl_source():
    with open("products.jsonl", "r") as f:
        for line in f:
            if line.strip():
                yield json.loads(line)
# Register with filename parameter to enable MMAP Join
engine.register("products", jsonl_source, filename="products.jsonl")
engine.register("images", images_source, filename="images.jsonl")
Recommendation 3: Use Polars + Optimizations for Best Throughput
Best for: Maximum speed when you can filter early and normalize data types
Configuration:
engine = Engine(use_polars=True, first_match_only=True)
def optimized_source(dynamic_where=None, dynamic_columns=None):
    """
    Source with all optimizations:
    - Filter pushdown (dynamic_where)
    - Column pruning (dynamic_columns)
    - Data normalization (for Polars)
    """
    # build_query, execute_query and normalize_types are your own helpers
    query = build_query(dynamic_where, dynamic_columns)
    for row in execute_query(query):
        # Normalize types for Polars stability
        yield normalize_types(row)
engine.register("products", optimized_source)
engine.register("images", images_source)
Performance (with early filtering):
- Time: 54.30 seconds (fastest!)
- Memory overhead: 46.68 MB (very low!)
- Speed: 922 rows/second
- CPU: 6.6%
- Rows processed: 50,089 (filtered early, vs 150,048 without filtering)
Why it's best:
- ✅ Fastest execution time (54.30s vs 157-165s for others)
- ✅ Low memory overhead (46.68 MB)
- ✅ Early filtering reduces data volume significantly
- ✅ Vectorized operations (SIMD acceleration)
- ✅ All optimizations combined (filter pushdown + column pruning)
When to use:
- You can filter data early (WHERE clause can be pushed to source)
- Data types are consistent (can normalize)
- Speed is priority
- Polars is available
- You want maximum performance
Trade-offs:
- ⚠️ Requires protocol support in the source function (dynamic_where, dynamic_columns)
- ⚠️ Requires data normalization for Polars stability
- ⚠️ Requires Polars dependency
- ⚠️ Processes fewer rows (because of early filtering)
Summary: Choosing the Right Configuration for 100K+ Records
For 100K+ records, follow this decision tree:
- Is your data sorted (or can you sort it)?
  - ✅ YES: use Merge Join (ordered_by parameter)
    - Best memory efficiency (69.50 MB)
    - Fast execution (159.51s)
    - True streaming
- Is your data NOT sorted?
  - ✅ YES: use MMAP Join (filename parameter)
    - Low memory (77.48 MB)
    - Works with unsorted data
    - Good for large files
- Do you need maximum throughput and can you filter early?
  - ✅ YES: use Polars + Optimizations (use_polars=True + protocols)
    - Fastest execution (54.30s)
    - Low memory (46.68 MB)
    - Requires protocol support
Default fallback: If none of the above apply, use Lookup Join (default Python) - it's stable and works with any data (159.44s, 236.12 MB).
Performance Comparison Table: 100K+ Records (150K+ Joined Rows)
Based on comprehensive benchmarks with 150,048 joined rows (50,089 products with ~3 images each, filtered to checked=1):
| Configuration | Rows Processed | Time (s) | Memory Overhead (MB) | CPU % | Throughput (rows/s) |
|---|---|---|---|---|---|
| 1. Merge Join (Sorted Data) | 150,048 | 159.51 | 69.50 | 3.9% | 941 |
| 2. Lookup Join (Default Python) | 150,048 | 159.44 | 236.12 | 1.8% | 941 |
| 3. Polars Join | 150,048 | 157.74 | 285.64 | 4.1% | 951 |
| 4. MMAP Join | 150,048 | 165.74 | 77.48 | 5.0% | 905 |
| 5. Polars + Column Pruning | 150,048 | 157.72 | 243.85 | 3.1% | 951 |
| 6. Polars + Filter Pushdown | 150,048 | 157.74 | 239.38 | 2.0% | 951 |
| 7. All Optimizations Combined | 50,089 | 54.30 | 46.68 | 6.6% | 922 |
Note: Configuration 7 processes fewer rows (50,089 vs 150,048) because it applies filter pushdown early, filtering products with checked=1 before the join. This demonstrates the power of early filtering.
Key Insights for 100K+ Records
Fastest Execution: All Optimizations Combined (54.30s)
- Uses Polars Join + Column Pruning + Filter Pushdown + early filtering
- Processes 50,089 rows (filtered early) vs 150,048 without filtering
- Best for speed priority when you can filter early
Lowest Memory (Full Join): Merge Join (69.50 MB)
- True streaming, no index needed
- Best for memory-constrained environments with sorted data
- Processes all 150,048 rows with minimal memory
Lowest Memory (Unsorted Data): MMAP Join (77.48 MB)
- Works with unsorted data
- OS-managed memory mapping
- Best for large files when data is not sorted
Highest Throughput: Polars + Column Pruning (951 rows/s)
- Vectorized operations + reduced I/O
- Best for processing large volumes
- Processes all 150,048 rows efficiently
Best Balance: Merge Join (159.51s, 69.50 MB)
- Good speed with lowest memory overhead
- Best for sorted data when memory matters
- True streaming architecture
Recommendations for 100K+ Records
For 100K+ records, use this decision tree:
- Data is sorted (or can be sorted)?
  - ✅ YES: Merge Join (ordered_by parameter)
    - Memory: 69.50 MB (lowest)
    - Time: 159.51s
    - Best for: memory-constrained environments with sorted data
- Data is NOT sorted?
  - ✅ YES: MMAP Join (filename parameter)
    - Memory: 77.48 MB (very low)
    - Time: 165.74s
    - Best for: unsorted data in memory-constrained environments
- Need maximum speed and can filter early?
  - ✅ YES: Polars + Optimizations (use_polars=True + protocols)
    - Memory: 46.68 MB (lowest, with filtering)
    - Time: 54.30s (fastest!)
    - Best for: speed priority when early filtering is possible
Default fallback: Lookup Join (default Python) - stable and works with any data (159.44s, 236.12 MB).
Performance Guide
By Dataset Size
| Size | Configuration | Why |
|---|---|---|
| < 10K rows |
use_polars=False (default) |
Fastest, most stable |
| 10K-100K rows |
use_polars=False (default) |
Still fastest, handles mixed types |
| 100K-1M rows | See recommendations below | Choose based on data characteristics |
| > 1M rows | All optimizations | Maximum performance |
For 100K-1M rows, choose based on your data:
- Data is sorted: Merge Join (ordered_by parameter) - 69.50 MB memory, 159.51s
- Data is NOT sorted: MMAP Join (filename parameter) - 77.48 MB memory, 165.74s
- Need maximum speed and can filter early: Polars + Optimizations (use_polars=True + protocols) - 46.68 MB memory, 54.30s
Common Pitfalls
Pitfall 1: Using Polars Without Normalization
Problem:
engine = Engine(use_polars=True)
# Mixed types cause schema inference errors
Solution:
def normalized_source():
    for row in raw_source():
        yield {
            "id": int(row.get("id", 0)),
            "price": float(row.get("price", 0.0)),
        }
Pitfall 2: Using MMAP Without Polars (Very Slow)
Problem:
engine = Engine(use_polars=False)
engine.register("table", source, filename="data.jsonl")  # Very slow when the Polars library is not installed!
Solution:
# MMAP uses Polars internally for index building if available
# But you still need use_polars=False for MMAP Join algorithm
engine = Engine(use_polars=False) # MMAP Join requires this
engine.register("table", source, filename="data.jsonl")
# MMAP will use Polars internally for faster index building
Pitfall 3: Using MMAP for Small Files
Problem:
# MMAP overhead > benefit for small files
engine.register("table", source, filename="small.jsonl") # Slower!
Solution:
# No filename for small files
engine.register("table", source) # Faster for < 100MB
Summary
Start Here (Safest)
engine = Engine() # Default: use_polars=False
engine.register("table1", source1)
engine.register("table2", source2)
Why: Most stable, handles all edge cases, works with any data types
Performance: 39.63s, 177.11 MB memory, 7,559 rows/s
Then Experiment
- Add debug mode: Engine(debug=True) - see what's happening
- Try Polars: Engine(use_polars=True) - for large datasets (39.36s, 197.64 MB)
- Try MMAP: filename="data.jsonl" - for large files (19.65s, 43.50 MB)
- Try Merge Join: ordered_by="key" - for sorted data (24.79s, 0.87 MB)
Conclusion
The Streaming SQL Engine fills a unique niche: cross-system data integration. While it may not match the raw performance of specialized tools for their specific use cases, it excels at joining data from different systems - a problem that traditional databases cannot solve.
Key strengths:
- Cross-system joins (databases, APIs, files)
- Zero infrastructure requirements
- Memory-efficient streaming architecture
- Python-native integration
- Automatic optimizations
- Simple deployment
Best suited for:
- Microservices data aggregation
- Cross-system ETL pipelines
- Real-time data integration
- Memory-constrained environments
- Python-native workflows
For cross-system data integration, the Streaming SQL Engine provides a unique solution that balances performance, simplicity, and flexibility.