Stream and Connection Management
Project Code: https://github.com/hyperlane-dev/hyperlane
Hyperlane's Stream type is the backbone of its connection management system. It represents a TCP connection between the server and a client, providing methods for reading requests, sending data, managing keep-alive connections, and gracefully closing connections. This article explores how hyperlane manages streams and connections at a low level.
Understanding the Stream Type
In hyperlane, every client connection is represented by a Stream object. This object is passed to your middleware and route handlers, giving you direct control over the connection lifecycle. The Stream type provides methods for:
- Sending data to the client
- Receiving HTTP requests from the client
- Managing connection state (open/closed, keep-alive)
- Flushing buffered data
Sending Data
Hyperlane provides several methods for sending data through a stream:
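// Build the response bytes once, then pass them to whichever send method fits.
let data = ctx.get_mut_response().build();
stream.try_send(&data).await; // borrow here so `data` can be reused below
stream.send(data).await;
stream.try_send_list(&frame_list).await; // frame_list: frames prepared elsewhere
stream.try_flush().await;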
try_send
try_send attempts to send data through the stream. If the stream is closed or encounters an error, it returns an error rather than panicking:
let data = ctx.get_mut_response().build();
stream.try_send(data).await;
This is the preferred method for most scenarios because it allows you to handle errors gracefully.
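The error-handling pattern that a fallible send enables can be sketched without the framework. The following is a minimal, synchronous model: `MockStream`, `SendError`, and `send_or_close` are hypothetical names invented for illustration and are not part of hyperlane's API.

```rust
// Hypothetical stand-in for a connection; this is not hyperlane's Stream type.
#[derive(Debug, PartialEq)]
enum SendError {
    Closed,
}

#[derive(Default)]
struct MockStream {
    closed: bool,
    written: Vec<u8>,
}

impl MockStream {
    /// Fallible send: returns an error instead of panicking when the stream is closed.
    fn try_send(&mut self, data: &[u8]) -> Result<(), SendError> {
        if self.closed {
            return Err(SendError::Closed);
        }
        self.written.extend_from_slice(data);
        Ok(())
    }
}

/// Sends a response and marks the stream closed if the write fails.
/// Returns true when the response was written.
fn send_or_close(stream: &mut MockStream, data: &[u8]) -> bool {
    match stream.try_send(data) {
        Ok(()) => true,
        Err(SendError::Closed) => {
            stream.closed = true;
            false
        }
    }
}
```

Because the failure comes back as a value, the caller decides what to do with it: log it, close the connection, or stop processing the request.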
send
send sends data through the stream, and the awaited call completes only once the write has finished. Use this when you need to ensure data is sent before proceeding:
stream.send(data).await;
try_send_list
try_send_list sends multiple frames of data in a single call, which can be more efficient than sending each frame individually:
stream.try_send_list(&frame_list).await;
This is particularly useful for WebSocket communication where you may need to send multiple frames at once.
try_flush
try_flush flushes any buffered data in the stream, ensuring that all previously sent data is actually written to the network:
stream.try_flush().await;
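Why an explicit flush matters can be shown with the standard library alone. This sketch uses `std::io::BufWriter` over a `Vec<u8>` as a stand-in for a buffered socket; it illustrates the general buffering behavior, not hyperlane's internals, and the function name is made up for the example.

```rust
use std::io::{BufWriter, Write};

/// Writes `data` through a buffered writer and reports how many bytes have
/// reached the underlying sink before and after an explicit flush.
/// Assumes `data` is smaller than the 8192-byte buffer.
fn bytes_visible_before_and_after_flush(data: &[u8]) -> (usize, usize) {
    // A Vec<u8> stands in for the socket.
    let mut writer = BufWriter::with_capacity(8192, Vec::new());
    writer.write_all(data).expect("writing to a Vec cannot fail");
    let before = writer.get_ref().len();
    writer.flush().expect("flushing to a Vec cannot fail");
    let after = writer.get_ref().len();
    (before, after)
}
```

For a five-byte payload the sink holds nothing until the flush, after which all five bytes are visible.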
Attribute Macros for Sending
Hyperlane provides attribute macros that automate common send operations:
#[try_send]
#[send]
#[try_flush]
#[flush]
#[closed]
When applied to a handler method, these macros automatically build the response and send it:
#[try_send]
async fn handle(self, stream: &mut Stream, ctx: &mut Context) -> Status {
    // The response is automatically built and sent via try_send
    Status::Continue
}
The #[closed] macro closes the stream after sending:
#[closed]
async fn handle(self, stream: &mut Stream, ctx: &mut Context) -> Status {
    // The response is sent and the stream is closed
    Status::Continue
}
Connection State Management
Closing Connections
You can explicitly close a connection by calling set_closed(true) on the stream:
stream.set_closed(true);
This is useful when you want to terminate a connection after sending a response — for example, when an error occurs or when keep-alive is not desired.
Checking Connection State
The is_keep_alive method determines whether the connection should be kept alive based on the client's request headers and the server's configuration:
let keep_alive = stream.is_keep_alive(ctx.get_request().is_enable_keep_alive());
This method checks both the Connection header from the client and whether keep-alive is enabled in the request configuration.
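As a rough illustration of the kind of decision involved, the sketch below applies the standard HTTP/1.x persistence rules to a version string and a Connection header value. The function `should_keep_alive` is hypothetical and is not how hyperlane necessarily implements `is_keep_alive`.

```rust
/// Decides whether a connection should stay open after the current response.
/// `version` is the request's HTTP version string; `connection` is the value
/// of its Connection header, if present.
fn should_keep_alive(version: &str, connection: Option<&str>) -> bool {
    // The Connection header is a comma-separated, case-insensitive token list.
    let has_token = |token: &str| {
        connection
            .map(|value| value.split(',').any(|t| t.trim().eq_ignore_ascii_case(token)))
            .unwrap_or(false)
    };
    if has_token("close") {
        return false;
    }
    match version {
        // HTTP/1.1 connections are persistent unless "close" is sent.
        "HTTP/1.1" => true,
        // HTTP/1.0 connections close unless the client opts in.
        "HTTP/1.0" => has_token("keep-alive"),
        _ => false,
    }
}
```

A server-side switch, such as the keep-alive flag passed to `is_keep_alive`, would typically be combined with this result using a logical AND.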
Keep-Alive Connection Management
HTTP keep-alive allows multiple requests to be sent over the same TCP connection, significantly reducing latency and improving performance. Hyperlane provides built-in support for managing keep-alive connections:
let keep_alive = stream.is_keep_alive(ctx.get_request().is_enable_keep_alive());
// Keep reading requests from the same connection until a read fails
// or a request opts out of keep-alive.
while stream.try_get_http_request().await.is_ok() {
    if !ctx.get_request().is_enable_keep_alive() {
        stream.set_closed(true);
        break;
    }
}
This pattern is the core of hyperlane's keep-alive implementation:
- stream.is_keep_alive(...): Checks whether the connection should be kept alive.
- stream.try_get_http_request().await.is_ok(): Attempts to read the next HTTP request from the stream. If successful, another request is available.
- ctx.get_request().is_enable_keep_alive(): Checks if the current request has keep-alive enabled.
- stream.set_closed(true): Closes the connection if keep-alive is not enabled.
The while loop continues processing requests on the same connection as long as:
- There are more requests available (try_get_http_request succeeds)
- Keep-alive is enabled for each request
When either condition fails, the connection is closed.
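The two exit conditions of the loop can be modeled in a few lines of plain Rust. In this sketch, `Request` and `serve_connection` are hypothetical names, and a list of requests stands in for what would arrive over the socket.

```rust
/// Hypothetical parsed request: only the keep-alive flag matters here.
struct Request {
    keep_alive: bool,
}

/// Serves queued requests on one simulated connection and returns how many
/// were handled before the connection closed.
fn serve_connection(incoming: Vec<Request>) -> usize {
    let mut served = 0;
    // The iterator running out models a failed read (the client went away).
    for request in incoming {
        served += 1; // handle the request and send its response
        if !request.keep_alive {
            break; // the client asked to close, so stop reading
        }
    }
    served
}
```

With the sequence keep-alive, close, keep-alive, only the first two requests are served: the second one ends the connection, so the third is never read.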
Request Configuration for Connection Management
The RequestConfig type allows you to configure connection-level behavior:
let request_config_json = r#"{
    "buffer_size": 8192,
    "max_path_size": 8192,
    "max_header_count": 100,
    "max_header_key_size": 8192,
    "max_header_value_size": 8192,
    "max_body_size": 2097152,
    "read_timeout_ms": 6000
}"#;
let request_config = RequestConfig::from_json(request_config_json).unwrap();
Key parameters for connection management:
- buffer_size: The size of the read buffer. Larger buffers can improve throughput for large requests.
- read_timeout_ms: The timeout for reading data from the connection. This prevents idle connections from consuming resources indefinitely.
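The effect of a read timeout can be observed with the standard library's blocking sockets. The sketch below is an analogy for `read_timeout_ms`, not hyperlane code: it opens a loopback connection whose client never writes and checks that the server-side read gives up. The function name `read_times_out` is made up for the example.

```rust
use std::io::{ErrorKind, Read};
use std::net::{TcpListener, TcpStream};
use std::time::Duration;

/// Accepts one loopback connection from a silent client and reports whether
/// a read on it timed out. `timeout` must be non-zero.
fn read_times_out(timeout: Duration) -> std::io::Result<bool> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    // Keep the client alive but never write, to simulate an idle connection.
    let _client = TcpStream::connect(listener.local_addr()?)?;
    let (mut conn, _) = listener.accept()?;
    conn.set_read_timeout(Some(timeout))?;
    let mut buf = [0u8; 8192]; // read buffer, sized like buffer_size above
    match conn.read(&mut buf) {
        // Depending on the platform, a timeout surfaces as WouldBlock or TimedOut.
        Err(e) if matches!(e.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => Ok(true),
        Err(e) => Err(e),
        Ok(_) => Ok(false),
    }
}
```

Without the timeout, the read would wait for as long as the idle client kept the socket open.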
Server Configuration for Connection Management
ServerConfig controls server-level connection behavior:
let mut config: ServerConfig = ServerConfig::default();
config.set_address("0.0.0.0:80");
config.set_nodelay(Some(true));
config.set_ttl(Some(128));
- set_nodelay(Some(true)): Enables TCP_NODELAY, which disables Nagle's algorithm. This reduces latency by sending data immediately rather than buffering it.
- set_ttl(Some(128)): Sets the IP Time-To-Live field, which controls how many hops a packet can traverse.
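Both options correspond to ordinary socket settings, which the standard library exposes on `std::net::TcpStream`. The following sketch applies them to a loopback connection and reads them back; it shows the underlying options only, and `configure_socket` is a hypothetical helper rather than anything from hyperlane.

```rust
use std::net::{TcpListener, TcpStream};

/// Applies TCP_NODELAY and an IP TTL to a loopback connection and reads the
/// values back from the socket.
fn configure_socket(nodelay: bool, ttl: u32) -> std::io::Result<(bool, u32)> {
    // The listener must stay alive until the function returns.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let stream = TcpStream::connect(listener.local_addr()?)?;
    stream.set_nodelay(nodelay)?; // true disables Nagle's algorithm
    stream.set_ttl(ttl)?; // hop limit for outgoing packets
    Ok((stream.nodelay()?, stream.ttl()?))
}
```

Reading the values back is a quick way to confirm that the operating system accepted the settings.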
WebSocket Connection Management
WebSocket connections require special handling because they start as HTTP requests and then get upgraded to persistent WebSocket connections:
#[route("/ws_upgrade_type")]
struct Websocket;

impl ServerHook for Websocket {
    async fn new(_: &mut Stream, _: &mut Context) -> Self { Self }

    #[is_ws_upgrade_type]
    #[try_get_websocket_request(body)]
    async fn handle(self, stream: &mut Stream, ctx: &mut Context) -> Status {
        let body_list = WebSocketFrame::create_frame_list(&body);
        stream.send_list(body_list).await;
        Status::Continue
    }
}
Key aspects of WebSocket connection management:
- #[is_ws_upgrade_type]: Checks if the request is a WebSocket upgrade request.
- #[try_get_websocket_request(body)]: Extracts the WebSocket request body.
- WebSocketFrame::create_frame_list(&body): Creates WebSocket frames from the body data.
- stream.send_list(body_list).await: Sends all frames in a single call.
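To make the framing step concrete, here is a minimal sketch of how a single server-to-client text frame is laid out according to RFC 6455. It is not hyperlane's `create_frame_list` implementation, which may split data across several frames; `encode_text_frame` is a hypothetical function that always emits one final, unmasked frame.

```rust
/// Encodes one unmasked, final text frame as sent from server to client.
fn encode_text_frame(text: &str) -> Vec<u8> {
    let payload = text.as_bytes();
    let mut frame = Vec::with_capacity(payload.len() + 10);
    frame.push(0x81); // FIN bit set, opcode 0x1 (text)
    match payload.len() {
        len if len <= 125 => frame.push(len as u8),
        len if len <= 65_535 => {
            frame.push(126); // a 16-bit extended length follows
            frame.extend_from_slice(&(len as u16).to_be_bytes());
        }
        len => {
            frame.push(127); // a 64-bit extended length follows
            frame.extend_from_slice(&(len as u64).to_be_bytes());
        }
    }
    frame.extend_from_slice(payload);
    frame
}
```

Frames sent by a server are never masked, so the second byte carries only the length; client-to-server frames additionally set the mask bit and include a 4-byte masking key.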
SSE (Server-Sent Events) Connection Management
SSE is another persistent connection pattern supported by hyperlane:
let data = ctx.get_mut_response()
    .set_header(CONTENT_TYPE, TEXT_EVENT_STREAM)
    .set_body(Vec::new())
    .build();
stream.try_send(data).await;
for i in 0..10 {
    let body = format!("data:{i}{HTTP_DOUBLE_BR}");
    stream.try_send(&body).await;
}
SSE connections:
- Start with a standard HTTP response with Content-Type: text/event-stream.
- Send events as formatted text data over the same connection.
- Use stream.try_send to push events to the client incrementally.
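The text format of an individual event is simple enough to build by hand. The sketch below formats one event following the Server-Sent Events wire format, using `\n` as the line terminator; `format_sse_event` is a hypothetical helper, not a hyperlane function.

```rust
/// Formats one Server-Sent Events message. Multi-line data becomes one
/// `data:` field per line, and a blank line terminates the event.
fn format_sse_event(event: Option<&str>, data: &str) -> String {
    let mut out = String::new();
    if let Some(name) = event {
        out.push_str("event: ");
        out.push_str(name);
        out.push('\n');
    }
    for line in data.split('\n') {
        out.push_str("data: ");
        out.push_str(line);
        out.push('\n');
    }
    out.push('\n'); // the blank line dispatches the event on the client
    out
}
```

Each formatted string could then be written to the open connection in turn, in the same way the loop above sends its `data:` lines.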
Error Handling in Connection Management
Proper error handling is essential for robust connection management:
impl ServerHook for AuthMiddleware {
    async fn handle(self, stream: &mut Stream, ctx: &mut Context) -> Status {
        let auth_str = ctx.get_request().try_get_header_back(AUTHORIZATION).unwrap_or_default();
        if auth_str.is_empty() {
            let data = ctx.get_mut_response().set_status_code(401).set_body("Unauthorized").build();
            if stream.try_send(data).await.is_err() {
                stream.set_closed(true);
            }
            return Status::Reject;
        }
        Status::Continue
    }
}
Notice the pattern: after sending an error response, we check if try_send failed and close the stream if it did. This prevents trying to write to a broken connection.
Request Error Handling
Hyperlane provides a dedicated hook for handling request parsing errors:
struct RequestErrorHook;

impl ServerHook for RequestErrorHook {
    async fn new(_: &mut Stream, ctx: &mut Context) -> Self {
        let request_error = ctx.try_get_request_error_data().unwrap_or_default();
        Self
    }

    async fn handle(self, stream: &mut Stream, ctx: &mut Context) -> Status {
        let data = ctx.get_mut_response().set_status_code(500).set_body("error").build();
        stream.try_send(data).await;
        Status::Continue
    }
}

server.request_error::<RequestErrorHook>();
This hook is invoked when a request cannot be properly parsed. The ctx.try_get_request_error_data() method provides access to details about the parsing error.
Task Panic Handling
For robustness, hyperlane also provides a panic handler:
struct TaskPanicHook;

impl ServerHook for TaskPanicHook {
    async fn new(_: &mut Stream, ctx: &mut Context) -> Self {
        let error = ctx.try_get_task_panic_data().unwrap_or_default();
        Self
    }

    async fn handle(self, stream: &mut Stream, ctx: &mut Context) -> Status {
        let data = ctx.get_mut_response().set_status_code(500).set_body("panic").build();
        stream.try_send(data).await;
        Status::Continue
    }
}

server.task_panic::<TaskPanicHook>();
This ensures that even if a task panics, the server can send an appropriate error response and continue operating.
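The general idea of turning a panic into an error response can be sketched with `std::panic::catch_unwind`. This is an analogy only: hyperlane runs handlers as async tasks and its mechanism may differ, and `run_guarded` is a hypothetical helper that maps any panic to status 500.

```rust
use std::panic::{self, UnwindSafe};

/// Runs a handler and maps a panic to HTTP status 500 instead of letting it
/// unwind into the caller.
fn run_guarded<F>(handler: F) -> u16
where
    F: FnOnce() -> u16 + UnwindSafe,
{
    match panic::catch_unwind(handler) {
        Ok(status) => status,
        // The panic payload is discarded here; a real hook could log it.
        Err(_) => 500,
    }
}
```

A handler that returns normally keeps its own status code, while one that panics is reported as 500 and the caller carries on. The default panic message is still printed to standard error.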
Performance Considerations
Hyperlane's connection management is designed for high performance. Reported benchmark throughput, in queries per second (QPS):
- Without Keep-Alive: hyperlane 51031, Tokio 49555, Rocket 49345, Gin 40149
- With Keep-Alive: Tokio 340130, hyperlane 334888, Rocket 298945, Gin 242570
- ab test (1 million requests): hyperlane 316211 (Keep-Alive), Tokio 308596
To maximize performance:
- Enable keep-alive when clients make multiple requests.
- Set appropriate buffer sizes based on your expected request/response sizes.
- Use try_send over send so that failures on slow or broken connections are returned as errors you can handle.
- Set TCP_NODELAY for latency-sensitive applications.
- Configure read timeouts to prevent resource exhaustion from idle connections.
Conclusion
Hyperlane's Stream type provides a comprehensive set of tools for managing TCP connections. From basic data sending to keep-alive management, WebSocket upgrades, and error handling, the Stream API gives you fine-grained control over every aspect of connection management. By understanding and leveraging these APIs, you can build robust, high-performance web applications that handle connections efficiently and gracefully.