--- name: rust-async-patterns description: Master Rust async programming with Tokio, async traits, error handling, and concurrent patterns. Use when building async Rust applications, implementing concurrent systems, or debugging async code. --- # Rust Async Patterns Production patterns for async Rust programming with Tokio runtime, including tasks, channels, streams, and error handling. ## When to Use This Skill - Building async Rust applications - Implementing concurrent network services - Using Tokio for async I/O - Handling async errors properly - Debugging async code issues - Optimizing async performance ## Core Concepts ### 1. Async Execution Model ``` Future (lazy) → poll() → Ready(value) | Pending ↑ ↓ Waker ← Runtime schedules ``` ### 2. Key Abstractions | Concept | Purpose | |---------|---------| | `Future` | Lazy computation that may complete later | | `async fn` | Function returning impl Future | | `await` | Suspend until future completes | | `Task` | Spawned future running concurrently | | `Runtime` | Executor that polls futures | ## Quick Start ```toml # Cargo.toml [dependencies] tokio = { version = "1", features = ["full"] } futures = "0.3" async-trait = "0.1" anyhow = "1.0" tracing = "0.1" tracing-subscriber = "0.3" ``` ```rust use tokio::time::{sleep, Duration}; use anyhow::Result; #[tokio::main] async fn main() -> Result<()> { // Initialize tracing tracing_subscriber::fmt::init(); // Async operations let result = fetch_data("https://api.example.com").await?; println!("Got: {}", result); Ok(()) } async fn fetch_data(url: &str) -> Result { // Simulated async operation sleep(Duration::from_millis(100)).await; Ok(format!("Data from {}", url)) } ``` ## Patterns ### Pattern 1: Concurrent Task Execution ```rust use tokio::task::JoinSet; use anyhow::Result; // Spawn multiple concurrent tasks async fn fetch_all_concurrent(urls: Vec) -> Result> { let mut set = JoinSet::new(); for url in urls { set.spawn(async move { fetch_data(&url).await }); } let mut results = Vec::new(); while let Some(res) = set.join_next().await { match res { Ok(Ok(data)) => results.push(data), Ok(Err(e)) => tracing::error!("Task failed: {}", e), Err(e) => tracing::error!("Join error: {}", e), } } Ok(results) } // With concurrency limit use futures::stream::{self, StreamExt}; async fn fetch_with_limit(urls: Vec, limit: usize) -> Vec> { stream::iter(urls) .map(|url| async move { fetch_data(&url).await }) .buffer_unordered(limit) // Max concurrent tasks .collect() .await } // Select first to complete use tokio::select; async fn race_requests(url1: &str, url2: &str) -> Result { select! { result = fetch_data(url1) => result, result = fetch_data(url2) => result, } } ``` ### Pattern 2: Channels for Communication ```rust use tokio::sync::{mpsc, broadcast, oneshot, watch}; // Multi-producer, single-consumer async fn mpsc_example() { let (tx, mut rx) = mpsc::channel::(100); // Spawn producer let tx2 = tx.clone(); tokio::spawn(async move { tx2.send("Hello".to_string()).await.unwrap(); }); // Consume while let Some(msg) = rx.recv().await { println!("Got: {}", msg); } } // Broadcast: multi-producer, multi-consumer async fn broadcast_example() { let (tx, _) = broadcast::channel::(100); let mut rx1 = tx.subscribe(); let mut rx2 = tx.subscribe(); tx.send("Event".to_string()).unwrap(); // Both receivers get the message let _ = rx1.recv().await; let _ = rx2.recv().await; } // Oneshot: single value, single use async fn oneshot_example() -> String { let (tx, rx) = oneshot::channel::(); tokio::spawn(async move { tx.send("Result".to_string()).unwrap(); }); rx.await.unwrap() } // Watch: single producer, multi-consumer, latest value async fn watch_example() { let (tx, mut rx) = watch::channel("initial".to_string()); tokio::spawn(async move { loop { // Wait for changes rx.changed().await.unwrap(); println!("New value: {}", *rx.borrow()); } }); tx.send("updated".to_string()).unwrap(); } ``` ### Pattern 3: Async Error Handling ```rust use anyhow::{Context, Result, bail}; use thiserror::Error; #[derive(Error, Debug)] pub enum ServiceError { #[error("Network error: {0}")] Network(#[from] reqwest::Error), #[error("Database error: {0}")] Database(#[from] sqlx::Error), #[error("Not found: {0}")] NotFound(String), #[error("Timeout after {0:?}")] Timeout(std::time::Duration), } // Using anyhow for application errors async fn process_request(id: &str) -> Result { let data = fetch_data(id) .await .context("Failed to fetch data")?; let parsed = parse_response(&data) .context("Failed to parse response")?; Ok(parsed) } // Using custom errors for library code async fn get_user(id: &str) -> Result { let result = db.query(id).await?; match result { Some(user) => Ok(user), None => Err(ServiceError::NotFound(id.to_string())), } } // Timeout wrapper use tokio::time::timeout; async fn with_timeout(duration: Duration, future: F) -> Result where F: std::future::Future>, { timeout(duration, future) .await .map_err(|_| ServiceError::Timeout(duration))? } ``` ### Pattern 4: Graceful Shutdown ```rust use tokio::signal; use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; async fn run_server() -> Result<()> { // Method 1: CancellationToken let token = CancellationToken::new(); let token_clone = token.clone(); // Spawn task that respects cancellation tokio::spawn(async move { loop { tokio::select! { _ = token_clone.cancelled() => { tracing::info!("Task shutting down"); break; } _ = do_work() => {} } } }); // Wait for shutdown signal signal::ctrl_c().await?; tracing::info!("Shutdown signal received"); // Cancel all tasks token.cancel(); // Give tasks time to cleanup tokio::time::sleep(Duration::from_secs(5)).await; Ok(()) } // Method 2: Broadcast channel for shutdown async fn run_with_broadcast() -> Result<()> { let (shutdown_tx, _) = broadcast::channel::<()>(1); let mut rx = shutdown_tx.subscribe(); tokio::spawn(async move { tokio::select! { _ = rx.recv() => { tracing::info!("Received shutdown"); } _ = async { loop { do_work().await } } => {} } }); signal::ctrl_c().await?; let _ = shutdown_tx.send(()); Ok(()) } ``` ### Pattern 5: Async Traits ```rust use async_trait::async_trait; #[async_trait] pub trait Repository { async fn get(&self, id: &str) -> Result; async fn save(&self, entity: &Entity) -> Result<()>; async fn delete(&self, id: &str) -> Result<()>; } pub struct PostgresRepository { pool: sqlx::PgPool, } #[async_trait] impl Repository for PostgresRepository { async fn get(&self, id: &str) -> Result { sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id) .fetch_one(&self.pool) .await .map_err(Into::into) } async fn save(&self, entity: &Entity) -> Result<()> { sqlx::query!( "INSERT INTO entities (id, data) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET data = $2", entity.id, entity.data ) .execute(&self.pool) .await?; Ok(()) } async fn delete(&self, id: &str) -> Result<()> { sqlx::query!("DELETE FROM entities WHERE id = $1", id) .execute(&self.pool) .await?; Ok(()) } } // Trait object usage async fn process(repo: &dyn Repository, id: &str) -> Result<()> { let entity = repo.get(id).await?; // Process... repo.save(&entity).await } ``` ### Pattern 6: Streams and Async Iteration ```rust use futures::stream::{self, Stream, StreamExt}; use async_stream::stream; // Create stream from async iterator fn numbers_stream() -> impl Stream { stream! { for i in 0..10 { tokio::time::sleep(Duration::from_millis(100)).await; yield i; } } } // Process stream async fn process_stream() { let stream = numbers_stream(); // Map and filter let processed: Vec<_> = stream .filter(|n| futures::future::ready(*n % 2 == 0)) .map(|n| n * 2) .collect() .await; println!("{:?}", processed); } // Chunked processing async fn process_in_chunks() { let stream = numbers_stream(); let mut chunks = stream.chunks(3); while let Some(chunk) = chunks.next().await { println!("Processing chunk: {:?}", chunk); } } // Merge multiple streams async fn merge_streams() { let stream1 = numbers_stream(); let stream2 = numbers_stream(); let merged = stream::select(stream1, stream2); merged .for_each(|n| async move { println!("Got: {}", n); }) .await; } ``` ### Pattern 7: Resource Management ```rust use std::sync::Arc; use tokio::sync::{Mutex, RwLock, Semaphore}; // Shared state with RwLock (prefer for read-heavy) struct Cache { data: RwLock>, } impl Cache { async fn get(&self, key: &str) -> Option { self.data.read().await.get(key).cloned() } async fn set(&self, key: String, value: String) { self.data.write().await.insert(key, value); } } // Connection pool with semaphore struct Pool { semaphore: Semaphore, connections: Mutex>, } impl Pool { fn new(size: usize) -> Self { Self { semaphore: Semaphore::new(size), connections: Mutex::new((0..size).map(|_| Connection::new()).collect()), } } async fn acquire(&self) -> PooledConnection<'_> { let permit = self.semaphore.acquire().await.unwrap(); let conn = self.connections.lock().await.pop().unwrap(); PooledConnection { pool: self, conn: Some(conn), _permit: permit } } } struct PooledConnection<'a> { pool: &'a Pool, conn: Option, _permit: tokio::sync::SemaphorePermit<'a>, } impl Drop for PooledConnection<'_> { fn drop(&mut self) { if let Some(conn) = self.conn.take() { let pool = self.pool; tokio::spawn(async move { pool.connections.lock().await.push(conn); }); } } } ``` ## Debugging Tips ```rust // Enable tokio-console for runtime debugging // Cargo.toml: tokio = { features = ["tracing"] } // Run: RUSTFLAGS="--cfg tokio_unstable" cargo run // Then: tokio-console // Instrument async functions use tracing::instrument; #[instrument(skip(pool))] async fn fetch_user(pool: &PgPool, id: &str) -> Result { tracing::debug!("Fetching user"); // ... } // Track task spawning let span = tracing::info_span!("worker", id = %worker_id); tokio::spawn(async move { // Enters span when polled }.instrument(span)); ``` ## Best Practices ### Do's - **Use `tokio::select!`** - For racing futures - **Prefer channels** - Over shared state when possible - **Use `JoinSet`** - For managing multiple tasks - **Instrument with tracing** - For debugging async code - **Handle cancellation** - Check `CancellationToken` ### Don'ts - **Don't block** - Never use `std::thread::sleep` in async - **Don't hold locks across awaits** - Causes deadlocks - **Don't spawn unboundedly** - Use semaphores for limits - **Don't ignore errors** - Propagate with `?` or log - **Don't forget Send bounds** - For spawned futures ## Resources - [Tokio Tutorial](https://tokio.rs/tokio/tutorial) - [Async Book](https://rust-lang.github.io/async-book/) - [Tokio Console](https://github.com/tokio-rs/console)