Be like a duck. Calm on the surface, but always paddling like the dickens underneath.
Michael Caine
Context
You’ve probably heard of or used DuckDB in the data space. If not, let me introduce you to a game-changing tool that will supercharge your data stack, whether you are a Data Engineer, Analyst, Scientist, or anything in between.
According to the official website, DuckDB is an in-process SQL OLAP database management system: simple, feature-rich, fast & open source.
That sums it up nicely, but it undersells DuckDB’s speed and flexibility. You can read more here: good_tutorial.
Here is my own definition: DuckDB is a SQL OLAP database that brings together efficiency (a very fast vectorized query engine), performance, and versatility/accessibility (easy to install, able to query files locally or remotely, plus extensions that unlock even more).
I know I sound like a DuckDB evangelist 😅, but please bear with me.
Now, let’s dive into what brings us together today!
Problem
DuckDB processes data using local resources, and the only cloud-like solution (that I’m aware of at the moment) is MotherDuck, DuckDB’s own cloud service.
Hosting all tools and users on the same server in a data infrastructure has several drawbacks that can significantly impact performance, reliability, security, and scalability, such as:
- Performance Bottlenecks: Tools and users competing for the same CPU, memory, and disk I/O can lead to resource contention and degraded performance during peak loads.
- Single Point of Failure: If the server crashes or experiences an outage, all tools and users are affected simultaneously, leading to complete operational downtime.
- Scalability Challenge: A single server has finite resources, making it difficult to scale as the number of users or tools increases.
- Security Risks: A single compromised server can expose all tools and user data to potential breaches.
- Operational Complexity: Diagnosing issues becomes harder when multiple tools and users are intertwined on a single server.
- Compliance and Governance Risks: Hosting all users and tools together may violate regulatory requirements or customer expectations for data isolation.
Solution
What if you could let your DuckDB spread its wings ✈️ for broader access?
Remote connections open up opportunities to centralize data, enable multiple users, and achieve performance at scale.
In this guide, we’ll unlock the power of DuckDB’s remote access by pairing it with two key players: Apache Arrow (for efficient data transfer) and Arrow Flight RPC (gRPC under the hood) (for quick, reliable communication). By the end, you’ll have a fully functional API setup that will have your DuckDB connection flying in no time.
✈️ Let Your DuckDB Take Flight!
💻 Code
Here is the repository of the project:
https://github.com/mikekenneth/duckdb_streamlit_arrow_flight
Explanation
First, let’s discuss the High Level Design.
Below is a sequence diagram of what we are trying to achieve:
The sequence diagram outlines the process of querying data via a web application. A user inputs a SQL query into the WebApp (client), which sends a GetFlightInfo request to the FlightServer with the appropriate command descriptor. The server executes the SQL query using DuckDB, then returns a FlightInfo object containing a ticket to the client. The client subsequently uses the ticket to make a do_get request to the server, which streams the result data back to the client. Finally, the client displays the results to the user.
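If that flow is easier to see in code, here is a minimal client-side sketch of the same exchange. The server address and the sample query are assumptions for illustration; the real client code is broken down later in this post.
import pyarrow.flight as pa_flight

# Connect to the Flight server (address assumed for illustration)
client = pa_flight.connect("grpc://localhost:8815")

# 1. GetFlightInfo: send the SQL query as a command descriptor; the server runs it in DuckDB
descriptor = pa_flight.FlightDescriptor.for_command("SELECT 42 AS answer")
flight_info = client.get_flight_info(descriptor)

# 2. do_get: redeem the ticket from the returned FlightInfo to stream the result back
reader = client.do_get(flight_info.endpoints[0].ticket)
print(reader.read_all())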
To begin, we need the below components:
- Python: We’ll use Python for its simplicity and accessibility.
- DuckDB: I’m sure I’ve yapped 😅 enough about it already.
- Apache Arrow: a set of technologies that enable big data systems to process and move data fast.
- Apache Arrow Flight RPC : Arrow Flight is an RPC framework for high-performance data services based on Arrow data, and is built on top of gRPC and the IPC format.
- Streamlit: Streamlit turns data scripts into shareable web apps in minutes.
🏗️ Installation
To install the project, follow the steps below:
- Install the needed Python modules (a sketch of what requirements.txt contains is shown after these steps):
pip install -r requirements.txt
- Create/update the .env file with the content below, adjusted as needed:
# CAUTION: If using the source .env command,
## make sure there is no space before & after the '=' signs in this file
# Flight Server
SERVER_FLIGHT_HOST='0.0.0.0'
SERVER_FLIGHT_PORT=8815
# If using Local Storage for the Flight datasets
SERVER_FLIGHT_DATA_DIR_TYPE='local' # Options: ['local', 's3', 'minio']
SERVER_FLIGHT_DATA_DIR_BASE='data/datasets'
# DuckDB file
SERVER_DUCKDB_FILE='data/duck.db'
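For reference, the Python modules the code below relies on boil down to the list sketched here. This is a sketch only; the authoritative (pinned) list lives in the repository’s requirements.txt:
# requirements.txt (sketch only -- see the repo for the exact versions)
duckdb
pyarrow
streamlit
streamlit-ace
Once the .env file is filled in, load it (for example with source .env) before starting the server so the variables below are picked up.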
Server Code breakdown
For the Server, we used Apache Arrow Flight RPC to set up a gRPC Server that receives SQL queries as command
.
DuckDBClient
This class exists to manage the connection to DuckDB and run queries against it.
from pathlib import Path

import duckdb
from pyarrow import Table as pa_table


class DuckDBClient:
    def __init__(self, db_location: Path = None):
        # Default to a local DuckDB file if no location is given
        if db_location is None:
            self.db_location = Path("data/duck.db")
        else:
            self.db_location = db_location
        self.db_location.parent.mkdir(exist_ok=True)
        self.duckdb_client = duckdb.connect(str(self.db_location))

    def query(self, sql_query) -> pa_table:
        # Run the SQL query and hand the result back as an Arrow table
        result = self.duckdb_client.query(sql_query)
        return result.arrow()
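As a quick sanity check, the class can be used on its own; a small sketch (the query is just an example):
# Create a client against the default local DuckDB file and run a query
duck = DuckDBClient()
arrow_table = duck.query("SELECT 42 AS answer")
print(arrow_table)  # a pyarrow.Table with a single 'answer' column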
FlightServer
Here we create the FlightServer class that holds all the methods/functionality of the server.
import os
from pathlib import Path
from uuid import uuid4

import pyarrow.flight as pa_flight
import pyarrow.parquet as pa_parquet
from pyarrow import Table as pa_table


class FlightServer(pa_flight.FlightServerBase):
    def __init__(self, location="grpc://0.0.0.0:8815", repo: Path = Path("./datasets"), **kwargs):
        super(FlightServer, self).__init__(location, **kwargs)
        self._location = location
        self._repo = repo

    def _duckdb_query_handler(self, sql: str) -> pa_table:
        # Delegate the SQL command to DuckDB and get an Arrow table back
        duck_client = DuckDBClient()
        return duck_client.query(sql_query=sql)

    def _make_flight_info(self, dataset):
        # Build a FlightInfo (schema, endpoints, row count, size) for a stored Parquet dataset
        dataset_path = self._repo / dataset
        schema = pa_parquet.read_schema(dataset_path)
        metadata = pa_parquet.read_metadata(dataset_path)
        descriptor = pa_flight.FlightDescriptor.for_path(dataset.encode("utf-8"))
        endpoints = [pa_flight.FlightEndpoint(dataset, [self._location])]
        return pa_flight.FlightInfo(schema, descriptor, endpoints, metadata.num_rows, metadata.serialized_size)

    def list_flights(self, context, criteria):
        # Advertise every dataset currently stored in the repository
        for dataset in self._repo.iterdir():
            yield self._make_flight_info(dataset.name)

    def get_flight_info(self, context, descriptor: pa_flight.FlightDescriptor):
        print(self._repo)
        if descriptor.descriptor_type == pa_flight.DescriptorType.CMD:
            # The descriptor carries a SQL command: run it and persist the result as Parquet
            command = descriptor.command.decode("utf-8")
            print(f"Executing the SQL command: {command}")
            query_result_arrow_table = self._duckdb_query_handler(sql=command)
            dataset_name = f"{str(uuid4())}.parquet"
            pa_parquet.write_table(query_result_arrow_table, self._repo / dataset_name)
            print(f"Result Dataset: {dataset_name}")
            return self._make_flight_info(dataset=dataset_name)
        # Otherwise the descriptor points at an existing dataset by path
        return self._make_flight_info(descriptor.path[0].decode("utf-8"))

    # TODO: You could use do_put to allow users to upload temporary tables
    def do_put(self, context, descriptor, reader, writer):
        dataset = descriptor.path[0].decode("utf-8")
        dataset_path = self._repo / dataset
        data_table = reader.read_all()
        pa_parquet.write_table(data_table, dataset_path)

    def do_get(self, context, ticket):
        # The ticket is simply the dataset name: stream the Parquet file back as record batches
        dataset = ticket.ticket.decode("utf-8")
        dataset_path = self._repo / dataset
        return pa_flight.RecordBatchStream(pa_parquet.read_table(dataset_path))

    def list_actions(self, context):
        return [("drop_dataset", "Delete a dataset.")]

    def do_action(self, context, action):
        # Dispatch the custom actions advertised by list_actions
        if action.type == "drop_dataset":
            self.do_drop_dataset(action.body.to_pybytes().decode("utf-8"))
        else:
            raise NotImplementedError

    def do_drop_dataset(self, dataset):
        dataset_path = self._repo / dataset
        dataset_path.unlink()
main()
Below is the code that starts the Flight Server.
if __name__ == "__main__":
    # Get Env Variables
    FLIGHT_HOST = os.getenv("SERVER_FLIGHT_HOST", "0.0.0.0")
    FLIGHT_PORT = os.getenv("SERVER_FLIGHT_PORT", 8815)
    FLIGHT_DATA_DIR_TYPE = os.getenv("SERVER_FLIGHT_DATA_DIR_TYPE", "local")
    FLIGHT_DATA_DIR = os.getenv("SERVER_FLIGHT_DATA_DIR_BASE", "./datasets")

    print("Initiating Flight Server")
    server_location = f"grpc://{FLIGHT_HOST}:{FLIGHT_PORT}"
    server_data_dir = Path(FLIGHT_DATA_DIR)
    server_data_dir.mkdir(parents=True, exist_ok=True)  # create the dataset directory if missing
    server = FlightServer(location=server_location, repo=server_data_dir)

    print("Starting Flight Server !")
    server.serve()
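Once the server is up, a quick way to check it from another process is to list the datasets it exposes. A minimal sketch, assuming the server is reachable on localhost with the port from the .env above:
import pyarrow.flight as pa_flight

client = pa_flight.connect("grpc://localhost:8815")
for flight in client.list_flights():
    # Each FlightInfo describes one Parquet dataset stored by the server
    print(flight.descriptor.path, flight.total_records)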
Client Code breakdown
On the client side, we use same Apache arrow library to connect to the Server, then use Streamlit
to manage the web interface and display of results
Connect to Flight & Execute query
import os

import pyarrow.flight as pa_flight
from pyarrow import Table

# Get Env Variables
FLIGHT_HOST = os.getenv("SERVER_FLIGHT_HOST", "0.0.0.0")
FLIGHT_PORT = os.getenv("SERVER_FLIGHT_PORT", 8815)


def execute_query(sql_query: str) -> tuple:
    # Connect to the Flight Server
    server_location = f"grpc://{FLIGHT_HOST}:{FLIGHT_PORT}"
    flight_client = pa_flight.connect(server_location)
    token_pair = flight_client.authenticate_basic_token(b"test", b"password")
    options = pa_flight.FlightCallOptions(headers=[token_pair])

    # Wrap the query in a command descriptor to run it against the Flight Server
    # TODO: Add a SQL injection check
    flight_command_descriptor = pa_flight.FlightDescriptor.for_command(sql_query)

    # Retrieve the result's FlightInfo (schema, ticket, row count, ...)
    response_flight = flight_client.get_flight_info(flight_command_descriptor, options)

    # Read the content of the resulting dataset
    reader = flight_client.do_get(response_flight.endpoints[0].ticket, options)
    result_table = reader.read_all()
    return response_flight, result_table
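Used on its own, the helper returns both the FlightInfo and the materialized Arrow table; for example (the query is illustrative):
flight_info, table = execute_query("SELECT 1 AS id, 'duck' AS name")
print(flight_info.total_records)  # row count reported by the server
print(table.to_pandas())          # the result as a pandas DataFrame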
Streamlit Interface
Here we use Streamlit to build a simple interface that allows users to enter a SQL query (with streamlit-ace, a code-editor component that eases query writing) and display the results (schema, row count & data).
import streamlit as st
from streamlit_ace import st_ace

from xlib.flight import execute_query

st.set_page_config(page_icon=":duck:", page_title="Remote DuckDB ⛁ Client with Arrow Flight RPC")
st.title("Remote DuckDB ⛁ Client with Arrow Flight")

with st.form(key="query_form"):
    st.markdown("## 🇸 🇶 🇱 Query Space")
    sql_query = st_ace(
        # value="select '1' as testing;",
        placeholder="Write your Query here...",
        language="sql",
        theme="dracula",
        keybinding="vscode",
        font_size=14,
        tab_size=4,
        show_gutter=True,
        show_print_margin=False,
        wrap=False,
        auto_update=True,
        readonly=False,
        min_lines=20,
        key="code_content",
    )
    submit = st.form_submit_button(label="Run Query")

if submit:
    # Run the Query against the Flight Server
    result_flight_info, result_table = execute_query(sql_query)

    st.subheader("Results", divider="rainbow", anchor="results")
    tab1, tab2, tab3, tab4 = st.tabs(["Result", "Query", "Schema", "Info"])
    with tab1:
        st.dataframe(result_table.to_pandas(), use_container_width=True)
    with tab2:
        st.markdown(f"```sql\n{sql_query}\n```")
    with tab3:
        st.markdown(f"```\n=== Result Schema ===\n{result_flight_info.schema}\n=====================\n```")
    with tab4:
        st.dataframe(
            {
                "Total Rows:": result_flight_info.total_records,
                "Total Size (Bytes):": result_flight_info.total_bytes,
                "Dataset Paths:": [i.decode("utf-8") for i in result_flight_info.descriptor.path],
            },
            hide_index=True,
        )
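With the Flight server running, the interface is launched with Streamlit's CLI; assuming the client script above is saved as app.py (check the repo for the actual filename):
streamlit run app.py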
📹 Demo
Here is a short demo of the solution.
https://www.youtube.com/watch?v=jS5p8pwnlOo
➡️ Next Steps
As the quote says, nothing is perfect.
Hence, I will be adding more features to the project as time goes by, and I'm very open to contributions, comments, etc.
🙏Thanks
Thanks for reading.