
Reading Source Code of Databend (2): Query Server Startup, Session Management & Request Processing

Entrypoint of query server

The entrypoint of the query server is databend/src/binaries/query/main.rs. After the initial configuration is completed, a “GlobalServices” instance and a “shutdown_handle” are created. The latter handles the shutdown logic when the server is closed.

GlobalServices::init(conf.clone()).await?;
let mut shutdown_handle = ShutdownHandle::create()?;

GlobalServices

“GlobalServices” is responsible for starting all the global services for databend-query, each of which follows the single responsibility principle.

pub struct GlobalServices {
    global_runtime: UnsafeCell<Option<Arc<Runtime>>>,
    // Process query logs
    query_logger: UnsafeCell<Option<Arc<QueryLogger>>>,
    // Implement cluster discovery mechanism for databend-query
    cluster_discovery: UnsafeCell<Option<Arc<ClusterDiscovery>>>,
    // Interact with the storage layer to read/write data
    storage_operator: UnsafeCell<Option<Operator>>,
    async_insert_manager: UnsafeCell<Option<Arc<AsyncInsertManager>>>,
    cache_manager: UnsafeCell<Option<Arc<CacheManager>>>,
    catalog_manager: UnsafeCell<Option<Arc<CatalogManager>>>,
    http_query_manager: UnsafeCell<Option<Arc<HttpQueryManager>>>,
    data_exchange_manager: UnsafeCell<Option<Arc<DataExchangeManager>>>,
    session_manager: UnsafeCell<Option<Arc<SessionManager>>>,
    users_manager: UnsafeCell<Option<Arc<UserApiProvider>>>,
    users_role_manager: UnsafeCell<Option<Arc<RoleCacheManager>>>,
}

All the global services in “GlobalServices” implement the “Singleton” trait. This article focuses on the logic of session processing; the global managers will be introduced in subsequent articles.

pub trait SingletonImpl<T>: Send + Sync {
    fn get(&self) -> T;

    fn init(&self, value: T) -> Result<()>;
}

pub type Singleton<T> = Arc<dyn SingletonImpl<T>>;
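
To make the pattern concrete, here is a minimal, self-contained sketch of a holder type implementing “SingletonImpl”. It is a hypothetical illustration rather than Databend's actual code: it uses a Mutex and a plain String error for simplicity, whereas Databend stores each service in an UnsafeCell (as seen in “GlobalServices”), since init() runs exactly once during startup, before any concurrent get().

use std::sync::{Arc, Mutex};

pub trait SingletonImpl<T>: Send + Sync {
    fn get(&self) -> T;
    fn init(&self, value: T) -> Result<(), String>;
}

pub type Singleton<T> = Arc<dyn SingletonImpl<T>>;

// A Mutex-based holder: None until init() is called, Some(value) afterwards.
pub struct GlobalHolder<T> {
    cell: Mutex<Option<T>>,
}

impl<T> GlobalHolder<T> {
    pub fn new() -> Self {
        Self { cell: Mutex::new(None) }
    }
}

impl<T: Clone + Send + Sync> SingletonImpl<T> for GlobalHolder<T> {
    fn get(&self) -> T {
        // Panics if the service is fetched before initialization.
        self.cell.lock().unwrap().clone().expect("service not initialized")
    }

    fn init(&self, value: T) -> Result<(), String> {
        let mut guard = self.cell.lock().unwrap();
        if guard.is_some() {
            return Err("service already initialized".to_string());
        }
        *guard = Some(value);
        Ok(())
    }
}

fn main() {
    let logger: Singleton<Arc<String>> = Arc::new(GlobalHolder::new());
    logger.init(Arc::new("query-logger".to_string())).unwrap();
    println!("got service: {}", logger.get());
}

This mirrors the startup flow: “GlobalServices::init” initializes each holder once, and later code retrieves its service with get().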

ShutdownHandle

Next, the handlers are initialized according to their network protocols and registered with the “shutdown_handle” service. Any type that implements the “Server” trait can be added as a service.


#[async_trait::async_trait]
pub trait Server: Send {
    async fn shutdown(&mut self, graceful: bool);
    async fn start(&mut self, listening: SocketAddr) -> Result<SocketAddr>;
}
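
As a concrete (and purely illustrative) example, here is a toy echo service implementing the “Server” trait, assuming tokio and async-trait are available and using a boxed error as a stand-in for Databend's Result type. Any type shaped like this could be registered via “shutdown_handle.add_service(...)”:

use std::net::SocketAddr;

use async_trait::async_trait;
use tokio::net::TcpListener;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[async_trait]
pub trait Server: Send {
    async fn shutdown(&mut self, graceful: bool);
    async fn start(&mut self, listening: SocketAddr) -> Result<SocketAddr>;
}

pub struct EchoServer {
    join_handle: Option<tokio::task::JoinHandle<()>>,
}

#[async_trait]
impl Server for EchoServer {
    async fn start(&mut self, listening: SocketAddr) -> Result<SocketAddr> {
        let listener = TcpListener::bind(listening).await?;
        // Report the actual bound address back (useful when listening on port 0).
        let local_addr = listener.local_addr()?;
        self.join_handle = Some(tokio::spawn(async move {
            // Accept connections until the task is aborted on shutdown.
            while let Ok((mut socket, _)) = listener.accept().await {
                tokio::spawn(async move {
                    let (mut reader, mut writer) = socket.split();
                    let _ = tokio::io::copy(&mut reader, &mut writer).await;
                });
            }
        }));
        Ok(local_addr)
    }

    async fn shutdown(&mut self, _graceful: bool) {
        // Stop accepting new connections by aborting the accept loop.
        if let Some(handle) = self.join_handle.take() {
            handle.abort();
        }
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut server = EchoServer { join_handle: None };
    let addr = server.start("127.0.0.1:0".parse()?).await?;
    println!("echo server listening on {addr}");
    server.shutdown(true).await;
    Ok(())
}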

Currently, Databend supports three request protocols: MySQL, ClickHouse HTTP, and raw HTTP.

// MySQL handler.
{
    let hostname = conf.query.mysql_handler_host.clone();
    let listening = format!("{}:{}", hostname, conf.query.mysql_handler_port);
    let mut handler = MySQLHandler::create(session_manager.clone());
    let listening = handler.start(listening.parse()?).await?;
    // register the service in shutdown_handle to handle server shutdown, same as below
    shutdown_handle.add_service(handler);
}

// ClickHouse HTTP handler.
{
    let hostname = conf.query.clickhouse_http_handler_host.clone();
    let listening = format!("{}:{}", hostname, conf.query.clickhouse_http_handler_port);

    let mut srv = HttpHandler::create(session_manager.clone(), HttpHandlerKind::Clickhouse);
    let listening = srv.start(listening.parse()?).await?;
    shutdown_handle.add_service(srv);
}

// Databend HTTP handler.
{
    let hostname = conf.query.http_handler_host.clone();
    let listening = format!("{}:{}", hostname, conf.query.http_handler_port);

    let mut srv = HttpHandler::create(session_manager.clone(), HttpHandlerKind::Query);
    let listening = srv.start(listening.parse()?).await?;
    shutdown_handle.add_service(srv);
}

Then some other services are created:

  • Metric service: Metrics-related services (the endpoint is exposed at /metrics)

  • Admin service: Administration-related HTTP APIs

  • RPC service: RPC service for query nodes, which handles the communication between query nodes using the Arrow Flight protocol

// Metric API service.
{
    let address = conf.query.metric_api_address.clone();
    let mut srv = MetricService::create(session_manager.clone());
    let listening = srv.start(address.parse()?).await?;
    shutdown_handle.add_service(srv);
    info!("Listening for Metric API: {}/metrics", listening);
}

// Admin HTTP API service.
{
    let address = conf.query.admin_api_address.clone();
    let mut srv = HttpService::create(session_manager.clone());
    let listening = srv.start(address.parse()?).await?;
    shutdown_handle.add_service(srv);
    info!("Listening for Admin HTTP API: {}", listening);
}

// RPC API service.
{
    let address = conf.query.flight_api_address.clone();
    let mut srv = RpcService::create(session_manager.clone());
    let listening = srv.start(address.parse()?).await?;
    shutdown_handle.add_service(srv);
    info!("Listening for RPC API (interserver): {}", listening);
}

Finally, the query node registers itself with the meta server.

// Cluster register.
{
    let cluster_discovery = session_manager.get_cluster_discovery();
    let register_to_metastore = cluster_discovery.register_to_metastore(&conf);
    register_to_metastore.await?;
}

About sessions

There are four parts in session management:

  • session_manager: Globally unique; manages client sessions

  • session: Every time a new client connects to the server, a session is created and registered with the session_manager

  • query_ctx: Each query creates a query_ctx to store its context information

  • query_ctx_shared: Context information shared by the subqueries of a statement


Let's look at them one by one.

SessionManager (query/src/sessions/session_mgr.rs)

pub struct SessionManager {
    pub(in crate::sessions) conf: Config,
    pub(in crate::sessions) max_sessions: usize,
    pub(in crate::sessions) active_sessions: Arc<RwLock<HashMap<String, Arc<Session>>>>,
    pub status: Arc<RwLock<SessionManagerStatus>>,

    // When session type is MySQL, insert into this map, key is id, val is MySQL connection id.
    pub(crate) mysql_conn_map: Arc<RwLock<HashMap<Option<u32>, String>>>,
    pub(in crate::sessions) mysql_basic_conn_id: AtomicU32,
}

“SessionManager” is mainly used to create and destroy sessions. The relevant methods are as follows:

// Create a session according to the client protocol
pub async fn create_session(self: &Arc<Self>, typ: SessionType) -> Result<SessionRef>

// Destroy a session by its session_id
pub fn destroy_session(self: &Arc<Self>, session_id: &String)
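
Here is a simplified, self-contained model of these two methods (hypothetical types, not Databend's actual implementation) showing how the active_sessions map is maintained:

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

struct Session {
    id: String,
}

struct SessionManager {
    max_sessions: usize,
    active_sessions: Arc<RwLock<HashMap<String, Arc<Session>>>>,
}

impl SessionManager {
    fn create_session(&self, id: String) -> Result<Arc<Session>, String> {
        let mut sessions = self.active_sessions.write().unwrap();
        // Reject new connections once max_sessions is reached.
        if sessions.len() >= self.max_sessions {
            return Err("too many active sessions".to_string());
        }
        let session = Arc::new(Session { id: id.clone() });
        sessions.insert(id, session.clone());
        Ok(session)
    }

    fn destroy_session(&self, session_id: &str) {
        // Removing the Arc from the map lets the session be dropped once
        // the last reference (e.g. the connection task) goes away.
        self.active_sessions.write().unwrap().remove(session_id);
    }
}

fn main() {
    let mgr = SessionManager {
        max_sessions: 2,
        active_sessions: Arc::new(RwLock::new(HashMap::new())),
    };
    let session = mgr.create_session("session-1".to_string()).unwrap();
    println!("created session {}", session.id);
    mgr.destroy_session("session-1");
}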

Session (query/src/sessions/session.rs)

The session stores context information about the client connection. We won't go into much detail here, as the code is largely self-explanatory.

pub struct Session {
    pub(in crate::sessions) id: String,
    pub(in crate::sessions) typ: RwLock<SessionType>,
    pub(in crate::sessions) session_ctx: Arc<SessionContext>,
    status: Arc<RwLock<SessionStatus>>,
    pub(in crate::sessions) mysql_connection_id: Option<u32>,
}
pub struct SessionContext {
    conf: Config,
    abort: AtomicBool,
    current_catalog: RwLock<String>,
    current_database: RwLock<String>,
    current_tenant: RwLock<String>,
    current_user: RwLock<Option<UserInfo>>,
    auth_role: RwLock<Option<String>>,
    client_host: RwLock<Option<SocketAddr>>,
    io_shutdown_tx: RwLock<Option<Sender<Sender<()>>>>,
    query_context_shared: RwLock<Option<Arc<QueryContextShared>>>,
}

pub struct SessionStatus {
    pub session_started_at: Instant,
    pub last_query_finished_at: Option<Instant>,
}

Another major function of “Session” is to create and obtain “QueryContext”s. Each time a query request is received, a “QueryContext” is created and bound to the corresponding query statement.

QueryContext (query/src/sessions/query_ctx.rs)

A “QueryContext” maintains the context information of a single query. It is created by “QueryContext::create_from_shared(query_ctx_shared)”.

#[derive(Clone)]
pub struct QueryContext {
    version: String,
    statistics: Arc<RwLock<Statistics>>,
    partition_queue: Arc<RwLock<VecDeque<PartInfoPtr>>>,
    shared: Arc<QueryContextShared>,
    precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
    fragment_id: Arc<AtomicUsize>,
}

Among the members, “partition_queue” stores the corresponding PartInfo items, each including the address and version of the part, the number of rows involved, the compression algorithm used, and the column-related meta information. Partitions are set when building the pipeline (pipelines will be covered in subsequent articles). “precommit_blocks” stores metadata that an insert operation has already written to storage but has not yet committed. A “DataBlock” holds references to the column meta information and the Arrow schema.
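
As a toy illustration of the queue's role (with a hypothetical PartInfo type standing in for Databend's PartInfoPtr): the planner pushes parts in when building the pipeline, and scan workers pop them off during execution:

use std::collections::VecDeque;
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for the real PartInfo.
struct PartInfo {
    location: String,
    rows: usize,
}

fn main() {
    let queue: Arc<RwLock<VecDeque<PartInfo>>> = Arc::new(RwLock::new(VecDeque::new()));

    // Filled while building the pipeline...
    queue.write().unwrap().push_back(PartInfo { location: "part-0".into(), rows: 1000 });
    queue.write().unwrap().push_back(PartInfo { location: "part-1".into(), rows: 500 });

    // ...and drained by scan workers during execution.
    while let Some(part) = queue.write().unwrap().pop_front() {
        println!("scanning {} ({} rows)", part.location, part.rows);
    }
}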

QueryContextShared (query/src/sessions/query_ctx_shared.rs)

For queries containing subqueries, much context information needs to be shared. This is why we need “QueryContextShared”.

/// It is important that data is shared among query context, for example:
///     USE database_1;
///     SELECT
///         (SELECT scalar FROM table_name_1) AS scalar_1,
///         (SELECT scalar FROM table_name_2) AS scalar_2,
///         (SELECT scalar FROM table_name_3) AS scalar_3
///     FROM table_name_4;
/// runtime, session, progress, init_query_id are shared among the subqueries
pub struct QueryContextShared {
    /// scan_progress for scan metrics of datablocks (uncompressed)
    pub(in crate::sessions) scan_progress: Arc<Progress>,
    /// write_progress for write/commit metrics of datablocks (uncompressed)
    pub(in crate::sessions) write_progress: Arc<Progress>,
    /// result_progress for metrics of result datablocks (uncompressed)
    pub(in crate::sessions) result_progress: Arc<Progress>,
    pub(in crate::sessions) error: Arc<Mutex<Option<ErrorCode>>>,
    pub(in crate::sessions) session: Arc<Session>,
    pub(in crate::sessions) runtime: Arc<RwLock<Option<Arc<Runtime>>>>,
    pub(in crate::sessions) init_query_id: Arc<RwLock<String>>,
    ...
}

It provides all the basic information required by the query context.

Handler

As mentioned earlier, Databend supports multiple handlers. Let's take the MySQL handler as an example to see how a handler processes requests and interacts with sessions. The “MySQLHandler” is created from the “SessionManager” (see “MySQLHandler::create” above) and holds the handles it needs to abort and join its listener task:

pub struct MySQLHandler {
    abort_handle: AbortHandle,
    abort_registration: Option<AbortRegistration>,
    join_handle: Option<JoinHandle<()>>,
}

After it's started, the “MySQLHandler” spawns a tokio task that continuously listens on the TCP socket. For each incoming connection it creates a session and then starts a task to execute the subsequent query requests.

fn accept_socket(session_mgr: Arc<SessionManager>, executor: Arc<Runtime>, socket: TcpStream) {
    executor.spawn(async move {
        // create a session
        match session_mgr.create_session(SessionType::MySQL).await {
            Err(error) => Self::reject_session(socket, error).await,
            Ok(session) => {
                info!("MySQL connection coming: {:?}", socket.peer_addr());
                // execute queries
                if let Err(error) = MySQLConnection::run_on_stream(session, socket) {
                    error!("Unexpected error occurred during query: {:?}", error);
                };
            }
        }
    });
}

In “MySQLConnection::run_on_stream”, the session first attaches to the corresponding client host and registers a shutdown closure to handle the related cleanups when the connection is closed.

Related code is as follows:

// mysql_session.rs
pub fn run_on_stream(session: SessionRef, stream: TcpStream) -> Result<()> {
    let blocking_stream = Self::convert_stream(stream)?;
    MySQLConnection::attach_session(&session, &blocking_stream)?;

    ...
}

fn attach_session(session: &SessionRef, blocking_stream: &std::net::TcpStream) -> Result<()> {
    let host = blocking_stream.peer_addr().ok();
    let blocking_stream_ref = blocking_stream.try_clone()?;
    session.attach(host, move || {
        // the registered shutdown closure: shuts down the TCP stream when the session quits
        if let Err(error) = blocking_stream_ref.shutdown(Shutdown::Both) {
            error!("Cannot shutdown MySQL session io {}", error);
        }
    });

    Ok(())
}

// session.rs
pub fn attach<F>(self: &Arc<Self>, host: Option<SocketAddr>, io_shutdown: F)
where F: FnOnce() + Send + 'static {
    let (tx, rx) = oneshot::channel();
    self.session_ctx.set_client_host(host);
    self.session_ctx.set_io_shutdown_tx(Some(tx));

    common_base::base::tokio::spawn(async move {
        // trigger cleanups when the session quits 
        if let Ok(tx) = rx.await {
            (io_shutdown)();
            tx.send(()).ok();
        }
    });
}
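
The double channel in “attach” is worth unpacking: “io_shutdown_tx” is a Sender<Sender<()>>, so when the session quits it can hand the watcher task an acknowledgement channel and then wait until the cleanup has actually finished. Here is a self-contained illustration of the pattern (a hypothetical example, not Databend code):

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // tx plays the role of io_shutdown_tx; rx lives in the watcher task.
    let (tx, rx) = oneshot::channel::<oneshot::Sender<()>>();

    tokio::spawn(async move {
        // The watcher waits until shutdown is requested...
        if let Ok(ack) = rx.await {
            println!("running io_shutdown cleanup");
            // ...runs the cleanup, then confirms completion.
            ack.send(()).ok();
        }
    });

    // When the session quits: send an ack channel and wait for confirmation.
    let (ack_tx, ack_rx) = oneshot::channel::<()>();
    tx.send(ack_tx).ok();
    ack_rx.await.ok();
    println!("session io fully shut down");
}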

Then a MySQL InteractiveWorker is started to handle subsequent queries.

let join_handle = query_executor.spawn(async move {
    let client_addr = non_blocking_stream.peer_addr().unwrap().to_string();
    let interactive_worker = InteractiveWorker::create(session, client_addr);
    let opts = IntermediaryOptions {
        process_use_statement_on_query: true,
    };
    let (r, w) = non_blocking_stream.into_split();
    let w = BufWriter::with_capacity(DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE, w);
    AsyncMysqlIntermediary::run_with_options(interactive_worker, r, w, &opts).await
});
let _ = futures::executor::block_on(join_handle);

“InteractiveWorker” implements the AsyncMysqlShim trait methods, such as “on_execute” and “on_query”. When a query arrives, these methods are called to execute it. Take “on_query” for example; the core code is as follows:

async fn on_query<'a>(
    &'a mut self,
    query: &'a str,
    writer: QueryResultWriter<'a, W>,
) -> Result<()> {
    ...

    // response writer
    let mut writer = DFQueryResultWriter::create(writer);

    let instant = Instant::now();
    // execute queries
    let blocks = self.base.do_query(query).await;

    // write results
    let format = self.base.session.get_format_settings()?;
    let mut write_result = writer.write(blocks, &format);

    ...

    // metrics info
    histogram!(
        super::mysql_metrics::METRIC_MYSQL_PROCESSOR_REQUEST_DURATION,
        instant.elapsed()
    );

    write_result
}

In “do_query”, a “QueryContext” is created, the SQL is parsed and planned, and the query is executed. Related code is as follows:

// create a QueryContext
let context = self.session.create_query_context().await?;
// attach it to the query statement
context.attach_query_str(query);

let settings = context.get_settings();

// parse sql
let stmts_hints = DfParser::parse_sql(query, context.get_current_session().get_type());
...

// Define and generate a query plan
let mut planner = Planner::new(context.clone());
let interpreter = planner.plan_sql(query).await.and_then(|v| {
    has_result_set = has_result_set_by_plan(&v.0);
    InterpreterFactoryV2::get(context.clone(), &v.0)
})

// Execute queries and return the results
Self::exec_query(interpreter.clone(), &context).await?;
let schema = interpreter.schema();
Ok(QueryResult::create(
    blocks,
    extra_info,
    has_result_set,
    schema,
))
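
Boiled down, the flow is parse → plan → interpret → execute. The following schematic reduction (hypothetical types; the comments map each step to the real functions) shows the skeleton:

// Hypothetical stand-ins for the real plan and interpreter types.
struct Plan {
    kind: String,
}

struct Interpreter {
    plan: Plan,
}

impl Interpreter {
    fn execute(&self) -> Vec<String> {
        vec![format!("result of {}", self.plan.kind)]
    }
}

fn parse_and_plan(query: &str) -> Result<Plan, String> {
    if query.trim().is_empty() {
        return Err("empty query".to_string());
    }
    Ok(Plan { kind: format!("plan({query})") })
}

fn do_query(query: &str) -> Result<Vec<String>, String> {
    let plan = parse_and_plan(query)?; // DfParser::parse_sql + Planner::plan_sql
    let interpreter = Interpreter { plan }; // InterpreterFactoryV2::get
    Ok(interpreter.execute()) // Self::exec_query -> QueryResult
}

fn main() {
    println!("{:?}", do_query("SELECT 1"));
}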

Epilogue

This article has walked through the whole process from starting Databend to accepting SQL requests and beginning to process them. Recently, we removed the ClickHouse native TCP handler for several reasons (the ClickHouse TCP protocol is tied to ClickHouse's low-level internals; with heavy historical baggage and no public documentation, debugging became too exhausting. See more in: https://github.com/datafuselabs/databend/pull/7012).

Please feel free to discuss your ideas with us here. In addition, if you find any problems, you can always submit issues to help improve Databend's stability. The Databend community welcomes all well-intentioned comments and suggestions :)

About Databend

Databend is an open source modern data warehouse with elasticity and low cost, capable of real-time data analysis on object storage. We look forward to connecting with you to explore cloud-native data warehouse solutions and build a new generation of the open source data cloud together.

Databend documentation: https://databend.rs/

Twitter: https://twitter.com/Datafuse_Labs

Slack: https://datafusecloud.slack.com/

WeChat: Databend

GitHub: https://github.com/datafuselabs/databend
