--- name: python-async-patterns description: Comprehensive guide to Python async/await patterns, best practices, and anti-patterns. Covers asyncio fundamentals, coroutines, async context managers, task management, common libraries (aiohttp, aiofiles, asyncpg), framework integration (FastAPI, Django), performance considerations, and proper exception handling. Use when reviewing or writing asynchronous Python code. allowed-tools: Read, Grep, Glob --- # Python Async/Await Patterns and Best Practices ## Purpose This skill provides comprehensive guidance on Python asynchronous programming patterns for reviewing and writing async code. It covers async/await syntax, best practices, common patterns, anti-patterns, and framework-specific considerations. Use this skill when: - Reviewing async Python code - Writing new async functions or services - Refactoring sync code to async - Troubleshooting async performance issues - Evaluating async library usage - Implementing async endpoints in web frameworks ## Context Asynchronous programming in Python enables efficient I/O-bound operations by allowing the event loop to switch between tasks while waiting for I/O operations to complete. However, async code introduces complexity and has specific patterns and pitfalls that must be understood for correct implementation. **Key Principles:** - Async is beneficial for I/O-bound operations, not CPU-bound tasks - The event loop must never be blocked with synchronous operations - Coroutines must be properly awaited - Resources must be cleaned up correctly in async contexts - Mixing sync and async code requires careful consideration **Python Version Requirements:** - `async`/`await`: Python 3.5+ - `asyncio.run()`: Python 3.7+ - `asyncio.create_task()`: Python 3.7+ - Task Groups (`asyncio.TaskGroup`): Python 3.11+ - Exception Groups: Python 3.11+ ## Async Fundamentals ### 1. Async/Await Syntax **Defining Async Functions:** ```python # Async function (coroutine function) async def fetch_data(url: str) -> dict: """Coroutine that fetches data asynchronously.""" async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.json() # Regular function (synchronous) def process_data(data: dict) -> list: """Regular synchronous function.""" return [item['value'] for item in data['items']] ``` **Key Differences:** - `async def` creates a coroutine function - Calling an async function returns a coroutine object (not the result) - Must use `await` to execute the coroutine and get the result - Can only use `await` inside `async def` functions ### 2. Coroutines vs Regular Functions ```python # Coroutine function async def async_operation(): await asyncio.sleep(1) return "done" # This creates a coroutine object but doesn't execute it coro = async_operation() # RuntimeWarning: coroutine was never awaited # Must await to execute result = await async_operation() # Correct # Or run from synchronous code result = asyncio.run(async_operation()) # Correct (Python 3.7+) ``` **Coroutine Characteristics:** - Lazy evaluation: not executed until awaited - Can be suspended and resumed - Return values through `await` - Must be awaited or scheduled for execution ### 3. Event Loop Fundamentals The event loop is the core of async execution: ```python import asyncio # High-level API (Python 3.7+) - PREFERRED async def main(): result = await some_async_operation() return result # Run the event loop if __name__ == "__main__": result = asyncio.run(main()) # Creates, runs, and closes loop ``` **Event Loop Responsibilities:** - Schedules and executes coroutines - Manages I/O operations - Handles callbacks and tasks - Coordinates concurrent execution ### 4. Using asyncio.run() ```python # GOOD: Modern approach (Python 3.7+) async def main(): result = await fetch_data() processed = await process_data(result) return processed if __name__ == "__main__": result = asyncio.run(main()) ``` ```python # AVOID: Manual loop management (legacy, pre-3.7) loop = asyncio.get_event_loop() try: result = loop.run_until_complete(main()) finally: loop.close() ``` **Why prefer asyncio.run():** - Automatically creates and closes the loop - Handles cleanup properly - Simpler and less error-prone - Cancels remaining tasks on shutdown ## Common Async Patterns ### 1. Async Context Managers Async context managers use `async with` for resource management: ```python class AsyncDatabaseConnection: """Example async context manager.""" async def __aenter__(self): """Called when entering async with block.""" self.connection = await self._connect() return self.connection async def __aexit__(self, exc_type, exc_val, exc_tb): """Called when exiting async with block.""" await self.connection.close() return False # Don't suppress exceptions async def _connect(self): """Establish connection.""" await asyncio.sleep(0.1) # Simulated async connection return {"connected": True} # Usage async def query_database(): async with AsyncDatabaseConnection() as conn: # Connection automatically closed after block result = await execute_query(conn) return result ``` **Common Async Context Managers:** - `aiohttp.ClientSession()`: HTTP client sessions - `aiofiles.open()`: Async file operations - Database connection pools (asyncpg, motor) - Lock primitives (`asyncio.Lock()`) ### 2. Async Iterators and Generators ```python class AsyncDataStream: """Async iterator for streaming data.""" def __init__(self, count: int): self.count = count self.index = 0 def __aiter__(self): """Return the async iterator.""" return self async def __anext__(self): """Get next item asynchronously.""" if self.index >= self.count: raise StopAsyncIteration await asyncio.sleep(0.1) # Simulated async operation value = f"item_{self.index}" self.index += 1 return value # Usage async def process_stream(): async for item in AsyncDataStream(5): print(item) # Async generator (simpler approach) async def async_range(count: int): """Async generator function.""" for i in range(count): await asyncio.sleep(0.1) yield i # Usage async def use_generator(): async for value in async_range(5): print(value) ``` ### 3. Async Comprehensions ```python # Async list comprehension async def fetch_all_urls(urls: list[str]) -> list[dict]: async with aiohttp.ClientSession() as session: # Create tasks for all URLs results = [ await fetch_one(session, url) for url in urls ] return results # Async generator expression async def process_stream_data(stream): processed = ( transform(item) async for item in stream if await validate(item) ) return processed # Async dict comprehension async def fetch_user_data(user_ids: list[int]) -> dict[int, dict]: return { user_id: await fetch_user(user_id) for user_id in user_ids } ``` ### 4. Concurrent Task Execution #### Using asyncio.gather() ```python # Run multiple coroutines concurrently async def fetch_multiple_resources(): # All tasks run concurrently results = await asyncio.gather( fetch_user(1), fetch_posts(1), fetch_comments(1) ) user, posts, comments = results return user, posts, comments # With error handling async def fetch_with_error_handling(): results = await asyncio.gather( fetch_user(1), fetch_posts(1), fetch_comments(1), return_exceptions=True # Return exceptions instead of raising ) # Check for exceptions for i, result in enumerate(results): if isinstance(result, Exception): print(f"Task {i} failed: {result}") ``` #### Using asyncio.create_task() ```python # Create tasks explicitly for more control async def fetch_with_tasks(): # Create tasks (they start executing immediately) user_task = asyncio.create_task(fetch_user(1)) posts_task = asyncio.create_task(fetch_posts(1)) comments_task = asyncio.create_task(fetch_comments(1)) # Wait for all tasks user = await user_task posts = await posts_task comments = await comments_task return user, posts, comments # Cancel tasks if needed async def fetch_with_cancellation(): task = asyncio.create_task(long_running_operation()) try: result = await asyncio.wait_for(task, timeout=5.0) return result except asyncio.TimeoutError: task.cancel() # Cancel the task raise ``` #### Using Task Groups (Python 3.11+) ```python # Task groups provide better error handling and cancellation async def fetch_with_task_group(): async with asyncio.TaskGroup() as tg: user_task = tg.create_task(fetch_user(1)) posts_task = tg.create_task(fetch_posts(1)) comments_task = tg.create_task(fetch_comments(1)) # All tasks complete here (or exception raised) return user_task.result(), posts_task.result(), comments_task.result() # If any task fails, all tasks are cancelled async def task_group_error_handling(): try: async with asyncio.TaskGroup() as tg: tg.create_task(task_that_fails()) tg.create_task(task_that_succeeds()) except ExceptionGroup as eg: # All exceptions collected in ExceptionGroup for exc in eg.exceptions: print(f"Task failed: {exc}") ``` ### 5. Task Coordination Patterns #### Wait for first completion ```python # Wait for first task to complete async def fetch_from_multiple_sources(urls: list[str]): async with aiohttp.ClientSession() as session: tasks = [ asyncio.create_task(fetch_one(session, url)) for url in urls ] # Wait for first completion done, pending = await asyncio.wait( tasks, return_when=asyncio.FIRST_COMPLETED ) # Cancel remaining tasks for task in pending: task.cancel() # Return first result return done.pop().result() ``` #### Semaphore for rate limiting ```python # Limit concurrent operations async def fetch_with_rate_limit(urls: list[str], max_concurrent: int = 5): semaphore = asyncio.Semaphore(max_concurrent) async def fetch_with_sem(url: str): async with semaphore: return await fetch_one(url) results = await asyncio.gather(*[ fetch_with_sem(url) for url in urls ]) return results ``` ## Best Practices ### 1. Avoid Blocking the Event Loop ```python # BAD: Blocking operations in async function async def bad_async_function(): # These block the event loop! time.sleep(1) # WRONG requests.get(url) # WRONG open('file.txt').read() # WRONG some_cpu_intensive_task() # WRONG # GOOD: Use async alternatives async def good_async_function(): await asyncio.sleep(1) # Non-blocking sleep async with aiohttp.ClientSession() as session: await session.get(url) # Async HTTP async with aiofiles.open('file.txt') as f: await f.read() # Async file I/O # For CPU-bound: use asyncio.to_thread() result = await asyncio.to_thread(cpu_intensive_task) ``` ### 2. Using asyncio.to_thread for Blocking I/O ```python import asyncio from functools import partial # For blocking sync operations (Python 3.9+) async def async_wrapper_for_blocking(): # Run blocking operation in thread pool result = await asyncio.to_thread( blocking_operation, arg1, arg2 ) return result # For CPU-bound tasks async def process_data_async(data: list): # Offload CPU-intensive work to thread processed = await asyncio.to_thread( cpu_intensive_processing, data ) return processed # With partial for complex arguments async def complex_blocking_call(): func = partial( blocking_function, timeout=30, retry=3 ) result = await asyncio.to_thread(func) return result ``` ### 3. Proper Exception Handling ```python # Handle exceptions in individual coroutines async def fetch_with_exception_handling(url: str): try: async with aiohttp.ClientSession() as session: async with session.get(url) as response: response.raise_for_status() return await response.json() except aiohttp.ClientError as e: # Handle specific async HTTP errors print(f"HTTP error: {e}") raise except asyncio.TimeoutError: # Handle timeout print(f"Timeout fetching {url}") raise except Exception as e: # Handle unexpected errors print(f"Unexpected error: {e}") raise # Handle exceptions in gather() async def fetch_multiple_with_errors(urls: list[str]): results = await asyncio.gather( *[fetch_with_exception_handling(url) for url in urls], return_exceptions=True ) # Process results and exceptions successful = [] failed = [] for url, result in zip(urls, results): if isinstance(result, Exception): failed.append((url, result)) else: successful.append(result) return successful, failed # Exception groups (Python 3.11+) async def handle_exception_groups(): try: async with asyncio.TaskGroup() as tg: tg.create_task(task1()) tg.create_task(task2()) except* ValueError as eg: # Handle all ValueErrors print(f"Value errors: {eg.exceptions}") except* TypeError as eg: # Handle all TypeErrors print(f"Type errors: {eg.exceptions}") ``` ### 4. Cancellation Handling ```python # Proper cancellation handling async def cancellable_operation(): try: await long_running_task() except asyncio.CancelledError: # Clean up resources await cleanup() # Re-raise to propagate cancellation raise # Shielding from cancellation async def critical_operation(): # This won't be cancelled await asyncio.shield(important_task()) # Timeout with proper cleanup async def operation_with_timeout(): try: result = await asyncio.wait_for( some_operation(), timeout=5.0 ) return result except asyncio.TimeoutError: # Cleanup happens here await cleanup() raise ``` ### 5. Timeout Patterns ```python # Single operation timeout async def fetch_with_timeout(url: str, timeout: float = 10.0): try: async with aiohttp.ClientSession() as session: async with asyncio.timeout(timeout): # Python 3.11+ async with session.get(url) as response: return await response.json() except asyncio.TimeoutError: print(f"Timeout after {timeout}s") raise # Legacy timeout (Python < 3.11) async def fetch_with_timeout_legacy(url: str, timeout: float = 10.0): try: result = await asyncio.wait_for( fetch_one(url), timeout=timeout ) return result except asyncio.TimeoutError: print(f"Timeout after {timeout}s") raise # Multiple timeouts async def complex_operation_with_timeouts(): # Per-operation timeouts async with asyncio.timeout(30): # Overall timeout result1 = await asyncio.wait_for(op1(), timeout=10) result2 = await asyncio.wait_for(op2(), timeout=10) result3 = await asyncio.wait_for(op3(), timeout=10) return result1, result2, result3 ``` ### 6. Resource Cleanup ```python # Always use async context managers async def proper_resource_cleanup(): # GOOD: Automatic cleanup async with aiohttp.ClientSession() as session: async with session.get(url) as response: data = await response.json() # Session and response closed automatically # For resources without context managers async def manual_cleanup(): resource = await acquire_resource() try: result = await use_resource(resource) return result finally: # Always cleanup await resource.close() # Async cleanup in exception handlers async def cleanup_on_error(): resources = [] try: for i in range(10): resource = await acquire_resource(i) resources.append(resource) await use_resource(resource) except Exception as e: # Cleanup all acquired resources await asyncio.gather( *[res.close() for res in resources], return_exceptions=True ) raise finally: # Final cleanup await asyncio.gather( *[res.close() for res in resources], return_exceptions=True ) ``` ## Common Libraries ### 1. aiohttp - Async HTTP Client/Server ```python import aiohttp import asyncio # Client usage async def fetch_data(url: str) -> dict: """Fetch data using aiohttp.""" async with aiohttp.ClientSession() as session: async with session.get(url) as response: response.raise_for_status() return await response.json() # With custom headers and timeout async def fetch_with_options(url: str): headers = {"Authorization": "Bearer token"} timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session: async with session.get(url) as response: return await response.text() # POST request async def post_data(url: str, data: dict): async with aiohttp.ClientSession() as session: async with session.post(url, json=data) as response: return await response.json() # Concurrent requests with rate limiting async def fetch_multiple(urls: list[str], max_concurrent: int = 5): semaphore = asyncio.Semaphore(max_concurrent) async def fetch_one(session, url): async with semaphore: async with session.get(url) as response: return await response.json() async with aiohttp.ClientSession() as session: tasks = [fetch_one(session, url) for url in urls] return await asyncio.gather(*tasks) ``` ### 2. aiofiles - Async File I/O ```python import aiofiles # Read file asynchronously async def read_file(filepath: str) -> str: """Read file contents asynchronously.""" async with aiofiles.open(filepath, mode='r') as f: contents = await f.read() return contents # Write file asynchronously async def write_file(filepath: str, content: str): """Write content to file asynchronously.""" async with aiofiles.open(filepath, mode='w') as f: await f.write(content) # Read lines async def read_lines(filepath: str) -> list[str]: """Read file line by line.""" lines = [] async with aiofiles.open(filepath, mode='r') as f: async for line in f: lines.append(line.strip()) return lines # Process large file async def process_large_file(filepath: str): """Process file line by line without loading all into memory.""" async with aiofiles.open(filepath, mode='r') as f: async for line in f: processed = await process_line(line) await save_result(processed) ``` ### 3. asyncpg - Async PostgreSQL Driver ```python import asyncpg # Create connection pool async def create_pool(): """Create database connection pool.""" pool = await asyncpg.create_pool( host='localhost', port=5432, user='user', password='password', database='mydb', min_size=10, max_size=20 ) return pool # Query with pool async def fetch_users(pool: asyncpg.Pool): """Fetch users from database.""" async with pool.acquire() as connection: rows = await connection.fetch( 'SELECT id, name, email FROM users WHERE active = $1', True ) return [dict(row) for row in rows] # Transaction async def create_user_with_profile(pool: asyncpg.Pool, user_data: dict): """Create user and profile in transaction.""" async with pool.acquire() as connection: async with connection.transaction(): user_id = await connection.fetchval( 'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id', user_data['name'], user_data['email'] ) await connection.execute( 'INSERT INTO profiles (user_id, bio) VALUES ($1, $2)', user_id, user_data['bio'] ) return user_id # Prepared statements for better performance async def fetch_user_by_id(pool: asyncpg.Pool, user_id: int): """Fetch user using prepared statement.""" async with pool.acquire() as connection: stmt = await connection.prepare('SELECT * FROM users WHERE id = $1') row = await stmt.fetchrow(user_id) return dict(row) if row else None ``` ### 4. motor - Async MongoDB Driver ```python from motor.motor_asyncio import AsyncIOMotorClient # Create client async def create_mongo_client(): """Create MongoDB client.""" client = AsyncIOMotorClient('mongodb://localhost:27017') return client # Query documents async def fetch_documents(collection_name: str): """Fetch documents from MongoDB.""" client = AsyncIOMotorClient('mongodb://localhost:27017') db = client.mydb collection = db[collection_name] cursor = collection.find({'active': True}) documents = await cursor.to_list(length=100) return documents # Insert document async def insert_document(collection_name: str, document: dict): """Insert document into MongoDB.""" client = AsyncIOMotorClient('mongodb://localhost:27017') db = client.mydb collection = db[collection_name] result = await collection.insert_one(document) return result.inserted_id # Update document async def update_document(collection_name: str, filter_dict: dict, update_dict: dict): """Update document in MongoDB.""" client = AsyncIOMotorClient('mongodb://localhost:27017') db = client.mydb collection = db[collection_name] result = await collection.update_one( filter_dict, {'$set': update_dict} ) return result.modified_count # Async iteration over cursor async def process_large_collection(collection_name: str): """Process large collection with cursor.""" client = AsyncIOMotorClient('mongodb://localhost:27017') db = client.mydb collection = db[collection_name] cursor = collection.find({}) async for document in cursor: await process_document(document) ``` ## Anti-Patterns to Avoid ### 1. Not Awaiting Coroutines ```python # BAD: Coroutine never executed async def bad_example(): result = fetch_data() # RuntimeWarning: coroutine 'fetch_data' was never awaited return result # GOOD: Properly awaited async def good_example(): result = await fetch_data() return result ``` ### 2. Mixing Sync and Async Incorrectly ```python # BAD: Calling async function from sync code without event loop def sync_function(): result = await async_function() # SyntaxError: 'await' outside async function return result # GOOD: Use asyncio.run() to bridge sync/async def sync_function(): result = asyncio.run(async_function()) return result # BAD: Creating new event loop in async context async def bad_nested(): result = asyncio.run(another_async()) # RuntimeError: asyncio.run() cannot be called from a running event loop return result # GOOD: Just await in async context async def good_nested(): result = await another_async() return result ``` ### 3. Creating Event Loops Manually ```python # BAD: Manual loop management (unless necessary) def bad_approach(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: result = loop.run_until_complete(main()) finally: loop.close() return result # GOOD: Use asyncio.run() def good_approach(): result = asyncio.run(main()) return result # EXCEPTION: Multiple event loops needed (advanced use case) # Only create manual loops when you specifically need multiple loops ``` ### 4. Blocking Operations in Async Functions ```python # BAD: Blocking the event loop async def bad_blocking(): time.sleep(5) # Blocks entire event loop! data = requests.get(url).json() # Blocks event loop! with open('file.txt') as f: content = f.read() # Blocks event loop! result = compute_fibonacci(1000000) # Blocks event loop! return result # GOOD: Use async alternatives async def good_async(): await asyncio.sleep(5) # Non-blocking async with aiohttp.ClientSession() as session: async with session.get(url) as response: data = await response.json() # Non-blocking async with aiofiles.open('file.txt') as f: content = await f.read() # Non-blocking result = await asyncio.to_thread(compute_fibonacci, 1000000) # Non-blocking return result ``` ### 5. Improper Exception Handling ```python # BAD: Bare except swallows CancelledError async def bad_exception_handling(): try: await some_operation() except: # Catches CancelledError, preventing proper cancellation! print("Error occurred") # GOOD: Specific exception handling async def good_exception_handling(): try: await some_operation() except asyncio.CancelledError: # Handle cancellation specifically await cleanup() raise # Re-raise to propagate except Exception as e: # Handle other exceptions print(f"Error: {e}") raise # BAD: Silencing errors in gather() async def bad_gather(): results = await asyncio.gather( task1(), task2(), task3(), return_exceptions=True ) return results # Might contain exceptions that are ignored # GOOD: Check for exceptions async def good_gather(): results = await asyncio.gather( task1(), task2(), task3(), return_exceptions=True ) for i, result in enumerate(results): if isinstance(result, Exception): print(f"Task {i} failed: {result}") # Handle or raise as appropriate return [r for r in results if not isinstance(r, Exception)] ``` ### 6. Resource Leaks ```python # BAD: Not closing resources async def bad_resource_management(): session = aiohttp.ClientSession() response = await session.get(url) data = await response.json() # Session and response never closed! return data # GOOD: Use context managers async def good_resource_management(): async with aiohttp.ClientSession() as session: async with session.get(url) as response: data = await response.json() # Automatically closed return data # BAD: Creating session per request async def bad_session_usage(): for url in urls: async with aiohttp.ClientSession() as session: # Creating new session each iteration is wasteful! await session.get(url) # GOOD: Reuse session async def good_session_usage(): async with aiohttp.ClientSession() as session: for url in urls: # Reuse same session await session.get(url) ``` ### 7. Not Using Task Groups for Error Propagation (Python 3.11+) ```python # BAD: Errors might be swallowed async def bad_error_propagation(): tasks = [ asyncio.create_task(task1()), asyncio.create_task(task2()), asyncio.create_task(task3()) ] # If task2 fails, task1 and task3 keep running await asyncio.gather(*tasks) # GOOD: Use TaskGroup for automatic cancellation async def good_error_propagation(): async with asyncio.TaskGroup() as tg: tg.create_task(task1()) tg.create_task(task2()) tg.create_task(task3()) # If any task fails, all tasks are cancelled ``` ## Framework-Specific Patterns ### 1. FastAPI Async Endpoints ```python from fastapi import FastAPI, HTTPException, Depends from typing import List import asyncpg app = FastAPI() # Async endpoint @app.get("/users/{user_id}") async def get_user(user_id: int, db: asyncpg.Pool = Depends(get_db_pool)): """Async endpoint that queries database.""" async with db.acquire() as connection: user = await connection.fetchrow( 'SELECT * FROM users WHERE id = $1', user_id ) if not user: raise HTTPException(status_code=404, detail="User not found") return dict(user) # Async endpoint with external API call @app.get("/external-data") async def fetch_external_data(): """Fetch data from external API.""" async with aiohttp.ClientSession() as session: async with session.get('https://api.example.com/data') as response: return await response.json() # Background tasks (async) from fastapi import BackgroundTasks @app.post("/send-notification") async def send_notification( email: str, background_tasks: BackgroundTasks ): """Send notification in background.""" background_tasks.add_task(send_email_async, email) return {"message": "Notification queued"} async def send_email_async(email: str): """Send email asynchronously.""" async with aiohttp.ClientSession() as session: await session.post( 'https://email-service.com/send', json={'to': email} ) # Dependency injection with async async def get_db_pool() -> asyncpg.Pool: """Dependency that provides database pool.""" pool = await asyncpg.create_pool(...) try: yield pool finally: await pool.close() # Lifespan events from contextlib import asynccontextmanager @asynccontextmanager async def lifespan(app: FastAPI): """Startup and shutdown events.""" # Startup app.state.db_pool = await asyncpg.create_pool(...) app.state.http_session = aiohttp.ClientSession() yield # Shutdown await app.state.db_pool.close() await app.state.http_session.close() app = FastAPI(lifespan=lifespan) ``` ### 2. Django Async Views (Django 3.1+) ```python from django.http import JsonResponse from django.views import View from asgiref.sync import sync_to_async import aiohttp # Async function-based view async def async_user_list(request): """Async view for listing users.""" # Use sync_to_async for Django ORM users = await sync_to_async(list)( User.objects.filter(active=True) ) return JsonResponse({ 'users': [ {'id': u.id, 'name': u.name} for u in users ] }) # Async class-based view class AsyncUserDetailView(View): """Async class-based view.""" async def get(self, request, user_id): """Get user details asynchronously.""" # Wrap ORM calls with sync_to_async user = await sync_to_async(User.objects.get)(id=user_id) return JsonResponse({ 'id': user.id, 'name': user.name, 'email': user.email }) # Async view with external API async def fetch_external_data_view(request): """Fetch data from external API.""" async with aiohttp.ClientSession() as session: async with session.get('https://api.example.com/data') as response: data = await response.json() return JsonResponse(data) # Django ORM with async (Django 4.1+) async def async_orm_view(request): """Use async ORM operations.""" # Django 4.1+ supports async ORM users = [user async for user in User.objects.filter(active=True)] count = await User.objects.filter(active=True).acount() return JsonResponse({ 'count': count, 'users': [{'id': u.id, 'name': u.name} for u in users] }) # Middleware (async) class AsyncMiddleware: """Async middleware example.""" async def __call__(self, request): # Pre-processing await log_request_async(request) # Call next middleware/view response = await self.get_response(request) # Post-processing await log_response_async(response) return response ``` ### 3. Async Database Operations ```python # asyncpg with connection pool class AsyncPostgresService: """Service for async PostgreSQL operations.""" def __init__(self, pool: asyncpg.Pool): self.pool = pool async def get_user(self, user_id: int) -> dict | None: """Get user by ID.""" async with self.pool.acquire() as conn: row = await conn.fetchrow( 'SELECT * FROM users WHERE id = $1', user_id ) return dict(row) if row else None async def create_user(self, name: str, email: str) -> int: """Create new user.""" async with self.pool.acquire() as conn: user_id = await conn.fetchval( 'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id', name, email ) return user_id async def update_user(self, user_id: int, **kwargs) -> bool: """Update user fields.""" if not kwargs: return False set_clause = ', '.join(f"{k} = ${i+2}" for i, k in enumerate(kwargs.keys())) query = f'UPDATE users SET {set_clause} WHERE id = $1' async with self.pool.acquire() as conn: result = await conn.execute(query, user_id, *kwargs.values()) return result != 'UPDATE 0' async def batch_insert_users(self, users: list[tuple[str, str]]) -> int: """Batch insert users.""" async with self.pool.acquire() as conn: result = await conn.executemany( 'INSERT INTO users (name, email) VALUES ($1, $2)', users ) return len(users) # Motor (MongoDB) service class AsyncMongoService: """Service for async MongoDB operations.""" def __init__(self, client: AsyncIOMotorClient): self.db = client.mydb async def get_document(self, collection: str, doc_id: str) -> dict | None: """Get document by ID.""" coll = self.db[collection] return await coll.find_one({'_id': doc_id}) async def insert_document(self, collection: str, document: dict) -> str: """Insert document.""" coll = self.db[collection] result = await coll.insert_one(document) return str(result.inserted_id) async def find_documents( self, collection: str, filter_dict: dict, limit: int = 100 ) -> list[dict]: """Find documents matching filter.""" coll = self.db[collection] cursor = coll.find(filter_dict).limit(limit) return await cursor.to_list(length=limit) async def aggregate(self, collection: str, pipeline: list[dict]) -> list[dict]: """Run aggregation pipeline.""" coll = self.db[collection] cursor = coll.aggregate(pipeline) return await cursor.to_list(length=None) ``` ## Performance Considerations ### 1. When Async Provides Benefits (I/O-Bound) Async is beneficial for I/O-bound operations: ```python # GOOD: I/O-bound operations benefit from async async def io_bound_operations(): """These operations benefit from async.""" # Network requests async with aiohttp.ClientSession() as session: responses = await asyncio.gather( session.get('https://api1.example.com'), session.get('https://api2.example.com'), session.get('https://api3.example.com') ) # Database queries async with pool.acquire() as conn: users, posts, comments = await asyncio.gather( conn.fetch('SELECT * FROM users'), conn.fetch('SELECT * FROM posts'), conn.fetch('SELECT * FROM comments') ) # File I/O async with aiofiles.open('large_file.txt') as f: content = await f.read() return responses, users, posts, comments # Example: 100 HTTP requests async def fetch_many_urls(urls: list[str]): """Async is much faster for concurrent I/O.""" async with aiohttp.ClientSession() as session: tasks = [session.get(url) for url in urls] responses = await asyncio.gather(*tasks) # With async: ~2 seconds # With sync requests: ~200 seconds (2s per request × 100) ``` ### 2. When Async Doesn't Help (CPU-Bound) Async provides no benefit for CPU-bound operations: ```python # BAD: CPU-bound operations don't benefit from async async def cpu_bound_async(): """This gains nothing from being async.""" result = compute_fibonacci(1000000) # Blocks event loop! return result # GOOD: Use asyncio.to_thread for CPU-bound work async def cpu_bound_with_threads(): """Offload CPU-bound work to threads.""" result = await asyncio.to_thread(compute_fibonacci, 1000000) return result # BETTER: Use ProcessPoolExecutor for true parallelism from concurrent.futures import ProcessPoolExecutor async def cpu_bound_with_processes(): """Use processes for CPU-bound parallelism.""" loop = asyncio.get_running_loop() with ProcessPoolExecutor() as pool: result = await loop.run_in_executor( pool, compute_fibonacci, 1000000 ) return result # Example: Multiple CPU-bound tasks async def parallel_cpu_tasks(numbers: list[int]): """Run CPU-bound tasks in parallel using processes.""" loop = asyncio.get_running_loop() with ProcessPoolExecutor() as pool: results = await asyncio.gather(*[ loop.run_in_executor(pool, compute_fibonacci, n) for n in numbers ]) return results ``` ### 3. Measuring Async Performance ```python import time import asyncio from functools import wraps # Decorator to measure async function performance def async_timer(func): """Measure execution time of async function.""" @wraps(func) async def wrapper(*args, **kwargs): start = time.perf_counter() result = await func(*args, **kwargs) end = time.perf_counter() print(f"{func.__name__} took {end - start:.2f} seconds") return result return wrapper # Usage @async_timer async def fetch_data(url: str): async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.json() # Context manager for timing blocks from contextlib import asynccontextmanager @asynccontextmanager async def timer(name: str): """Time a block of async code.""" start = time.perf_counter() yield end = time.perf_counter() print(f"{name} took {end - start:.2f} seconds") # Usage async def timed_operations(): async with timer("Database queries"): users = await fetch_users() posts = await fetch_posts() async with timer("External API calls"): data = await fetch_external_data() # Compare sync vs async performance async def compare_performance(): """Compare sync and async performance.""" urls = ['https://example.com'] * 100 # Sync version (for comparison) start = time.perf_counter() sync_results = [requests.get(url).json() for url in urls] sync_time = time.perf_counter() - start print(f"Sync: {sync_time:.2f}s") # Async version start = time.perf_counter() async with aiohttp.ClientSession() as session: tasks = [session.get(url) for url in urls] async_results = await asyncio.gather(*tasks) async_time = time.perf_counter() - start print(f"Async: {async_time:.2f}s") print(f"Speedup: {sync_time / async_time:.1f}x") ``` ## Code Review Checklist Use this checklist when reviewing async Python code: ### Syntax and Structure - [ ] All async functions defined with `async def` - [ ] All coroutines properly awaited - [ ] `asyncio.run()` used for top-level entry point (Python 3.7+) - [ ] No manual event loop creation (unless necessary) - [ ] Async context managers used with `async with` - [ ] Async iterators used with `async for` ### Concurrency Patterns - [ ] `asyncio.gather()` used for concurrent operations - [ ] `asyncio.create_task()` used for fire-and-forget tasks - [ ] Task groups used for error propagation (Python 3.11+) - [ ] Semaphores used for rate limiting - [ ] Proper task cancellation handling ### Event Loop - [ ] No blocking operations in async functions - [ ] `asyncio.to_thread()` used for blocking I/O (Python 3.9+) - [ ] No `time.sleep()` (use `asyncio.sleep()`) - [ ] No sync HTTP libraries (use `aiohttp`) - [ ] No sync file operations (use `aiofiles`) ### Exception Handling - [ ] Specific exception types caught - [ ] `asyncio.CancelledError` handled and re-raised - [ ] `return_exceptions=True` used appropriately in `gather()` - [ ] Exceptions in tasks not silently ignored - [ ] Exception groups used for multiple task errors (Python 3.11+) ### Resource Management - [ ] Async context managers used for resources - [ ] Resources properly closed in finally blocks - [ ] No session/connection leaks - [ ] Sessions reused across multiple requests - [ ] Connection pools used for databases ### Timeouts and Cancellation - [ ] Timeouts set for external operations - [ ] `asyncio.wait_for()` or `asyncio.timeout()` used - [ ] Tasks can be cancelled gracefully - [ ] Critical sections shielded from cancellation if needed - [ ] Cleanup performed on timeout/cancellation ### Library Usage - [ ] `aiohttp` used instead of `requests` - [ ] `aiofiles` used for async file I/O - [ ] Async database drivers used (`asyncpg`, `motor`) - [ ] Library-specific best practices followed ### Framework Integration - [ ] FastAPI async endpoints defined correctly - [ ] Django ORM calls wrapped with `sync_to_async` (Django < 4.1) - [ ] Async ORM operations used (Django 4.1+) - [ ] Dependency injection works with async ### Performance - [ ] Async used for I/O-bound operations - [ ] CPU-bound work offloaded to threads/processes - [ ] Unnecessary async overhead avoided - [ ] Appropriate concurrency limits set ### Type Hints - [ ] Async functions return type hints - [ ] Coroutine types annotated - [ ] `Awaitable`, `Coroutine` types used where appropriate ## Related Skills - **python-testing**: For testing async code with pytest-asyncio - **python-type-hints**: For properly typing async functions and coroutines - **python-error-handling**: For exception handling patterns - **fastapi-best-practices**: For FastAPI-specific async patterns - **django-best-practices**: For Django async views and ORM ## References - [PEP 492 - Coroutines with async and await syntax](https://peps.python.org/pep-0492/) - [PEP 3156 - Asynchronous IO Support Rebooted: the "asyncio" Module](https://peps.python.org/pep-3156/) - [Python asyncio Documentation](https://docs.python.org/3/library/asyncio.html) - [aiohttp Documentation](https://docs.aiohttp.org/) - [Real Python - Async IO in Python](https://realpython.com/async-io-python/) - [FastAPI Async Documentation](https://fastapi.tiangolo.com/async/) - [Django Async Documentation](https://docs.djangoproject.com/en/stable/topics/async/) --- **Version:** 1.0 **Last Updated:** 2025-12-24 **Maintainer:** Claude Code Skills