tengxgfyrz67s

Stream and Connection Management

Project Code: https://github.com/hyperlane-dev/hyperlane

Hyperlane's Stream type is the backbone of its connection management system. It represents a TCP connection between the server and a client, providing methods for reading requests, sending data, managing keep-alive connections, and gracefully closing connections. This article explores how hyperlane manages streams and connections at a low level.

Understanding the Stream Type

In hyperlane, every client connection is represented by a Stream object. This object is passed to your middleware and route handlers, giving you direct control over the connection lifecycle. The Stream type provides methods for:

  • Sending data to the client
  • Receiving HTTP requests from the client
  • Managing connection state (open/closed, keep-alive)
  • Flushing buffered data

Sending Data

Hyperlane provides several methods for sending data through a stream:

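// Build the response bytes once.
let data = ctx.get_mut_response().build();
// The calls below are alternatives; each one is covered in its own section.
stream.try_send(data).await;             // send, reporting failure as an error
stream.send(data).await;                 // send and wait for completion
stream.try_send_list(&frame_list).await; // send several frames in one call
stream.try_flush().await;                // push buffered data to the network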

try_send

try_send attempts to send data through the stream. If the stream is closed or encounters an error, it returns an error rather than panicking:

let data = ctx.get_mut_response().build();
stream.try_send(data).await;

This is the preferred method for most scenarios because it allows you to handle errors gracefully.
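
To make the "error instead of panic" behaviour concrete, here is a minimal sketch that uses only the Rust standard library. The Conn type, its closed flag, and the BrokenPipe writer are hypothetical stand-ins invented for this example; they are not hyperlane's Stream and say nothing about how hyperlane implements try_send internally.

```rust
use std::io::{self, Write};

/// Hypothetical connection wrapper (an assumption for this sketch, not hyperlane's `Stream`).
struct Conn<W: Write> {
    writer: W,
    closed: bool,
}

impl<W: Write> Conn<W> {
    fn new(writer: W) -> Self {
        Self { writer, closed: false }
    }

    /// Returns an error, rather than panicking, when the connection is
    /// already closed or the underlying write fails.
    fn try_send(&mut self, data: &[u8]) -> io::Result<()> {
        if self.closed {
            return Err(io::Error::new(io::ErrorKind::NotConnected, "connection closed"));
        }
        self.writer.write_all(data)
    }
}

/// A writer that always fails, standing in for a broken socket.
struct BrokenPipe;

impl Write for BrokenPipe {
    fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
        Err(io::Error::new(io::ErrorKind::BrokenPipe, "peer went away"))
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

A caller can then branch on the result, for example `if conn.try_send(b"hello").is_err() { conn.closed = true; }`, which mirrors the error-handling pattern used later in this article.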

send

send sends data through the stream; awaiting it suspends the handler until the write completes, without blocking the thread. Use this when the data must be sent before the handler proceeds:

stream.send(data).await;

try_send_list

try_send_list sends multiple frames of data in a single call, which can be more efficient than sending each frame individually:

stream.try_send_list(&frame_list).await;

This is particularly useful for WebSocket communication where you may need to send multiple frames at once.
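
One way batching can save work is by turning many small writes into a single one. The sketch below shows that idea with the standard library only. The send_frames function and the CountingWriter test double are hypothetical names made up for this example; whether hyperlane batches frames this way internally is not something this article states.

```rust
use std::io::{self, Write};

/// Concatenates all frames and hands them to the writer in a single
/// `write_all` call followed by one flush. Returns the number of bytes sent.
fn send_frames<W: Write>(writer: &mut W, frames: &[Vec<u8>]) -> io::Result<usize> {
    let total: usize = frames.iter().map(Vec::len).sum();
    let mut batch = Vec::with_capacity(total);
    for frame in frames {
        batch.extend_from_slice(frame);
    }
    writer.write_all(&batch)?;
    writer.flush()?;
    Ok(total)
}

/// Test double that records how many times `write` was called.
struct CountingWriter {
    writes: usize,
    bytes: Vec<u8>,
}

impl Write for CountingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.writes += 1;
        self.bytes.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

With three frames passed in, the CountingWriter sees one write call instead of three, which is the saving the batched call is after.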

try_flush

try_flush flushes any buffered data in the stream, ensuring that all previously sent data is actually written to the network:

stream.try_flush().await;
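
Why a flush matters is easiest to see with a buffered writer. The following sketch uses std::io::BufWriter over a Vec<u8> that stands in for the socket; the function name bytes_visible_around_flush is made up for this example, and nothing here is taken from hyperlane's code.

```rust
use std::io::{BufWriter, Write};

/// Writes a small `payload` through a buffered writer and reports how many
/// bytes had reached the underlying sink before and after the explicit flush.
fn bytes_visible_around_flush(payload: &[u8]) -> std::io::Result<(usize, usize)> {
    // A Vec<u8> stands in for the socket.
    let mut buffered = BufWriter::new(Vec::new());
    // Payloads smaller than the buffer capacity stay in memory for now.
    buffered.write_all(payload)?;
    let before = buffered.get_ref().len();
    // The flush is what actually hands the bytes to the sink.
    buffered.flush()?;
    let after = buffered.get_ref().len();
    Ok((before, after))
}
```

For a short response the first number is zero and the second equals the payload length: until the flush, the client has received nothing.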

Attribute Macros for Sending

Hyperlane provides attribute macros that automate common send operations:

#[try_send]
#[send]
#[try_flush]
#[flush]
#[closed]

When applied to a handler method, these macros automatically build the response and send it:

#[try_send]
async fn handle(self, stream: &mut Stream, ctx: &mut Context) -> Status {
    // The response is automatically built and sent via try_send
    Status::Continue
}

The #[closed] macro closes the stream after sending:

#[closed]
async fn handle(self, stream: &mut Stream, ctx: &mut Context) -> Status {
    // The response is sent and the stream is closed
    Status::Continue
}

Connection State Management

Closing Connections

You can explicitly close a connection by calling set_closed(true) on the stream:

stream.set_closed(true);

This is useful when you want to terminate a connection after sending a response — for example, when an error occurs or when keep-alive is not desired.

Checking Connection State

The is_keep_alive method determines whether the connection should be kept alive based on the client's request headers and the server's configuration:

let keep_alive = stream.is_keep_alive(ctx.get_request().is_enable_keep_alive());

This method checks both the Connection header from the client and whether keep-alive is enabled in the request configuration.
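
For readers who want to see what such a check involves, here is a sketch of the usual HTTP/1.x rule written with the standard library only. The function should_keep_alive is a hypothetical helper, not hyperlane's is_keep_alive, and it only covers HTTP/1.0 and HTTP/1.1.

```rust
/// Decides whether a connection should persist under the HTTP/1.x rules:
/// `Connection: close` always ends it, HTTP/1.1 is persistent by default,
/// and HTTP/1.0 is persistent only with `Connection: keep-alive`.
fn should_keep_alive(http_version: &str, connection_header: Option<&str>) -> bool {
    // The header is a comma-separated, case-insensitive token list.
    let has_token = |token: &str| {
        connection_header
            .map(|value| value.split(',').any(|t| t.trim().eq_ignore_ascii_case(token)))
            .unwrap_or(false)
    };
    if has_token("close") {
        return false;
    }
    match http_version {
        "HTTP/1.1" => true,
        "HTTP/1.0" => has_token("keep-alive"),
        // Other versions are out of scope for this sketch.
        _ => false,
    }
}
```

A server-side switch, like the flag passed to is_keep_alive above, would simply be combined with this result using a logical AND.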

Keep-Alive Connection Management

HTTP keep-alive allows multiple requests to be sent over the same TCP connection, which avoids a new TCP handshake for every request and so reduces latency. Hyperlane provides built-in support for managing keep-alive connections:

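// Decide up front whether this connection may be reused.
let keep_alive = stream.is_keep_alive(ctx.get_request().is_enable_keep_alive());
// Keep reading requests from the same connection while they parse successfully.
while stream.try_get_http_request().await.is_ok() {
    // Close the connection once a request does not enable keep-alive.
    if !ctx.get_request().is_enable_keep_alive() {
        stream.set_closed(true);
        break;
    }
}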

This pattern is the core of hyperlane's keep-alive implementation:

  1. stream.is_keep_alive(...) — Checks whether the connection should be kept alive.
  2. stream.try_get_http_request().await.is_ok() — Attempts to read the next HTTP request from the stream. If successful, another request is available.
  3. ctx.get_request().is_enable_keep_alive() — Checks if the current request has keep-alive enabled.
  4. stream.set_closed(true) — Closes the connection if keep-alive is not enabled.

The while loop continues processing requests on the same connection as long as:

  • There are more requests available (try_get_http_request succeeds)
  • Keep-alive is enabled for each request

When either condition fails, the connection is closed.
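
The control flow of that loop can be tried out without a network. In the sketch below, an iterator of requests plays the role of the stream. The Request struct and the serve_connection function are hypothetical and exist only for this illustration.

```rust
/// Hypothetical, minimal request model for this sketch.
struct Request {
    path: &'static str,
    keep_alive: bool,
}

/// Serves requests from one connection until the input ends or a request
/// opts out of keep-alive. Returns the served paths and whether the server
/// side closed the connection.
fn serve_connection(incoming: impl IntoIterator<Item = Request>) -> (Vec<&'static str>, bool) {
    let mut served = Vec::new();
    let mut closed_by_server = false;
    // The `for` loop plays the role of `while try_get_http_request().await.is_ok()`.
    for request in incoming {
        served.push(request.path);
        if !request.keep_alive {
            // Analogous to stream.set_closed(true) followed by break.
            closed_by_server = true;
            break;
        }
    }
    (served, closed_by_server)
}
```

Feeding it three requests where the second one disables keep-alive serves the first two and then stops, leaving the third unread.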

Request Configuration for Connection Management

The RequestConfig type allows you to configure connection-level behavior:

let request_config_json = r#"{
    "buffer_size": 8192,
    "max_path_size": 8192,
    "max_header_count": 100,
    "max_header_key_size": 8192,
    "max_header_value_size": 8192,
    "max_body_size": 2097152,
    "read_timeout_ms": 6000
}"#;
let request_config = RequestConfig::from_json(request_config_json).unwrap();

Key parameters for connection management:

  • buffer_size — The size of the read buffer. Larger buffers can improve throughput for large requests.
  • read_timeout_ms — The timeout for reading data from the connection. This prevents idle connections from consuming resources indefinitely.
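
What a read timeout does at the socket level can be shown with std::net alone. This sketch opens a loopback connection whose client never sends anything and reports the error a timed-out read produces. The function name read_with_timeout is made up for this example, and the code illustrates the general mechanism rather than how hyperlane applies read_timeout_ms.

```rust
use std::io::{ErrorKind, Read};
use std::net::{TcpListener, TcpStream};
use std::time::Duration;

/// Accepts one loopback connection whose client stays silent and returns
/// the error kind produced by a read that hits the timeout.
/// `timeout_ms` must be greater than zero.
fn read_with_timeout(timeout_ms: u64) -> std::io::Result<ErrorKind> {
    // Port 0 lets the operating system pick a free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    // Keep the client alive (but idle) for the duration of the read.
    let _idle_client = TcpStream::connect(listener.local_addr()?)?;
    let (mut server_side, _peer) = listener.accept()?;
    server_side.set_read_timeout(Some(Duration::from_millis(timeout_ms)))?;
    // Same size as the "buffer_size" value in the JSON above.
    let mut buf = [0u8; 8192];
    match server_side.read(&mut buf) {
        Ok(n) => Err(std::io::Error::new(
            ErrorKind::Other,
            format!("unexpected read of {n} bytes"),
        )),
        Err(e) => Ok(e.kind()),
    }
}
```

The reported kind is platform dependent: typically WouldBlock on Unix and TimedOut on Windows. Either way the read returns instead of waiting forever, which is what frees the resources held by an idle connection.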

Server Configuration for Connection Management

ServerConfig controls server-level connection behavior:

let mut config: ServerConfig = ServerConfig::default();
config.set_address("0.0.0.0:80");
config.set_nodelay(Some(true));
config.set_ttl(Some(128));

The two socket options set above:

  • set_nodelay(Some(true)) — Enables TCP_NODELAY, which disables Nagle's algorithm. This reduces latency by sending data immediately rather than buffering it.
  • set_ttl(Some(128)) — Sets the IP Time-To-Live field, which controls how many hops a packet can traverse.
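
Both options correspond to ordinary socket settings that the Rust standard library also exposes. The sketch below applies them to a loopback connection and reads them back. The configure_socket function is a hypothetical helper for this example; how hyperlane applies its ServerConfig values to accepted sockets is an assumption not covered here.

```rust
use std::net::{TcpListener, TcpStream};

/// Applies TCP_NODELAY and the IP TTL to a loopback connection and reads
/// both values back from the socket.
fn configure_socket(nodelay: bool, ttl: u32) -> std::io::Result<(bool, u32)> {
    // Port 0 lets the operating system pick a free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let stream = TcpStream::connect(listener.local_addr()?)?;
    stream.set_nodelay(nodelay)?; // disables Nagle's algorithm when true
    stream.set_ttl(ttl)?; // hop limit for outgoing packets
    Ok((stream.nodelay()?, stream.ttl()?))
}
```

Calling `configure_socket(true, 128)` mirrors the values used in the configuration above.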

WebSocket Connection Management

WebSocket connections require special handling because they start as HTTP requests and then get upgraded to persistent WebSocket connections:

#[route("/ws_upgrade_type")]
struct Websocket;
impl ServerHook for Websocket {
    async fn new(_: &mut Stream, _: &mut Context) -> Self { Self }

    #[is_ws_upgrade_type]
    #[try_get_websocket_request(body)]
    async fn handle(self, stream: &mut Stream, ctx: &mut Context) -> Status {
        let body_list = WebSocketFrame::create_frame_list(&body);
        stream.send_list(body_list).await;
        Status::Continue
    }
}

Key aspects of WebSocket connection management:

  • #[is_ws_upgrade_type] — Checks if the request is a WebSocket upgrade request.
  • #[try_get_websocket_request(body)] — Extracts the WebSocket request body.
  • WebSocketFrame::create_frame_list(&body) — Creates WebSocket frames from the body data.
  • stream.send_list(body_list).await — Sends all frames in a single call.
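
To give a feel for what "creating frames" means on the wire, here is a sketch that encodes a single unmasked text frame in the layout defined by RFC 6455. The encode_text_frame function is a hypothetical helper; it is not WebSocketFrame::create_frame_list and does not handle fragmentation, binary frames, or masking.

```rust
/// Encodes one unmasked text frame (server to client) in the RFC 6455 layout.
fn encode_text_frame(payload: &str) -> Vec<u8> {
    let bytes = payload.as_bytes();
    let mut frame = Vec::with_capacity(bytes.len() + 10);
    frame.push(0x81); // FIN bit set, opcode 0x1 (text)
    match bytes.len() {
        len @ 0..=125 => frame.push(len as u8),
        len @ 126..=65535 => {
            frame.push(126); // a 16-bit big-endian length follows
            frame.extend_from_slice(&(len as u16).to_be_bytes());
        }
        len => {
            frame.push(127); // a 64-bit big-endian length follows
            frame.extend_from_slice(&(len as u64).to_be_bytes());
        }
    }
    frame.extend_from_slice(bytes);
    frame
}
```

For the payload "hi" this yields four bytes: the header byte 0x81, the length 2, and the two characters.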

SSE (Server-Sent Events) Connection Management

SSE is another persistent connection pattern supported by hyperlane:

let data = ctx.get_mut_response()
    .set_header(CONTENT_TYPE, TEXT_EVENT_STREAM)
    .set_body(Vec::new())
    .build();
stream.try_send(data).await;
for i in 0..10 {
    let body = format!("data:{i}{HTTP_DOUBLE_BR}");
    stream.try_send(&body).await;
}

SSE connections:

  1. Start with a standard HTTP response with Content-Type: text/event-stream.
  2. Send events as formatted text data over the same connection.
  3. Use stream.try_send to push events to the client incrementally.
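
The event format itself is simple enough to sketch in a few lines. The helper below, format_sse_event, is a hypothetical function written for this example. It uses a bare line feed as the line ending, which the SSE format accepts alongside CR and CRLF, so it may differ from the HTTP_DOUBLE_BR constant used above.

```rust
/// Formats one Server-Sent Events message. Each line of `data` gets its own
/// `data:` field, and a blank line terminates the event.
fn format_sse_event(data: &str) -> String {
    let mut event = String::new();
    for line in data.lines() {
        event.push_str("data: ");
        event.push_str(line);
        event.push('\n');
    }
    // The empty line tells the client to dispatch the event.
    event.push('\n');
    event
}
```

Splitting multi-line payloads into several data fields matters because a raw newline inside a single field would end that field early.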

Error Handling in Connection Management

Proper error handling is essential for robust connection management:

impl ServerHook for AuthMiddleware {
    async fn handle(self, stream: &mut Stream, ctx: &mut Context) -> Status {
        let auth_str = ctx.get_request().try_get_header_back(AUTHORIZATION).unwrap_or_default();
        if auth_str.is_empty() {
            let data = ctx.get_mut_response().set_status_code(401).set_body("Unauthorized").build();
            if stream.try_send(data).await.is_err() {
                stream.set_closed(true);
            }
            return Status::Reject;
        }
        Status::Continue
    }
}

Notice the pattern: after sending an error response, we check if try_send failed and close the stream if it did. This prevents trying to write to a broken connection.

Request Error Handling

Hyperlane provides a dedicated hook for handling request parsing errors:

struct RequestErrorHook;
impl ServerHook for RequestErrorHook {
    async fn new(_: &mut Stream, ctx: &mut Context) -> Self {
        // Details of the parse failure; underscore-prefixed because this example does not use them.
        let _request_error = ctx.try_get_request_error_data().unwrap_or_default();
        Self
    }
    async fn handle(self, stream: &mut Stream, ctx: &mut Context) -> Status {
        let data = ctx.get_mut_response().set_status_code(500).set_body("error").build();
        stream.try_send(data).await;
        Status::Continue
    }
}
server.request_error::<RequestErrorHook>();

This hook is invoked when a request cannot be properly parsed. The ctx.try_get_request_error_data() method provides access to details about the parsing error.

Task Panic Handling

For robustness, hyperlane also provides a panic handler:

struct TaskPanicHook;
impl ServerHook for TaskPanicHook {
    async fn new(_: &mut Stream, ctx: &mut Context) -> Self {
        // Details of the panic; underscore-prefixed because this example does not use them.
        let _error = ctx.try_get_task_panic_data().unwrap_or_default();
        Self
    }
    async fn handle(self, stream: &mut Stream, ctx: &mut Context) -> Status {
        let data = ctx.get_mut_response().set_status_code(500).set_body("panic").build();
        stream.try_send(data).await;
        Status::Continue
    }
}
server.task_panic::<TaskPanicHook>();

This ensures that even if a task panics, the server can send an appropriate error response and continue operating.
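
The general idea of turning a panic into an error response can be sketched with std::panic::catch_unwind. The run_guarded function below is a hypothetical helper; it is an assumption that a framework would do something comparable, and this is not a description of how hyperlane's task_panic hook is implemented.

```rust
use std::panic::{self, UnwindSafe};

/// Runs `handler` and converts a panic into a (status, body) pair instead of
/// letting it propagate to the caller.
fn run_guarded<F>(handler: F) -> (u16, String)
where
    F: FnOnce() -> String + UnwindSafe,
{
    match panic::catch_unwind(handler) {
        Ok(body) => (200, body),
        Err(payload) => {
            // Panic payloads are usually a &str or a String.
            let message = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".to_string());
            (500, format!("panic: {message}"))
        }
    }
}
```

This only works when the program is built with the default unwinding panic strategy; with panic = "abort" the process ends before any response can be produced.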

Performance Considerations

Hyperlane's connection management is designed for high performance. The figures below are queries per second (QPS); higher is better:

  • Without Keep-Alive: hyperlane 51031, Tokio 49555, Rocket 49345, Gin 40149
  • With Keep-Alive: Tokio 340130, hyperlane 334888, Rocket 298945, Gin 242570
  • ab test (1 million requests): hyperlane 316211 (Keep-Alive), Tokio 308596

To maximize performance:

  1. Enable keep-alive when clients make multiple requests.
  2. Set appropriate buffer sizes based on your expected request/response sizes.
  3. Prefer try_send over send so that failures on slow or broken connections are returned as errors you can handle.
  4. Set TCP_NODELAY for latency-sensitive applications.
  5. Configure read timeouts to prevent resource exhaustion from idle connections.

Conclusion

Hyperlane's Stream type provides a comprehensive set of tools for managing TCP connections. From basic data sending to keep-alive management, WebSocket upgrades, and error handling, the Stream API gives you fine-grained control over every aspect of connection management. By understanding and leveraging these APIs, you can build robust, high-performance web applications that handle connections efficiently and gracefully.