Entrypoint of the query server
The entrypoint of the query server is databend/src/binaries/query/main.rs. After the initial configuration is completed, a “GlobalServices” and a “shutdown_handle” are created; the latter handles the shutdown logic when the server is closed.
GlobalServices::init(conf.clone()).await?;
let mut shutdown_handle = ShutdownHandle::create()?;
GlobalServices
“GlobalServices” is responsible for starting all the global services of databend-query, each of which follows the single responsibility principle.
pub struct GlobalServices {
global_runtime: UnsafeCell<Option<Arc<Runtime>>>,
// Process query logs
query_logger: UnsafeCell<Option<Arc<QueryLogger>>>,
// Implement cluster discovery mechanism for databend-query
cluster_discovery: UnsafeCell<Option<Arc<ClusterDiscovery>>>,
// Interact with the storage layer to read/write data
storage_operator: UnsafeCell<Option<Operator>>,
async_insert_manager: UnsafeCell<Option<Arc<AsyncInsertManager>>>,
cache_manager: UnsafeCell<Option<Arc<CacheManager>>>,
catalog_manager: UnsafeCell<Option<Arc<CatalogManager>>>,
http_query_manager: UnsafeCell<Option<Arc<HttpQueryManager>>>,
data_exchange_manager: UnsafeCell<Option<Arc<DataExchangeManager>>>,
session_manager: UnsafeCell<Option<Arc<SessionManager>>>,
users_manager: UnsafeCell<Option<Arc<UserApiProvider>>>,
users_role_manager: UnsafeCell<Option<Arc<RoleCacheManager>>>,
}
All the global services in “GlobalServices” implement the singleton trait shown below. This article focuses on the logic of session processing; the global managers will be introduced in subsequent articles.
pub trait SingletonImpl<T>: Send + Sync {
fn get(&self) -> T;
fn init(&self, value: T) -> Result<()>;
}
pub type Singleton<T> = Arc<dyn SingletonImpl<T>>;
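The article does not show how SingletonImpl is implemented. The sketch below is one plausible way to satisfy the trait, assuming a write-once std::sync::OnceLock as the backing store; OnceSingleton and AlreadyInitialized are hypothetical names, and the Result alias stands in for Databend's own error type.

```rust
use std::sync::{Arc, OnceLock};

/// Hypothetical error returned when a singleton is initialized twice.
#[derive(Debug, PartialEq)]
pub struct AlreadyInitialized;

/// Stand-in for Databend's own `Result` alias (assumption of this sketch).
pub type Result<T> = std::result::Result<T, AlreadyInitialized>;

pub trait SingletonImpl<T>: Send + Sync {
    fn get(&self) -> T;
    fn init(&self, value: T) -> Result<()>;
}

pub type Singleton<T> = Arc<dyn SingletonImpl<T>>;

/// Write-once storage: `init` succeeds at most once, `get` clones the value.
pub struct OnceSingleton<T> {
    cell: OnceLock<T>,
}

impl<T> OnceSingleton<T> {
    pub fn new() -> Self {
        OnceSingleton { cell: OnceLock::new() }
    }
}

impl<T: Clone + Send + Sync> SingletonImpl<T> for OnceSingleton<T> {
    fn get(&self) -> T {
        // Panics if a service is read before it was initialized.
        self.cell.get().cloned().expect("singleton used before init")
    }

    fn init(&self, value: T) -> Result<()> {
        // `OnceLock::set` hands the value back if the cell is already full.
        self.cell.set(value).map_err(|_| AlreadyInitialized)
    }
}
```

Because init can succeed only once in this sketch, a second initialization attempt surfaces as an error instead of silently replacing a service that is already in use.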
ShutdownHandle
Next, a handler is initialized for each network protocol and registered with “shutdown_handle”. Any type that implements the Server trait can be added as a service.
#[async_trait::async_trait]
pub trait Server: Send {
async fn shutdown(&mut self, graceful: bool);
async fn start(&mut self, listening: SocketAddr) -> Result<SocketAddr>;
}
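To illustrate how shutdown_handle can treat very different servers uniformly, here is a minimal synchronous sketch. It assumes a simplified, non-async Server trait (the real one is async via async_trait and requires Send) and a hypothetical ShutdownHandle that stops services in registration order; Databend's actual ordering and graceful-shutdown logic may differ.

```rust
use std::net::SocketAddr;

/// Simplified, synchronous stand-in for the async `Server` trait above.
pub trait Server {
    fn start(&mut self, listening: SocketAddr) -> Result<SocketAddr, String>;
    fn shutdown(&mut self, graceful: bool);
}

/// Hypothetical shutdown handle that owns every registered service.
#[derive(Default)]
pub struct ShutdownHandle {
    services: Vec<Box<dyn Server>>,
}

impl ShutdownHandle {
    /// Any type implementing `Server` can be registered.
    pub fn add_service<S: Server + 'static>(&mut self, service: S) {
        self.services.push(Box::new(service));
    }

    /// Stop all services in registration order (an assumption of this sketch).
    pub fn shutdown_all(&mut self, graceful: bool) {
        for mut service in self.services.drain(..) {
            service.shutdown(graceful);
        }
    }
}
```

Storing services as trait objects is what lets MySQL, HTTP, metrics, admin and RPC servers share one shutdown path.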
Currently, Databend supports three request protocols: MySQL, ClickHouse HTTP, and Databend's own HTTP protocol.
// MySQL handler.
{
let hostname = conf.query.mysql_handler_host.clone();
let listening = format!("{}:{}", hostname, conf.query.mysql_handler_port);
let mut handler = MySQLHandler::create(session_manager.clone());
let listening = handler.start(listening.parse()?).await?;
// register the service in shutdown_handle so it is stopped on server shutdown; the same applies below
shutdown_handle.add_service(handler);
}
// ClickHouse HTTP handler.
{
let hostname = conf.query.clickhouse_http_handler_host.clone();
let listening = format!("{}:{}", hostname, conf.query.clickhouse_http_handler_port);
let mut srv = HttpHandler::create(session_manager.clone(), HttpHandlerKind::Clickhouse);
let listening = srv.start(listening.parse()?).await?;
shutdown_handle.add_service(srv);
}
// Databend HTTP handler.
{
let hostname = conf.query.http_handler_host.clone();
let listening = format!("{}:{}", hostname, conf.query.http_handler_port);
let mut srv = HttpHandler::create(session_manager.clone(), HttpHandlerKind::Query);
let listening = srv.start(listening.parse()?).await?;
shutdown_handle.add_service(srv);
}
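Each handler block turns a host and port from the config into a SocketAddr and passes it to start, which returns the address it actually bound. The sketch below shows why that return value matters, using std::net::TcpListener instead of the real handlers; listen_address and start_listener are hypothetical helper names.

```rust
use std::io;
use std::net::{AddrParseError, SocketAddr, TcpListener};

/// Mirrors `format!("{}:{}", host, port)` followed by `.parse()?` in the handlers.
pub fn listen_address(host: &str, port: u16) -> Result<SocketAddr, AddrParseError> {
    format!("{}:{}", host, port).parse()
}

/// Hypothetical `start`: bind the requested address and report the one actually bound.
/// With port 0 the OS picks a free port, so only the return value tells you which.
pub fn start_listener(addr: SocketAddr) -> io::Result<(TcpListener, SocketAddr)> {
    let listener = TcpListener::bind(addr)?;
    let bound = listener.local_addr()?;
    Ok((listener, bound))
}
```

One consequence of the host:port format: an IPv6 host must be configured with brackets (for example [::1]), because ::1:3307 does not parse as a SocketAddr.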
Then some other services are created:
Metric service: exposes the metrics API
Admin service: serves the administration HTTP API
RPC service: handles communication between query nodes using the Arrow Flight protocol
// Metric API service.
{
let address = conf.query.metric_api_address.clone();
let mut srv = MetricService::create(session_manager.clone());
let listening = srv.start(address.parse()?).await?;
shutdown_handle.add_service(srv);
info!("Listening for Metric API: {}/metrics", listening);
}
// Admin HTTP API service.
{
let address = conf.query.admin_api_address.clone();
let mut srv = HttpService::create(session_manager.clone());
let listening = srv.start(address.parse()?).await?;
shutdown_handle.add_service(srv);
info!("Listening for Admin HTTP API: {}", listening);
}
// RPC API service.
{
let address = conf.query.flight_api_address.clone();
let mut srv = RpcService::create(session_manager.clone());
let listening = srv.start(address.parse()?).await?;
shutdown_handle.add_service(srv);
info!("Listening for RPC API (interserver): {}", listening);
}
Finally, the query node registers itself with the meta server.
// Cluster register.
{
let cluster_discovery = session_manager.get_cluster_discovery();
let register_to_metastore = cluster_discovery.register_to_metastore(&conf);
register_to_metastore.await?;
}
About sessions
Session management consists of four parts:
session_manager: Globally unique; manages client sessions
session: Created every time a new client connects to the server, and registered with session_manager
query_ctx: Created for each query; stores that query's context information
query_ctx_shared: Context information shared by the subqueries of a query
Let's look at them one by one.
SessionManager (query/src/sessions/session_mgr.rs)
pub struct SessionManager {
pub(in crate::sessions) conf: Config,
pub(in crate::sessions) max_sessions: usize,
pub(in crate::sessions) active_sessions: Arc<RwLock<HashMap<String, Arc<Session>>>>,
pub status: Arc<RwLock<SessionManagerStatus>>,
// When the session type is MySQL, insert into this map: the key is the MySQL connection id, the value is the session id.
pub(crate) mysql_conn_map: Arc<RwLock<HashMap<Option<u32>, String>>>,
pub(in crate::sessions) mysql_basic_conn_id: AtomicU32,
}
“SessionManager” is mainly used to create and destroy sessions. The relevant methods are:
// Create a session according to the client protocol
pub async fn create_session(self: &Arc<Self>, typ: SessionType) -> Result<SessionRef>
// Destroy a session by its session ID
pub fn destroy_session(self: &Arc<Self>, session_id: &String)
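A minimal, synchronous sketch of the bookkeeping behind create_session and destroy_session, modeled on the fields of the struct above. It assumes that exceeding max_sessions is rejected with an error, that MySQL connection IDs are handed out from mysql_basic_conn_id, and that session IDs are sequential strings; the real implementation is async, takes self: &Arc<Self>, and generates IDs differently.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SessionType {
    MySQL,
    HTTPQuery,
}

#[derive(Debug)]
pub struct Session {
    pub id: String,
    pub typ: SessionType,
    pub mysql_connection_id: Option<u32>,
}

pub struct SessionManager {
    max_sessions: usize,
    active_sessions: RwLock<HashMap<String, Arc<Session>>>,
    // Key: MySQL connection id, value: session id.
    mysql_conn_map: RwLock<HashMap<Option<u32>, String>>,
    mysql_basic_conn_id: AtomicU32,
    // Hypothetical: sequential ids exist only in this sketch.
    next_session_id: AtomicU32,
}

impl SessionManager {
    pub fn new(max_sessions: usize) -> Self {
        SessionManager {
            max_sessions,
            active_sessions: RwLock::new(HashMap::new()),
            mysql_conn_map: RwLock::new(HashMap::new()),
            mysql_basic_conn_id: AtomicU32::new(1),
            next_session_id: AtomicU32::new(1),
        }
    }

    pub fn create_session(&self, typ: SessionType) -> Result<Arc<Session>, String> {
        let mut sessions = self.active_sessions.write().unwrap();
        if sessions.len() >= self.max_sessions {
            return Err(format!("too many sessions, max_sessions = {}", self.max_sessions));
        }
        let id = format!("session-{}", self.next_session_id.fetch_add(1, Ordering::Relaxed));
        // Only MySQL sessions get a MySQL connection id.
        let mysql_connection_id = match typ {
            SessionType::MySQL => Some(self.mysql_basic_conn_id.fetch_add(1, Ordering::Relaxed)),
            SessionType::HTTPQuery => None,
        };
        if mysql_connection_id.is_some() {
            self.mysql_conn_map.write().unwrap().insert(mysql_connection_id, id.clone());
        }
        let session = Arc::new(Session { id: id.clone(), typ, mysql_connection_id });
        sessions.insert(id, session.clone());
        Ok(session)
    }

    pub fn destroy_session(&self, session_id: &str) {
        // The session-map lock is released at the end of this statement.
        let removed = self.active_sessions.write().unwrap().remove(session_id);
        if let Some(session) = removed {
            if session.mysql_connection_id.is_some() {
                self.mysql_conn_map.write().unwrap().remove(&session.mysql_connection_id);
            }
        }
    }

    pub fn active_count(&self) -> usize {
        self.active_sessions.read().unwrap().len()
    }
}
```

Destroying a session frees a slot, so the max_sessions check is what turns the session map into an admission limit in this sketch.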
Session (query/src/sessions/session.rs)
A session stores the context information of a client connection. The code is largely self-explanatory, so we won't go into further detail.
pub struct Session {
pub(in crate::sessions) id: String,
pub(in crate::sessions) typ: RwLock<SessionType>,
pub(in crate::sessions) session_ctx: Arc<SessionContext>,
status: Arc<RwLock<SessionStatus>>,
pub(in crate::sessions) mysql_connection_id: Option<u32>,
}
pub struct SessionContext {
conf: Config,
abort: AtomicBool,
current_catalog: RwLock<String>,
current_database: RwLock<String>,
current_tenant: RwLock<String>,
current_user: RwLock<Option<UserInfo>>,
auth_role: RwLock<Option<String>>,
client_host: RwLock<Option<SocketAddr>>,
io_shutdown_tx: RwLock<Option<Sender<Sender<()>>>>,
query_context_shared: RwLock<Option<Arc<QueryContextShared>>>,
}
pub struct SessionStatus {
pub session_started_at: Instant,
pub last_query_finished_at: Option<Instant>,
}
Another major function of “Session” is to create and obtain “QueryContext”s. Each time a query request is received, a “QueryContext” is created and bound to the corresponding query statement.
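The following sketch shows one way the create-and-bind step can look, assuming, as the SessionContext fields suggest, that each new query gets a fresh QueryContextShared which the session remembers in query_context_shared. All types are cut down to the fields needed here, and the query id format and counter are hypothetical.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

/// Cut-down shared context: only the query id is kept.
pub struct QueryContextShared {
    init_query_id: RwLock<String>,
}

pub struct QueryContext {
    shared: Arc<QueryContextShared>,
}

impl QueryContext {
    pub fn create_from_shared(shared: Arc<QueryContextShared>) -> Arc<QueryContext> {
        Arc::new(QueryContext { shared })
    }

    pub fn get_id(&self) -> String {
        self.shared.init_query_id.read().unwrap().clone()
    }
}

/// Cut-down session holding what `SessionContext::query_context_shared` holds.
#[derive(Default)]
pub struct Session {
    query_context_shared: RwLock<Option<Arc<QueryContextShared>>>,
    // Hypothetical query-id generator for this sketch.
    query_counter: AtomicU64,
}

impl Session {
    /// One fresh shared context per query, remembered on the session, then wrapped.
    pub fn create_query_context(&self) -> Arc<QueryContext> {
        let n = self.query_counter.fetch_add(1, Ordering::Relaxed);
        let shared = Arc::new(QueryContextShared {
            init_query_id: RwLock::new(format!("query-{}", n)),
        });
        *self.query_context_shared.write().unwrap() = Some(shared.clone());
        QueryContext::create_from_shared(shared)
    }

    /// Id of the query currently bound to this session, if any.
    pub fn current_query_id(&self) -> Option<String> {
        self.query_context_shared
            .read()
            .unwrap()
            .as_ref()
            .map(|s| s.init_query_id.read().unwrap().clone())
    }
}
```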
QueryContext (query/src/sessions/query_ctx.rs)
QueryContexts maintain the context information of individual queries. They are created by “QueryContext::create_from_shared(query_ctx_shared)”.
#[derive(Clone)]
pub struct QueryContext {
version: String,
statistics: Arc<RwLock<Statistics>>,
partition_queue: Arc<RwLock<VecDeque<PartInfoPtr>>>,
shared: Arc<QueryContextShared>,
precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
fragment_id: Arc<AtomicUsize>,
}
Among the members, “partition_queue” stores the PartInfo of the parts to be read, including each part's location and version, the number of rows involved, the compression algorithm used, and column-related metadata. The partitions are set when the pipeline is built; pipelines will be covered in a subsequent article. “precommit_blocks” stores the metadata of data that an insert operation has already written to storage but not yet committed. A “DataBlock” contains references to column metadata and the Arrow schema information.
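A queue shaped like partition_queue can be drained by several readers at once. The sketch below assumes a heavily reduced PartInfo and hypothetical set_partitions/try_get_part methods; it shows the idea of popping parts from a shared VecDeque behind a lock, not Databend's actual scheduling.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, RwLock};

/// Heavily reduced, hypothetical PartInfo: where the part lives and how many rows it has.
#[derive(Debug)]
pub struct PartInfo {
    pub location: String,
    pub rows: usize,
}

pub type PartInfoPtr = Arc<PartInfo>;

/// Same shape as `partition_queue: Arc<RwLock<VecDeque<PartInfoPtr>>>`.
#[derive(Clone, Default)]
pub struct PartitionQueue {
    inner: Arc<RwLock<VecDeque<PartInfoPtr>>>,
}

impl PartitionQueue {
    /// Filled once while the pipeline is being built.
    pub fn set_partitions(&self, parts: Vec<PartInfoPtr>) {
        let mut queue = self.inner.write().unwrap();
        queue.clear();
        queue.extend(parts);
    }

    /// Each reader takes the next part; `None` means there is no work left.
    pub fn try_get_part(&self) -> Option<PartInfoPtr> {
        self.inner.write().unwrap().pop_front()
    }
}
```

Because each pop happens under the write lock, every part is handed to exactly one reader, however many readers share the queue.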
QueryContextShared (query/src/sessions/query_ctx_shared.rs)
For queries containing subqueries, a lot of context information needs to be shared, which is why we need “QueryContextShared”.
/// It is important that data is shared among query context, for example:
/// USE database_1;
/// SELECT
/// (SELECT scalar FROM table_name_1) AS scalar_1,
/// (SELECT scalar FROM table_name_2) AS scalar_2,
/// (SELECT scalar FROM table_name_3) AS scalar_3
/// FROM table_name_4;
/// runtime, session, progress, init_query_id are shared among the subqueries
pub struct QueryContextShared {
/// scan_progress for scan metrics of datablocks (uncompressed)
pub(in crate::sessions) scan_progress: Arc<Progress>,
/// write_progress for write/commit metrics of datablocks (uncompressed)
pub(in crate::sessions) write_progress: Arc<Progress>,
/// result_progress for metrics of result datablocks (uncompressed)
pub(in crate::sessions) result_progress: Arc<Progress>,
pub(in crate::sessions) error: Arc<Mutex<Option<ErrorCode>>>,
pub(in crate::sessions) session: Arc<Session>,
pub(in crate::sessions) runtime: Arc<RwLock<Option<Arc<Runtime>>>>,
pub(in crate::sessions) init_query_id: Arc<RwLock<String>>,
...
}
It provides all the basic information required by the query context.
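The sharing described in the doc comment above boils down to several QueryContexts holding clones of one Arc<QueryContextShared>. This sketch, with a hypothetical Progress that counts rows and bytes, shows three subquery contexts updating a single scan_progress; it keeps only the fields needed for the illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical progress counter (rows and bytes scanned).
#[derive(Default)]
pub struct Progress {
    rows: AtomicUsize,
    bytes: AtomicUsize,
}

impl Progress {
    pub fn incr(&self, rows: usize, bytes: usize) {
        self.rows.fetch_add(rows, Ordering::Relaxed);
        self.bytes.fetch_add(bytes, Ordering::Relaxed);
    }

    pub fn get_values(&self) -> (usize, usize) {
        (self.rows.load(Ordering::Relaxed), self.bytes.load(Ordering::Relaxed))
    }
}

/// Only two of the shared fields are kept.
pub struct QueryContextShared {
    pub scan_progress: Arc<Progress>,
    pub init_query_id: String,
}

#[derive(Clone)]
pub struct QueryContext {
    shared: Arc<QueryContextShared>,
}

impl QueryContext {
    pub fn create_from_shared(shared: Arc<QueryContextShared>) -> Self {
        QueryContext { shared }
    }

    pub fn get_scan_progress(&self) -> Arc<Progress> {
        self.shared.scan_progress.clone()
    }

    pub fn get_query_id(&self) -> &str {
        &self.shared.init_query_id
    }
}
```

Every subquery context reports into the same counters and sees the same init_query_id, so metrics for the whole statement stay in one place.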
Handler
As mentioned earlier, Databend supports multiple handlers. Let's take the MySQL handler as an example to see how a handler processes requests and how it interacts with sessions. First, “MySQLHandler” is created with a reference to “SessionManager” (MySQLHandler::create(session_manager.clone()) above).
pub struct MySQLHandler {
abort_handle: AbortHandle,
abort_registration: Option<AbortRegistration>,
join_handle: Option<JoinHandle<()>>,
}
Once started, “MySQLHandler” spawns a tokio task that continuously accepts incoming TCP connections. For each connection, it creates a session and then starts a task that executes the subsequent query requests.
fn accept_socket(session_mgr: Arc<SessionManager>, executor: Arc<Runtime>, socket: TcpStream) {
executor.spawn(async move {
// create a session
match session_mgr.create_session(SessionType::MySQL).await {
Err(error) => Self::reject_session(socket, error).await,
Ok(session) => {
info!("MySQL connection coming: {:?}", socket.peer_addr());
// execute queries
if let Err(error) = MySQLConnection::run_on_stream(session, socket) {
error!("Unexpected error occurred during query: {:?}", error);
};
}
}
});
}
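The accept-then-create-session flow can be sketched with the standard library alone. This version assumes OS threads instead of tokio tasks, a hypothetical SessionCounter in place of SessionManager, and a max_conns limit so the loop terminates; it illustrates the pattern, not the MySQL handler itself.

```rust
use std::io;
use std::net::TcpListener;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for SessionManager that only counts created sessions.
#[derive(Default)]
pub struct SessionCounter {
    created: AtomicUsize,
}

impl SessionCounter {
    pub fn create_session(&self) -> usize {
        self.created.fetch_add(1, Ordering::SeqCst) + 1
    }

    pub fn created(&self) -> usize {
        self.created.load(Ordering::SeqCst)
    }
}

/// Accept up to `max_conns` connections, creating one session and one worker per connection.
pub fn accept_loop(
    listener: TcpListener,
    session_mgr: Arc<SessionCounter>,
    max_conns: usize,
) -> JoinHandle<io::Result<()>> {
    thread::spawn(move || -> io::Result<()> {
        let mut workers = Vec::new();
        for stream in listener.incoming().take(max_conns) {
            let socket = stream?;
            let session_mgr = session_mgr.clone();
            workers.push(thread::spawn(move || {
                let _session_id = session_mgr.create_session();
                // A real handler would now run the MySQL protocol on `socket`.
                drop(socket);
            }));
        }
        // Wait for the per-connection workers before reporting success.
        for worker in workers {
            let _ = worker.join();
        }
        Ok(())
    })
}
```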
In “MySQLConnection::run_on_stream”, the session first attaches to the corresponding client host and registers a shutdown closure that performs the related cleanup when the connection is closed.
Related code is as follows:
// mysql_session.rs
pub fn run_on_stream(session: SessionRef, stream: TcpStream) -> Result<()> {
let blocking_stream = Self::convert_stream(stream)?;
MySQLConnection::attach_session(&session, &blocking_stream)?;
...
}
fn attach_session(session: &SessionRef, blocking_stream: &std::net::TcpStream) -> Result<()> {
let host = blocking_stream.peer_addr().ok();
let blocking_stream_ref = blocking_stream.try_clone()?;
session.attach(host, move || {
// register shutdown
if let Err(error) = blocking_stream_ref.shutdown(Shutdown::Both) {
error!("Cannot shutdown MySQL session io {}", error);
}
});
Ok(())
}
// session.rs
pub fn attach<F>(self: &Arc<Self>, host: Option<SocketAddr>, io_shutdown: F)
where F: FnOnce() + Send + 'static {
let (tx, rx) = oneshot::channel();
self.session_ctx.set_client_host(host);
self.session_ctx.set_io_shutdown_tx(Some(tx));
common_base::base::tokio::spawn(async move {
// trigger cleanups when the session quits
if let Ok(tx) = rx.await {
(io_shutdown)();
tx.send(()).ok();
}
});
}
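The io_shutdown_tx field has type Sender<Sender<()>>: whoever wants to close the connection sends a reply channel, the watcher runs the cleanup closure, then acknowledges on that reply channel. The sketch below reproduces this handshake with std::sync::mpsc instead of a tokio oneshot and a thread instead of a task; shutdown_io is a hypothetical name for the trigger side.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::Mutex;
use std::thread;

/// Reduced, hypothetical SessionContext holding only the shutdown sender.
#[derive(Default)]
pub struct SessionContext {
    io_shutdown_tx: Mutex<Option<Sender<Sender<()>>>>,
}

impl SessionContext {
    /// Like `Session::attach`: park `io_shutdown` in a watcher until a request arrives.
    pub fn attach<F>(&self, io_shutdown: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let (tx, rx) = mpsc::channel::<Sender<()>>();
        *self.io_shutdown_tx.lock().unwrap() = Some(tx);
        thread::spawn(move || {
            // `recv` fails if the sender is dropped without a request; cleanup is then skipped.
            if let Ok(ack) = rx.recv() {
                io_shutdown();
                ack.send(()).ok();
            }
        });
    }

    /// Hypothetical trigger: request shutdown and wait until the cleanup has run.
    pub fn shutdown_io(&self) -> bool {
        let tx = match self.io_shutdown_tx.lock().unwrap().take() {
            Some(tx) => tx,
            None => return false,
        };
        let (ack_tx, ack_rx) = mpsc::channel();
        if tx.send(ack_tx).is_err() {
            return false;
        }
        ack_rx.recv().is_ok()
    }
}
```

Waiting for the acknowledgement means the caller knows the socket has really been shut down before it continues, which is the point of sending a Sender rather than a plain signal.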
Then a MySQL InteractiveWorker is started to handle subsequent queries.
let join_handle = query_executor.spawn(async move {
let client_addr = non_blocking_stream.peer_addr().unwrap().to_string();
let interactive_worker = InteractiveWorker::create(session, client_addr);
let opts = IntermediaryOptions {
process_use_statement_on_query: true,
};
let (r, w) = non_blocking_stream.into_split();
let w = BufWriter::with_capacity(DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE, w);
AsyncMysqlIntermediary::run_with_options(interactive_worker, r, w, &opts).await
});
let _ = futures::executor::block_on(join_handle);
“InteractiveWorker” implements the AsyncMysqlShim trait methods, such as “on_execute” and “on_query”. When a request arrives, the corresponding method is called to execute it. Taking “on_query” as an example, the core code is as follows:
async fn on_query<'a>(
&'a mut self,
query: &'a str,
writer: QueryResultWriter<'a, W>,
) -> Result<()> {
...
// response writer
let mut writer = DFQueryResultWriter::create(writer);
let instant = Instant::now();
// execute queries
let blocks = self.base.do_query(query).await;
// write results
let format = self.base.session.get_format_settings()?;
let mut write_result = writer.write(blocks, &format);
...
// metrics info
histogram!(
super::mysql_metrics::METRIC_MYSQL_PROCESSOR_REQUEST_DURATION,
instant.elapsed()
);
write_result
}
In “do_query”, a “QueryContext” is created, and the SQL is then parsed, planned, and executed. Related code is as follows:
// create a QueryContext
let context = self.session.create_query_context().await?;
// attach it to the query statement
context.attach_query_str(query);
let settings = context.get_settings();
// parse sql
let stmts_hints = DfParser::parse_sql(query, context.get_current_session().get_type());
...
// Define and generate a query plan
let mut planner = Planner::new(context.clone());
let interpreter = planner.plan_sql(query).await.and_then(|v| {
has_result_set = has_result_set_by_plan(&v.0);
InterpreterFactoryV2::get(context.clone(), &v.0)
})?;
// Execute queries and return the results
let (blocks, extra_info) = Self::exec_query(interpreter.clone(), &context).await?;
let schema = interpreter.schema();
Ok(QueryResult::create(
blocks,
extra_info,
has_result_set,
schema,
))
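To make the order of the stages in do_query concrete, here is a toy pipeline: plan the statement, decide has_result_set from the plan, execute, and wrap the result. Everything here is hypothetical; the keyword-based planner in particular is a stand-in for Planner and InterpreterFactoryV2, not how Databend classifies plans.

```rust
/// Hypothetical plan type: either returns rows or does not.
#[derive(Debug, PartialEq)]
pub enum Plan {
    Query(String),
    Other(String),
}

/// Cut-down QueryResult with the two fields this sketch needs.
#[derive(Debug)]
pub struct QueryResult {
    pub blocks: Vec<String>,
    pub has_result_set: bool,
}

/// Toy planner keyed on the first keyword, standing in for Planner::plan_sql.
fn plan_sql(query: &str) -> Result<Plan, String> {
    let keyword = query
        .split_whitespace()
        .next()
        .ok_or("empty query")?
        .to_ascii_uppercase();
    let sql = query.trim().to_string();
    match keyword.as_str() {
        "SELECT" | "SHOW" | "EXPLAIN" => Ok(Plan::Query(sql)),
        _ => Ok(Plan::Other(sql)),
    }
}

/// Counterpart of has_result_set_by_plan: decided from the plan, before execution.
fn has_result_set_by_plan(plan: &Plan) -> bool {
    matches!(plan, Plan::Query(_))
}

/// Toy executor: a "block" is just a string here.
fn exec_query(plan: &Plan) -> Vec<String> {
    match plan {
        Plan::Query(sql) => vec![format!("result of {}", sql)],
        Plan::Other(_) => Vec::new(),
    }
}

/// Plan, decide whether a result set is expected, execute, then wrap the output.
pub fn do_query(query: &str) -> Result<QueryResult, String> {
    let plan = plan_sql(query)?;
    let has_result_set = has_result_set_by_plan(&plan);
    let blocks = exec_query(&plan);
    Ok(QueryResult { blocks, has_result_set })
}
```

Deciding has_result_set from the plan lets the MySQL writer know whether to send a result set or a plain OK packet, independently of how many blocks execution happens to return.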
Epilogue
This article described the whole process, from starting Databend to accepting SQL requests and starting to process them. Recently, we removed support for the ClickHouse native TCP client. The ClickHouse TCP protocol is closely tied to ClickHouse's underlying implementation, carries a heavy historical burden, and has no public documentation, so debugging it became too exhausting. See more in: https://github.com/datafuselabs/databend/pull/7012.
Please feel free to share your ideas with us. In addition, if you find any problem, you can always submit an issue to help improve Databend's stability. The Databend community welcomes all well-intentioned comments and suggestions :)
About Databend
Databend is an open-source, elastic, and low-cost modern data warehouse that can do real-time data analysis on object storage. We look forward to your attention and hope to explore cloud-native data warehouse solutions and create a new generation of open-source data cloud together.
Databend documentation: https://databend.rs/
Twitter: https://twitter.com/Datafuse_Labs
Slack: https://datafusecloud.slack.com/
WeChat: Databend