Mike Houngbadji

Unlocking DuckDB from Anywhere - A Guide to Remote Access with Apache Arrow and Flight RPC (gRPC)

Be like a duck. Calm on the surface, but always paddling like the dickens underneath.
Michael Caine

Context

You’ve probably heard of or used DuckDB in the data space. If not, let me introduce you to a game-changing tool that will supercharge your data stack, whether you’re a Data Engineer, Analyst, Scientist, etc.

According to the official website, DuckDB is an in-process SQL OLAP database management system. Simple, feature-rich, fast & open source. That is well put, but it forgets to emphasize DuckDB’s speed and flexibility. You can read more here: good_tutorial.

Here is my own definition: DuckDB is a SQL OLAP database that brings together efficiency (a very fast vectorized query engine), performance, and versatility/accessibility (easy to install, can query files locally or remotely, and offers extensions that enable even more).
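
To make that concrete, here is a minimal sketch of DuckDB querying local and remote files directly (the file path and URL are placeholders):

import duckdb

con = duckdb.connect()  # in-memory database
# Query a local Parquet file directly, no import step required
con.sql("SELECT count(*) FROM 'local_file.parquet'").show()
# Query a remote file over HTTP via the httpfs extension
con.sql("INSTALL httpfs")
con.sql("LOAD httpfs")
con.sql("SELECT * FROM 'https://example.com/remote_file.parquet' LIMIT 5").show()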

I know I sound like a DuckDB evangelist 😅, but please bear with me.

Now, let’s dive into what brings us together today!

Problem

https://github.com/mikekenneth/blogpost_resources/raw/main/duckdb_from_anywhere/this-is-a-problem-derek-muller.gif

DuckDB processes data using local resources, and the only cloud-like solution (that I’m aware of at the moment) is MotherDuck (DuckDB’s own cloud service).

https://github.com/mikekenneth/blogpost_resources/raw/main/duckdb_from_anywhere/flight_before.png

Hosting all tools and users on the same server in a data infrastructure has several drawbacks that can significantly impact performance, reliability, security, and scalability:

  • Performance Bottlenecks: Tools and users competing for the same CPU, memory, and disk I/O can lead to resource contention and degraded performance during peak loads.
  • Single Point of Failure: If the server crashes or experiences an outage, all tools and users are affected simultaneously, leading to complete operational downtime.
  • Scalability Challenge: A single server has finite resources, making it difficult to scale as the number of users or tools increases.
  • Security Risks: A single compromised server can expose all tools and user data to potential breaches.
  • Operational Complexity: Diagnosing issues becomes harder when multiple tools and users are intertwined on a single server.
  • Compliance and Governance Risks: Hosting all users and tools together may violate regulatory requirements or customer expectations for data isolation.

Solution

https://github.com/mikekenneth/blogpost_resources/raw/main/duckdb_from_anywhere/problem_solved.gif

What if you could let your DuckDB spread its wings ✈️ for broader access?
Remote connections open up opportunities to centralize data, enable multiple users, and achieve performance at scale.

In this guide, we’ll unlock the power of DuckDB’s remote access by pairing it with two key players: Apache Arrow (for efficient data transfer) and Arrow Flight RPC (gRPC under the hood) (for quick, reliable communication). By the end, you’ll have a fully functional API setup that will have your DuckDB connection flying in no time.

https://github.com/mikekenneth/blogpost_resources/raw/main/duckdb_from_anywhere/flight_after.png

✈️ Let Your DuckDB Take Flight!

💻 Code

Here is the repository of the project:

https://github.com/mikekenneth/duckdb_streamlit_arrow_flight

Explanation

First, let’s discuss the high-level design.
Below is a sequence diagram of what we are trying to achieve:

The sequence diagram outlines the process of querying data via a web application. A user inputs an SQL query into the WebApp (client), which sends a GetFlightInfo request to the FlightServer with the appropriate command descriptor. The server executes the SQL query using DuckDB, then returns a FlightInfo object containing a ticket to the client. The client subsequently uses the ticket to make a do_get request to the server, which streams the result data back to the client. Finally, the client displays the results to the user.

https://github.com/mikekenneth/blogpost_resources/raw/main/duckdb_from_anywhere/duckdb_from_anywhere_sequence_diagram.png
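
In code, that round trip boils down to very little. Here is a condensed sketch (no authentication, default port assumed; the full client code appears later in this post):

import pyarrow.flight as pa_flight

client = pa_flight.connect("grpc://localhost:8815")
descriptor = pa_flight.FlightDescriptor.for_command("SELECT 42 AS answer")
flight_info = client.get_flight_info(descriptor)  # server runs the query, returns a ticket
reader = client.do_get(flight_info.endpoints[0].ticket)  # redeem the ticket for the data
print(reader.read_all())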

To begin, we need the following components:

  • Python: We’ll use Python for its simplicity and accessibility.
  • DuckDB: I’m sure I have yapped 😅 enough about it already.
  • Apache Arrow: A set of technologies that enable big data systems to process and move data fast.
  • Apache Arrow Flight RPC: Arrow Flight is an RPC framework for high-performance data services based on Arrow data, built on top of gRPC and the Arrow IPC format.
  • Streamlit: Streamlit turns data scripts into shareable web apps in minutes.

🏗️ Installation

To install the project, follow the steps below:

  1. Install the needed Python modules:
pip install -r requirements.txt
  2. Create/update the .env file with the below as needed:
# CAUTION: If using the `source .env` command,
## make sure there is no space before & after the '=' signs in this file
# Flight Server
SERVER_FLIGHT_HOST='0.0.0.0'
SERVER_FLIGHT_PORT=8815
# If using Local Storage for the Flight datasets
SERVER_FLIGHT_DATA_DIR_TYPE='local'  # Options: ['local', 's3', 'minio']
SERVER_FLIGHT_DATA_DIR_BASE='data/datasets'
# DuckDB file
SERVER_DUCKDB_FILE='data/duck.db'
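
If you prefer to load the .env file from Python rather than `source .env`, a minimal sketch (assuming the python-dotenv package, which is not necessarily in the project’s requirements):

# Optional: load .env into os.environ from Python (assumes python-dotenv is installed)
from dotenv import load_dotenv

load_dotenv()  # reads the .env file in the current directory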

Server Code breakdown

For the server, we used Apache Arrow Flight RPC to set up a gRPC server that receives SQL queries as commands.

DuckDBClient

This class exists to manage the DuckDB connection and query execution.

from pathlib import Path

import duckdb
from pyarrow import Table as pa_table


class DuckDBClient:
    def __init__(self, db_location: Path = None):
        if db_location is None:
            self.db_location = Path("data/duck.db")
        else:
            self.db_location = db_location
        # Make sure the parent directory exists before connecting
        self.db_location.parent.mkdir(parents=True, exist_ok=True)
        self.duckdb_client = duckdb.connect(str(self.db_location))

    def query(self, sql_query: str) -> pa_table:
        # Execute the query and return the result as an Arrow Table
        result = self.duckdb_client.query(sql_query)
        return result.arrow()
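
A quick usage sketch of the class above:

client = DuckDBClient()
arrow_table = client.query("SELECT 42 AS answer")
print(arrow_table)  # a pyarrow.Table with a single 'answer' column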

FlightServer

Here we create the FlightServer class that holds all the methods/functionality of the server.

import os
from pathlib import Path
from uuid import uuid4

import pyarrow.flight as pa_flight
import pyarrow.parquet as pa_parquet


class FlightServer(pa_flight.FlightServerBase):
    def __init__(self, location="grpc://0.0.0.0:8815", repo: Path = Path("./datasets"), **kwargs):
        super(FlightServer, self).__init__(location, **kwargs)
        self._location = location
        self._repo = repo

    def _duckdb_query_handler(self, sql: str):
        # Run the SQL against DuckDB and return the result as an Arrow Table
        duck_client = DuckDBClient()
        return duck_client.query(sql_query=sql)

    def _make_flight_info(self, dataset):
        # Build a FlightInfo (schema, ticket, row count, size) for a stored dataset
        dataset_path = self._repo / dataset
        schema = pa_parquet.read_schema(dataset_path)
        metadata = pa_parquet.read_metadata(dataset_path)
        descriptor = pa_flight.FlightDescriptor.for_path(dataset.encode("utf-8"))
        endpoints = [pa_flight.FlightEndpoint(dataset, [self._location])]
        return pa_flight.FlightInfo(schema, descriptor, endpoints, metadata.num_rows, metadata.serialized_size)

    def list_flights(self, context, criteria):
        for dataset in self._repo.iterdir():
            yield self._make_flight_info(dataset.name)

    def get_flight_info(self, context, descriptor: pa_flight.FlightDescriptor):
        if descriptor.descriptor_type == pa_flight.DescriptorType.CMD:
            # The descriptor carries a SQL query: run it, persist the result,
            # and hand back a FlightInfo whose ticket points at the result dataset
            command = descriptor.command.decode("utf-8")
            print(f"Executing the SQL command: {command}")
            query_result_arrow_table = self._duckdb_query_handler(sql=command)
            dataset_name = f"{str(uuid4())}.parquet"
            pa_parquet.write_table(query_result_arrow_table, self._repo / dataset_name)
            print(f"Result Dataset: {dataset_name}")
            return self._make_flight_info(dataset=dataset_name)
        return self._make_flight_info(descriptor.path[0].decode("utf-8"))

    # TODO: You could use do_put to allow users to upload temporary tables
    def do_put(self, context, descriptor, reader, writer):
        dataset = descriptor.path[0].decode("utf-8")
        dataset_path = self._repo / dataset
        data_table = reader.read_all()
        pa_parquet.write_table(data_table, dataset_path)

    def do_get(self, context, ticket):
        # Exchange the ticket for a stream of the stored Parquet dataset
        dataset = ticket.ticket.decode("utf-8")
        dataset_path = self._repo / dataset
        return pa_flight.RecordBatchStream(pa_parquet.read_table(dataset_path))

    def list_actions(self, context):
        return [("drop_dataset", "Delete a dataset.")]

    def do_action(self, context, action):
        # Dispatch the custom actions advertised by list_actions
        if action.type == "drop_dataset":
            self.do_drop_dataset(action.body.to_pybytes().decode("utf-8"))
        else:
            raise NotImplementedError

    def do_drop_dataset(self, dataset):
        dataset_path = self._repo / dataset
        dataset_path.unlink()
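
With the do_action dispatcher in place, a client could invoke the advertised drop_dataset action like so (a sketch; the dataset name is hypothetical):

import pyarrow.flight as pa_flight

client = pa_flight.connect("grpc://localhost:8815")
# The action body carries the name of the dataset file to delete
action = pa_flight.Action("drop_dataset", "some-result.parquet".encode("utf-8"))
for _ in client.do_action(action):
    pass  # drop_dataset returns no results; iterating drives the call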

main()

Below is the code that starts the Flight Server.

if __name__ == "__main__":
    # Get Env Variables
    FLIGHT_HOST = os.getenv("SERVER_FLIGHT_HOST", "0.0.0.0")
    FLIGHT_PORT = os.getenv("SERVER_FLIGHT_PORT", 8815)
    FLIGHT_DATA_DIR_TYPE = os.getenv("SERVER_FLIGHT_DATA_DIR_TYPE", "local")
    FLIGHT_DATA_DIR = os.getenv("SERVER_FLIGHT_DATA_DIR_BASE", "./datasets")

    print("Initiating Flight Server")
    server_location = f"grpc://{FLIGHT_HOST}:{FLIGHT_PORT}"
    server_data_dir = Path(FLIGHT_DATA_DIR)
    server_data_dir.mkdir(parents=True, exist_ok=True)
    server = FlightServer(location=server_location, repo=server_data_dir)

    print("Starting Flight Server!")
    server.serve()
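
Once the server is up, a quick smoke test from another Python shell could look like this (host and port assumed to be the defaults):

import pyarrow.flight as pa_flight

client = pa_flight.connect("grpc://localhost:8815")
# List every dataset the server currently exposes
for flight in client.list_flights():
    print(flight.descriptor.path, flight.total_records)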

Client Code breakdown

On the client side, we use the same Apache Arrow library to connect to the server, then use Streamlit to manage the web interface and display the results.

Connect to Flight & Execute query

import os

import pyarrow.flight as pa_flight
from pyarrow import Table

# Get Env Variables
FLIGHT_HOST = os.getenv("SERVER_FLIGHT_HOST", "0.0.0.0")
FLIGHT_PORT = os.getenv("SERVER_FLIGHT_PORT", 8815)


def execute_query(sql_query: str) -> tuple[pa_flight.FlightInfo, Table]:
    # Connect to the Flight Server
    server_location = f"grpc://{FLIGHT_HOST}:{FLIGHT_PORT}"
    flight_client = pa_flight.connect(server_location)
    token_pair = flight_client.authenticate_basic_token(b"test", b"password")
    options = pa_flight.FlightCallOptions(headers=[token_pair])

    # Wrap the query in a command descriptor to run it against the Flight Server
    flight_command_descriptor = pa_flight.FlightDescriptor.for_command(sql_query)
    # TODO: Add SQL injection parser

    # Retrieve the result's FlightInfo (the server executes the query here)
    response_flight = flight_client.get_flight_info(flight_command_descriptor, options)

    # Read the content of the result dataset
    reader = flight_client.do_get(response_flight.endpoints[0].ticket, options)
    result_table = reader.read_all()
    return response_flight, result_table
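
A quick usage sketch of execute_query:

flight_info, table = execute_query("SELECT 1 AS one")
print(flight_info.total_records)  # row count reported by the server
print(table.to_pandas())          # materialize the Arrow Table locally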

Streamlit Interface

Here we use Streamlit to build a simple interface that allows users to enter a SQL query (with streamlit-ace, a code-editor component that eases query writing) and displays the results (schema, row count & data).

import streamlit as st
from streamlit_ace import st_ace
from xlib.flight import execute_query
st.set_page_config(page_icon=":duck:", page_title="Remote DuckDB ⛁  Client with Arrow Flight RPC")
st.title("Remote DuckDB ⛁  Client with Arrow Flight")
with st.form(key="query_form"):
    st.markdown("## 🇸 🇶 🇱 Query Space")
    sql_query = st_ace(
        # value="select '1' as testing;"        placeholder="Write your Query here...",
        language="sql",
        theme="dracula",
        keybinding="vscode",
        font_size=14,
        tab_size=4,
        show_gutter=True,
        show_print_margin=False,
        wrap=False,
        auto_update=True,
        readonly=False,
        min_lines=20,
        key="code_content",
    )
    submit = st.form_submit_button(label="Run Query")
if submit:
    # Run the Query against Flight Server
    result_flight_info, result_table = execute_query(sql_query)
    st.subheader("Results", divider="rainbow", anchor="results")
    tab1, tab2, tab3, tab4 = st.tabs(["Result", "Query", "Schema", "Info"])
    with tab1:
        st.dataframe(result_table.to_pandas(), use_container_width=True)
    with tab2:
        st.markdown(f"```sql\n{sql_query}\n```")
    with tab3:
        st.markdown(f"```\n=== Result Schema ===\n{result_flight_info.schema}\n=====================\n```")
    with tab4:
        st.dataframe(
            {
                "Total Rows:": result_flight_info.total_records,
                "Total Size (Bytes):": result_flight_info.total_bytes,
                "Dataset Paths:": [i.decode("utf-8") for i in result_flight_info.descriptor.path],
            },
            hide_index=True,
        )

📹 Demo

Here is a short demo of the solution.

https://www.youtube.com/watch?v=jS5p8pwnlOo

➡️ Next Steps

As the saying goes, nothing is perfect.
Hence, I will be adding more features to the project as time goes by, and I am very open to contributions, comments, etc.

🙏Thanks

Thanks for reading.
