As a junior computer science student, learning asynchronous programming took me from complete confusion to something close to enlightenment. Looking back from my bewildered first encounter with async code to now, when I can comfortably use asynchronous techniques to build high-concurrency systems, the journey gave me a deep understanding of what asynchronous programming really is and why it is so powerful.
Project Information
🚀 Hyperlane Framework: GitHub Repository
📧 Author Contact: root@ltpp.vip
📖 Documentation: Official Docs
My Asynchronous Programming Enlightenment Journey
My study of asynchronous programming began with a performance bottleneck in a course project. I needed to design an API for the school's library management system, expecting thousands of students to query book information at the same time. Under a traditional synchronous programming model, the system showed significant delays at just a few hundred concurrent requests.
In my ten years of learning to program, this was the first time I truly grasped the importance of concurrent programming. A traditional threading model can handle concurrency, but the overhead of thread creation and context switching made performance plummet.
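To make that overhead difference concrete, here is a minimal, self-contained sketch (not from the course project; the task count and sleep duration are arbitrary illustration values) that spawns ten thousand tokio tasks. Each async task is a small heap-allocated state machine, whereas the same number of OS threads would each need a dedicated stack and context switches:
use std::time::Instant;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // Spawn 10,000 concurrent tasks; each is a cheap state machine,
    // not an OS thread with its own stack.
    let start = Instant::now();
    let handles: Vec<_> = (0..10_000)
        .map(|_| tokio::spawn(sleep(Duration::from_millis(10))))
        .collect();
    for handle in handles {
        let _ = handle.await; // ignore JoinError in this sketch
    }
    println!("10k async tasks finished in {:?}", start.elapsed());
}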
use hyperlane::*;
use hyperlane_macros::*;
use tokio::time::{sleep, Duration, Instant};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
// Asynchronous task manager
struct AsyncTaskManager {
active_tasks: Arc<AtomicU64>,
completed_tasks: Arc<AtomicU64>,
failed_tasks: Arc<AtomicU64>,
total_processing_time: Arc<AtomicU64>,
}
impl AsyncTaskManager {
fn new() -> Self {
Self {
active_tasks: Arc::new(AtomicU64::new(0)),
completed_tasks: Arc::new(AtomicU64::new(0)),
failed_tasks: Arc::new(AtomicU64::new(0)),
total_processing_time: Arc::new(AtomicU64::new(0)),
}
}
async fn execute_task<F, T>(&self, task: F) -> Result<T, String>
where
F: std::future::Future<Output = Result<T, String>>,
{
let start_time = Instant::now();
self.active_tasks.fetch_add(1, Ordering::Relaxed);
let result = task.await;
let duration = start_time.elapsed();
self.total_processing_time.fetch_add(duration.as_millis() as u64, Ordering::Relaxed);
self.active_tasks.fetch_sub(1, Ordering::Relaxed);
match result {
Ok(value) => {
self.completed_tasks.fetch_add(1, Ordering::Relaxed);
Ok(value)
}
Err(error) => {
self.failed_tasks.fetch_add(1, Ordering::Relaxed);
Err(error)
}
}
}
async fn get_statistics(&self) -> TaskStatistics {
TaskStatistics {
active_tasks: self.active_tasks.load(Ordering::Relaxed),
completed_tasks: self.completed_tasks.load(Ordering::Relaxed),
failed_tasks: self.failed_tasks.load(Ordering::Relaxed),
average_processing_time: {
let total_tasks = self.completed_tasks.load(Ordering::Relaxed) + self.failed_tasks.load(Ordering::Relaxed);
if total_tasks > 0 {
self.total_processing_time.load(Ordering::Relaxed) / total_tasks
} else {
0
}
},
}
}
}
#[derive(serde::Serialize)]
struct TaskStatistics {
active_tasks: u64,
completed_tasks: u64,
failed_tasks: u64,
average_processing_time: u64,
}
static TASK_MANAGER: once_cell::sync::Lazy<AsyncTaskManager> =
once_cell::sync::Lazy::new(|| AsyncTaskManager::new());
// Simulate complex asynchronous database queries
async fn simulate_database_query(query_id: u64, complexity: u64) -> Result<QueryResult, String> {
// Simulate different complexity query delays
let delay_ms = match complexity {
1..=3 => 10 + (complexity * 5),
4..=7 => 50 + (complexity * 10),
8..=10 => 100 + (complexity * 20),
_ => 500,
};
sleep(Duration::from_millis(delay_ms)).await;
// Simulate occasional query failures
if rand::random::<f64>() < 0.05 {
return Err(format!("Database query {} failed", query_id));
}
Ok(QueryResult {
query_id,
complexity,
execution_time_ms: delay_ms,
result_count: (complexity * 100) as usize,
data: generate_mock_data(complexity as usize),
})
}
#[derive(serde::Serialize)]
struct QueryResult {
query_id: u64,
complexity: u64,
execution_time_ms: u64,
result_count: usize,
data: Vec<MockDataItem>,
}
#[derive(serde::Serialize)]
struct MockDataItem {
id: usize,
name: String,
value: f64,
timestamp: u64,
}
fn generate_mock_data(count: usize) -> Vec<MockDataItem> {
(0..count).map(|i| MockDataItem {
id: i,
name: format!("item_{}", i),
value: rand::random::<f64>() * 1000.0,
timestamp: chrono::Utc::now().timestamp() as u64,
}).collect()
}
#[post]
async fn async_batch_query_endpoint(ctx: Context) {
let request_body: Vec<u8> = ctx.get_request_body().await;
let batch_request: BatchQueryRequest = serde_json::from_slice(&request_body).unwrap();
let start_time = Instant::now();
// Execute all queries concurrently
let query_tasks: Vec<_> = batch_request.queries.into_iter().map(|query| {
let task_manager = &TASK_MANAGER;
async move {
task_manager.execute_task(simulate_database_query(query.id, query.complexity)).await
}
}).collect();
// Wait for all queries to complete
let results: Vec<_> = futures::future::join_all(query_tasks).await;
let total_duration = start_time.elapsed();
let statistics = TASK_MANAGER.get_statistics().await;
let response = BatchQueryResponse {
total_queries: results.len(),
successful_queries: results.iter().filter(|r| r.is_ok()).count(),
failed_queries: results.iter().filter(|r| r.is_err()).count(),
total_execution_time_ms: total_duration.as_millis() as u64,
results: results.into_iter().map(|r| match r {
Ok(result) => QueryResponseItem::Success(result),
Err(error) => QueryResponseItem::Error { error },
}).collect(),
task_statistics: statistics,
};
ctx.set_response_status_code(200)
.await
.set_response_header(CONTENT_TYPE, APPLICATION_JSON)
.await
.set_response_body(serde_json::to_string(&response).unwrap())
.await;
}
#[derive(serde::Deserialize)]
struct BatchQueryRequest {
queries: Vec<QueryRequest>,
}
#[derive(serde::Deserialize)]
struct QueryRequest {
id: u64,
complexity: u64,
}
#[derive(serde::Serialize)]
struct BatchQueryResponse {
total_queries: usize,
successful_queries: usize,
failed_queries: usize,
total_execution_time_ms: u64,
results: Vec<QueryResponseItem>,
task_statistics: TaskStatistics,
}
#[derive(serde::Serialize)]
#[serde(tag = "status")]
enum QueryResponseItem {
Success(QueryResult),
Error { error: String },
}
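Outside the HTTP handler, the task manager can be exercised directly; this is a hypothetical snippet using the types defined above, with a made-up query id and complexity:
#[tokio::main]
async fn main() {
    // Run one tracked query through the manager and read back the counters.
    let result = TASK_MANAGER.execute_task(simulate_database_query(1, 5)).await;
    let stats = TASK_MANAGER.get_statistics().await;
    println!(
        "ok={} completed={} failed={} avg_ms={}",
        result.is_ok(),
        stats.completed_tasks,
        stats.failed_tasks,
        stats.average_processing_time
    );
}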
Deep Practice of Asynchronous Stream Processing
In my learning process, I found that asynchronous stream processing is a key technology for handling large amounts of data. Through stream processing, we can process data immediately as it arrives, without waiting for all data to be ready.
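Before the full implementation below, here is the idea in miniature using tokio_stream's ready-made combinators (the numbers are arbitrary): each element is handled the moment it is yielded, with nothing waiting for the whole sequence to materialize first.
use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    // Elements are processed one by one as the stream yields them.
    let mut doubled = stream::iter(1..=5).map(|n| n * 2);
    while let Some(n) = doubled.next().await {
        println!("got {}", n);
    }
}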
use hyperlane::*;
use hyperlane_macros::*;
use tokio_stream::{Stream, StreamExt};
use tokio::time::{sleep, Duration, Instant, Sleep};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context as TaskContext, Poll};
// Custom asynchronous data stream that yields one item per delay tick
struct DataStream {
    current: usize,
    max_items: usize,
    delay_ms: u64,
    // Pending sleep that gates the next item; polling it registers our waker.
    delay: Option<Pin<Box<Sleep>>>,
}
impl DataStream {
    fn new(max_items: usize, delay_ms: u64) -> Self {
        Self {
            current: 0,
            max_items,
            delay_ms,
            delay: None,
        }
    }
}
impl Stream for DataStream {
    type Item = StreamDataItem;
    fn poll_next(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.current >= this.max_items {
            return Poll::Ready(None);
        }
        // Wait for the armed delay before yielding; the Sleep wakes this task
        // by itself, so no manually spawned waker task is needed.
        if let Some(delay) = this.delay.as_mut() {
            match delay.as_mut().poll(cx) {
                Poll::Ready(()) => this.delay = None,
                Poll::Pending => return Poll::Pending,
            }
        }
        let current = this.current;
        this.current += 1;
        // Arm the delay that paces the item after this one.
        this.delay = Some(Box::pin(sleep(Duration::from_millis(this.delay_ms))));
        Poll::Ready(Some(StreamDataItem {
            id: current,
            data: format!("stream_data_{}", current),
            timestamp: chrono::Utc::now().timestamp(),
            processing_order: current,
        }))
    }
}
#[derive(serde::Serialize, Clone)]
struct StreamDataItem {
id: usize,
data: String,
timestamp: i64,
processing_order: usize,
}
// Asynchronous stream processor
struct StreamProcessor {
buffer_size: usize,
processing_delay_ms: u64,
}
impl StreamProcessor {
fn new(buffer_size: usize, processing_delay_ms: u64) -> Self {
Self {
buffer_size,
processing_delay_ms,
}
}
async fn process_stream<S>(&self, mut stream: S) -> Vec<ProcessedStreamItem>
where
S: Stream<Item = StreamDataItem> + Unpin,
{
let mut results = Vec::new();
let mut buffer = Vec::new();
while let Some(item) = stream.next().await {
buffer.push(item);
if buffer.len() >= self.buffer_size {
let batch_results = self.process_batch(buffer.clone()).await;
results.extend(batch_results);
buffer.clear();
}
}
// Process remaining data
if !buffer.is_empty() {
let batch_results = self.process_batch(buffer).await;
results.extend(batch_results);
}
results
}
async fn process_batch(&self, items: Vec<StreamDataItem>) -> Vec<ProcessedStreamItem> {
let processing_tasks: Vec<_> = items.into_iter().map(|item| {
let delay_ms = self.processing_delay_ms;
async move {
// Simulate data processing delay
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
ProcessedStreamItem {
original_id: item.id,
processed_data: format!("processed_{}", item.data),
original_timestamp: item.timestamp,
processed_timestamp: chrono::Utc::now().timestamp(),
processing_duration_ms: delay_ms,
}
}
}).collect();
futures::future::join_all(processing_tasks).await
}
}
#[derive(serde::Serialize)]
struct ProcessedStreamItem {
original_id: usize,
processed_data: String,
original_timestamp: i64,
processed_timestamp: i64,
processing_duration_ms: u64,
}
#[get]
async fn stream_processing_endpoint(ctx: Context) {
let query_params = ctx.get_query_params().await;
let max_items = query_params.get("max_items")
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(100);
let stream_delay = query_params.get("stream_delay")
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(10);
let processing_delay = query_params.get("processing_delay")
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(5);
let buffer_size = query_params.get("buffer_size")
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(10);
let start_time = Instant::now();
// Create data stream
let data_stream = DataStream::new(max_items, stream_delay);
// Create stream processor
let processor = StreamProcessor::new(buffer_size, processing_delay);
// Process stream data
let processed_items = processor.process_stream(data_stream).await;
let total_duration = start_time.elapsed();
let response = StreamProcessingResponse {
total_items: processed_items.len(),
buffer_size,
stream_delay_ms: stream_delay,
processing_delay_ms: processing_delay,
total_duration_ms: total_duration.as_millis() as u64,
throughput_items_per_second: if total_duration.as_secs() > 0 {
processed_items.len() as f64 / total_duration.as_secs_f64()
} else {
0.0
},
processed_items,
};
ctx.set_response_status_code(200)
.await
.set_response_header(CONTENT_TYPE, APPLICATION_JSON)
.await
.set_response_body(serde_json::to_string(&response).unwrap())
.await;
}
#[derive(serde::Serialize)]
struct StreamProcessingResponse {
total_items: usize,
buffer_size: usize,
stream_delay_ms: u64,
processing_delay_ms: u64,
total_duration_ms: u64,
throughput_items_per_second: f64,
processed_items: Vec<ProcessedStreamItem>,
}
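The same pipeline can also be driven without the HTTP layer, which is how I tested it; the item count, delays, and buffer size below are arbitrary:
#[tokio::main]
async fn main() {
    // 20 items arriving every 5ms, processed in batches of 4 with a 2ms
    // per-item processing delay.
    let data_stream = DataStream::new(20, 5);
    let processor = StreamProcessor::new(4, 2);
    let processed = processor.process_stream(data_stream).await;
    println!("processed {} items", processed.len());
}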
Asynchronous Error Handling and Recovery Mechanisms
In my practice, I found that error handling in asynchronous programming is more complex than in synchronous programming: we have to account for task failures, timeouts, resource contention, and similar situations.
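The simplest building block is tokio's own timeout, which races a future against a deadline; a minimal sketch with made-up durations:
use tokio::time::{sleep, timeout, Duration};

#[tokio::main]
async fn main() {
    // A 200ms operation raced against a 100ms deadline.
    let slow_op = sleep(Duration::from_millis(200));
    match timeout(Duration::from_millis(100), slow_op).await {
        Ok(()) => println!("completed in time"),
        Err(elapsed) => println!("timed out: {}", elapsed),
    }
}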
use hyperlane::*;
use hyperlane_macros::*;
use tokio::time::{timeout, Duration, Instant};
use std::pin::Pin;
// Asynchronous retry mechanism
struct AsyncRetryPolicy {
max_attempts: usize,
base_delay_ms: u64,
max_delay_ms: u64,
backoff_multiplier: f64,
}
impl AsyncRetryPolicy {
fn new(max_attempts: usize, base_delay_ms: u64, max_delay_ms: u64, backoff_multiplier: f64) -> Self {
Self {
max_attempts,
base_delay_ms,
max_delay_ms,
backoff_multiplier,
}
}
async fn execute_with_retry<F, T, E>(&self, mut operation: F) -> Result<(T, usize), RetryError<E>>
where
F: FnMut() -> Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send>>,
E: std::fmt::Debug + Clone,
{
let mut attempts = 0;
let mut last_error = None;
while attempts < self.max_attempts {
attempts += 1;
match operation().await {
// Return the value together with how many attempts it took.
Ok(result) => return Ok((result, attempts)),
Err(error) => {
last_error = Some(error.clone());
if attempts < self.max_attempts {
let delay = self.calculate_delay(attempts);
tokio::time::sleep(Duration::from_millis(delay)).await;
}
}
}
}
Err(RetryError {
attempts,
last_error: last_error.unwrap(),
})
}
fn calculate_delay(&self, attempt: usize) -> u64 {
let delay = self.base_delay_ms as f64 * self.backoff_multiplier.powi(attempt as i32 - 1);
(delay as u64).min(self.max_delay_ms)
}
}
#[derive(Debug)]
struct RetryError<E> {
attempts: usize,
last_error: E,
}
// Asynchronous timeout handling
struct AsyncTimeoutManager {
default_timeout_ms: u64,
}
impl AsyncTimeoutManager {
fn new(default_timeout_ms: u64) -> Self {
Self { default_timeout_ms }
}
async fn execute_with_timeout<F, T>(&self, operation: F, timeout_ms: Option<u64>) -> Result<T, TimeoutError>
where
F: std::future::Future<Output = T>,
{
let timeout_duration = Duration::from_millis(timeout_ms.unwrap_or(self.default_timeout_ms));
match timeout(timeout_duration, operation).await {
Ok(result) => Ok(result),
Err(_) => Err(TimeoutError {
timeout_ms: timeout_duration.as_millis() as u64,
}),
}
}
}
#[derive(Debug)]
struct TimeoutError {
timeout_ms: u64,
}
// Simulate unstable asynchronous service
async fn unreliable_service_call(service_id: u32, fail_rate: f64) -> Result<ServiceResponse, ServiceError> {
// Simulate network delay
let delay = rand::random::<u64>() % 200 + 50;
tokio::time::sleep(Duration::from_millis(delay)).await;
// Simulate service failure
if rand::random::<f64>() < fail_rate {
return Err(ServiceError {
service_id,
error_code: rand::random::<u32>() % 1000 + 500,
message: "Service temporarily unavailable".to_string(),
});
}
Ok(ServiceResponse {
service_id,
data: format!("Response from service {}", service_id),
response_time_ms: delay,
timestamp: chrono::Utc::now().timestamp(),
})
}
#[derive(serde::Serialize)]
struct ServiceResponse {
service_id: u32,
data: String,
response_time_ms: u64,
timestamp: i64,
}
#[derive(Debug, Clone, serde::Serialize)]
struct ServiceError {
service_id: u32,
error_code: u32,
message: String,
}
static RETRY_POLICY: once_cell::sync::Lazy<AsyncRetryPolicy> =
once_cell::sync::Lazy::new(|| AsyncRetryPolicy::new(3, 100, 5000, 2.0));
static TIMEOUT_MANAGER: once_cell::sync::Lazy<AsyncTimeoutManager> =
once_cell::sync::Lazy::new(|| AsyncTimeoutManager::new(5000));
#[post]
async fn resilient_service_call_endpoint(ctx: Context) {
let request_body: Vec<u8> = ctx.get_request_body().await;
let service_request: ResilientServiceRequest = serde_json::from_slice(&request_body).unwrap();
let start_time = Instant::now();
let mut results = Vec::new();
for service_call in service_request.service_calls {
let service_id = service_call.service_id;
let fail_rate = service_call.fail_rate;
let timeout_ms = service_call.timeout_ms;
let call_start = Instant::now();
// Use retry mechanism and timeout control
let result = RETRY_POLICY.execute_with_retry(|| {
Box::pin(async move {
TIMEOUT_MANAGER.execute_with_timeout(
unreliable_service_call(service_id, fail_rate),
timeout_ms
).await.map_err(|_| ServiceError {
service_id,
error_code: 408,
message: "Request timeout".to_string(),
})
})
}).await;
let call_duration = call_start.elapsed();
let result_item = match result {
Ok((response, attempts)) => ResilientCallResult::Success {
response,
total_duration_ms: call_duration.as_millis() as u64,
retry_attempts: attempts,
},
Err(retry_error) => ResilientCallResult::Failed {
error: retry_error.last_error,
total_duration_ms: call_duration.as_millis() as u64,
retry_attempts: retry_error.attempts,
},
};
results.push(result_item);
}
let total_duration = start_time.elapsed();
let response = ResilientServiceResponse {
total_calls: results.len(),
successful_calls: results.iter().filter(|r| matches!(r, ResilientCallResult::Success { .. })).count(),
failed_calls: results.iter().filter(|r| matches!(r, ResilientCallResult::Failed { .. })).count(),
total_duration_ms: total_duration.as_millis() as u64,
results,
};
ctx.set_response_status_code(200)
.await
.set_response_header(CONTENT_TYPE, APPLICATION_JSON)
.await
.set_response_body(serde_json::to_string(&response).unwrap())
.await;
}
#[derive(serde::Deserialize)]
struct ResilientServiceRequest {
service_calls: Vec<ServiceCallConfig>,
}
#[derive(serde::Deserialize)]
struct ServiceCallConfig {
service_id: u32,
fail_rate: f64,
timeout_ms: Option<u64>,
}
#[derive(serde::Serialize)]
struct ResilientServiceResponse {
total_calls: usize,
successful_calls: usize,
failed_calls: usize,
total_duration_ms: u64,
results: Vec<ResilientCallResult>,
}
#[derive(serde::Serialize)]
#[serde(tag = "status")]
enum ResilientCallResult {
Success {
response: ServiceResponse,
total_duration_ms: u64,
retry_attempts: usize,
},
Failed {
error: ServiceError,
total_duration_ms: u64,
retry_attempts: usize,
},
}
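For completeness, a hypothetical standalone use of the retry policy against the flaky service above; the service id and failure rate are arbitrary:
#[tokio::main]
async fn main() {
    // Up to 3 attempts, 100ms base delay, 5s cap, 2x exponential backoff.
    let policy = AsyncRetryPolicy::new(3, 100, 5000, 2.0);
    let result = policy
        .execute_with_retry(|| Box::pin(unreliable_service_call(7, 0.5)))
        .await;
    match result {
        Ok((resp, attempts)) => println!("succeeded after {} attempt(s): {}", attempts, resp.data),
        Err(err) => println!("gave up after {} attempts: {:?}", err.attempts, err.last_error),
    }
}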
Summary and Reflection
Through this deep exploration of asynchronous programming, I not only mastered the core techniques of asynchronous development but, more importantly, developed an asynchronous way of thinking. These experiences will be valuable assets throughout my career.
Asynchronous programming is not just a technical skill, but a way of thinking about concurrent systems. It requires us to think about data flow, error handling, resource management, and performance optimization from a completely different perspective.
I believe that as technology continues to evolve, asynchronous programming will become an essential skill for every developer, and this framework offers an excellent platform for learning it.
This article records my in-depth study and practice of asynchronous programming as a junior student. Through real code examples and project experience, I came to appreciate how important and powerful asynchronous programming is in modern web development. I hope my experience offers a useful reference for other students.
For more information, please visit Hyperlane GitHub page or contact the author: root@ltpp.vip