---
name: python-programming
description: Master Python fundamentals, OOP, data structures, async programming, and production-grade scripting for data engineering
sasmp_version: "1.3.0"
bonded_agent: 01-data-engineer
bond_type: PRIMARY_BOND
skill_version: "2.0.0"
last_updated: "2025-01"
complexity: foundational
estimated_mastery_hours: 120
prerequisites: []
unlocks: [sql-databases, etl-tools, big-data, machine-learning]
---

# Python Programming for Data Engineering

Production-grade Python development for building scalable data pipelines, ETL systems, and data-intensive applications.

## Quick Start

```python
# Modern Python 3.12+ data engineering setup
from dataclasses import dataclass
from collections.abc import Iterator, Generator
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

@dataclass
class DataRecord:
    """Type-safe data container with validation."""
    id: int
    value: float
    category: str

    def __post_init__(self):
        if self.value < 0:
            raise ValueError(f"Value must be non-negative, got {self.value}")

def process_records(records: Iterator[dict]) -> Generator[DataRecord, None, None]:
    """Memory-efficient generator for processing large datasets."""
    for idx, record in enumerate(records):
        try:
            yield DataRecord(
                id=record['id'],
                value=float(record['value']),
                category=record.get('category', 'unknown')
            )
        except (KeyError, ValueError) as e:
            logger.warning(f"Skipping invalid record {idx}: {e}")
            continue

# Usage
if __name__ == "__main__":
    sample_data = [{"id": 1, "value": "100.5", "category": "A"}]
    for record in process_records(iter(sample_data)):
        logger.info(f"Processed: {record}")
```

## Core Concepts

### 1. Type-Safe Data Structures (2024-2025 Standard)

```python
from typing import TypedDict, NotRequired, Literal
from dataclasses import dataclass, field
from datetime import datetime, timezone

# TypedDict for JSON-like structures
class PipelineConfig(TypedDict):
    source: str
    destination: str
    batch_size: int
    retry_count: NotRequired[int]
    mode: Literal["batch", "streaming"]

# Dataclass for domain objects
@dataclass(frozen=True, slots=True)
class ETLJob:
    """Immutable, memory-efficient job definition."""
    job_id: str
    # datetime.utcnow() is deprecated in 3.12; use timezone-aware now()
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    config: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        return {"job_id": self.job_id, "created_at": self.created_at.isoformat()}
```

### 2. Generator Patterns for Large Data

```python
from collections.abc import Callable, Generator, Iterable
import csv
from pathlib import Path

def read_csv_chunks(
    file_path: Path,
    chunk_size: int = 10000
) -> Generator[list[dict], None, None]:
    """
    Memory-efficient CSV reader using generators.
    Processes files of any size without loading them into memory.
    """
    with open(file_path, 'r', newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        chunk = []
        for row in reader:
            chunk.append(row)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:  # Don't forget the last chunk
            yield chunk

def transform_pipeline(
    records: Iterable[dict],
    transformers: list[Callable[[dict], dict | None]]
) -> Generator[dict, None, None]:
    """Composable transformation pipeline."""
    for record in records:
        result = record
        for transform in transformers:
            result = transform(result)
            if result is None:
                break
        if result is not None:
            yield result
```
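A quick usage sketch for `transform_pipeline`: returning `None` from a transformer filters the record out. The two transformer functions here are illustrative, not part of the API above:

```python
# Hypothetical transformers, for demonstration only.
def normalize_value(record: dict) -> dict:
    record["value"] = float(record["value"])
    return record

def drop_negative(record: dict) -> dict | None:
    # Returning None short-circuits the pipeline and drops the record.
    return record if record["value"] >= 0 else None

rows = [{"value": "10.5"}, {"value": "-3"}]
print(list(transform_pipeline(rows, [normalize_value, drop_negative])))
# [{'value': 10.5}]
```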
### 3. Async Programming for I/O-Bound Tasks

```python
import asyncio
import logging
from collections.abc import AsyncGenerator

import aiohttp

logger = logging.getLogger(__name__)

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3,
    backoff_factor: float = 2.0
) -> dict | None:
    """
    Fetch URL with exponential backoff retry logic.
    Production pattern for API data ingestion.
    """
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                resp.raise_for_status()
                return await resp.json()
        except aiohttp.ClientError as e:
            wait_time = backoff_factor ** attempt
            logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}. Retrying in {wait_time}s")
            await asyncio.sleep(wait_time)
    logger.error(f"All retries exhausted for {url}")
    return None

async def fetch_all_pages(
    base_url: str,
    page_count: int,
    concurrency_limit: int = 10
) -> AsyncGenerator[dict, None]:
    """Concurrent API fetching with rate limiting."""
    semaphore = asyncio.Semaphore(concurrency_limit)

    async def bounded_fetch(session: aiohttp.ClientSession, url: str):
        async with semaphore:
            return await fetch_with_retry(session, url)

    async with aiohttp.ClientSession() as session:
        tasks = [bounded_fetch(session, f"{base_url}?page={i}") for i in range(page_count)]
        for result in asyncio.as_completed(tasks):
            data = await result
            if data:
                yield data
```
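A minimal driver for consuming the async generator above. The endpoint URL is a placeholder, and `logger` is reused from the block above:

```python
# Sketch only: https://api.example.com/items is a placeholder endpoint.
async def main() -> None:
    pages = []
    async for page in fetch_all_pages("https://api.example.com/items", page_count=5):
        pages.append(page)
    logger.info(f"Fetched {len(pages)} pages")

if __name__ == "__main__":
    asyncio.run(main())
```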
" f"Retrying in {wait_time}s" ) time.sleep(wait_time) raise last_exception return wrapper return decorator def log_execution_time(func: Callable[P, R]) -> Callable[P, R]: """Decorator for performance monitoring.""" @functools.wraps(func) def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: start = time.perf_counter() try: result = func(*args, **kwargs) duration = time.perf_counter() - start logging.info(f"{func.__name__} completed in {duration:.3f}s") return result except Exception as e: duration = time.perf_counter() - start logging.error(f"{func.__name__} failed after {duration:.3f}s: {e}") raise return wrapper ``` ## Tools & Technologies | Tool | Purpose | Version (2025) | |------|---------|----------------| | **Python** | Core language | 3.12+ | | **uv** | Package manager (replaces pip) | 0.4+ | | **Ruff** | Linter + formatter (replaces Black, flake8) | 0.5+ | | **mypy** | Static type checking | 1.11+ | | **pytest** | Testing framework | 8.0+ | | **pydantic** | Data validation | 2.5+ | | **polars** | DataFrame operations (faster than pandas) | 0.20+ | | **httpx** | Modern HTTP client | 0.27+ | ## Learning Path ### Phase 1: Foundations (Weeks 1-3) ``` Week 1: Core syntax, data types, control flow Week 2: Functions, modules, file I/O Week 3: OOP (classes, inheritance, composition) ``` ### Phase 2: Intermediate (Weeks 4-6) ``` Week 4: Generators, iterators, decorators Week 5: Type hints, dataclasses, protocols Week 6: Error handling, logging, testing basics ``` ### Phase 3: Advanced (Weeks 7-9) ``` Week 7: Async/await, concurrent programming Week 8: Memory optimization, profiling Week 9: Package structure, dependency management ``` ### Phase 4: Production Mastery (Weeks 10-12) ``` Week 10: CI/CD integration, linting, formatting Week 11: Performance optimization patterns Week 12: Production deployment patterns ``` ## Production Patterns ### Configuration Management ```python from pydantic_settings import BaseSettings from functools import lru_cache class Settings(BaseSettings): """Type-safe configuration with environment variable support.""" database_url: str api_key: str batch_size: int = 1000 debug: bool = False class Config: env_file = ".env" env_file_encoding = "utf-8" @lru_cache def get_settings() -> Settings: """Cached settings singleton.""" return Settings() ``` ### Connection Pooling ```python from contextlib import contextmanager from typing import Generator import psycopg2 from psycopg2 import pool class DatabasePool: """Thread-safe connection pool for PostgreSQL.""" def __init__(self, dsn: str, min_conn: int = 2, max_conn: int = 10): self._pool = pool.ThreadedConnectionPool(min_conn, max_conn, dsn) @contextmanager def get_connection(self) -> Generator: conn = self._pool.getconn() try: yield conn conn.commit() except Exception: conn.rollback() raise finally: self._pool.putconn(conn) def close(self): self._pool.closeall() ``` ## Troubleshooting Guide ### Common Failure Modes | Issue | Symptoms | Root Cause | Fix | |-------|----------|------------|-----| | **Memory Error** | `MemoryError`, process killed | Loading full dataset into memory | Use generators, chunked processing | | **Import Error** | `ModuleNotFoundError` | Virtual env not activated, missing dep | `uv pip install`, check `sys.path` | | **Type Error** | `TypeError: unhashable type` | Using mutable as dict key | Convert to tuple or use dataclass | | **Async Deadlock** | Program hangs | Blocking call in async code | Use `asyncio.to_thread()` for blocking ops | | **GIL Bottleneck** | CPU-bound parallelism slow | Python 
### Debug Checklist

```bash
# 1. Check Python version
python --version  # Should be 3.12+

# 2. Verify virtual environment
which python  # Should point to venv

# 3. Check installed packages
uv pip list | grep <package-name>

# 4. Run with verbose logging
python -m mymodule -v 2>&1 | tee debug.log

# 5. Profile memory usage
python -m memory_profiler script.py

# 6. Profile CPU
python -m cProfile -s cumtime script.py
```

### Log Interpretation

```python
# Structured logging for easier debugging
import structlog

logger = structlog.get_logger()

def process_batch(batch_id: str, records: list):
    logger.info("batch_started", batch_id=batch_id, record_count=len(records))
    try:
        # processing...
        logger.info("batch_completed", batch_id=batch_id, success=True)
    except Exception as e:
        logger.error("batch_failed", batch_id=batch_id, error=str(e), exc_info=True)
        raise
```

## Unit Test Template

```python
import pytest
from unittest.mock import patch

# Replace 'your_module' with the module under test; external_api_call and
# function_using_api are placeholders for your own code.
from your_module import process_records, DataRecord, function_using_api

class TestProcessRecords:
    """Unit tests following the AAA pattern (Arrange-Act-Assert)."""

    def test_valid_records_processed(self):
        # Arrange
        input_data = [{"id": 1, "value": "10.5", "category": "A"}]
        # Act
        result = list(process_records(iter(input_data)))
        # Assert
        assert len(result) == 1
        assert result[0].id == 1
        assert result[0].value == 10.5

    def test_invalid_records_skipped(self):
        # Arrange
        input_data = [{"id": 1}]  # Missing 'value'
        # Act
        result = list(process_records(iter(input_data)))
        # Assert
        assert len(result) == 0

    def test_negative_value_raises_error(self):
        # Arrange & Act & Assert
        with pytest.raises(ValueError, match="non-negative"):
            DataRecord(id=1, value=-5.0, category="A")

    @patch('your_module.external_api_call')
    def test_with_mocked_dependency(self, mock_api):
        # Arrange
        mock_api.return_value = {"status": "ok"}
        # Act
        result = function_using_api()
        # Assert
        mock_api.assert_called_once()
        assert result["status"] == "ok"
```

## Best Practices

### Code Style (2025 Standards)

```python
# ✅ DO: Use type hints everywhere
def calculate_metrics(data: list[float]) -> dict[str, float]: ...

# ✅ DO: Prefer composition over inheritance
@dataclass
class Pipeline:
    reader: DataReader
    transformer: Transformer
    writer: DataWriter

# ✅ DO: Use context managers for resources
with open_connection() as conn:
    process(conn)

# ❌ DON'T: Use bare except
try:
    ...
except:  # Never do this
    pass

# ❌ DON'T: Mutate function arguments
def process(items: list) -> list:
    items.append("new")  # Avoid this
    return items.copy()  # Return a new list instead
```

### Performance Tips

```python
# ✅ Use generators for large data
def process_large_file(path):
    with open(path) as f:
        for line in f:  # Memory efficient
            yield transform(line)

# ✅ Use set/dict for O(1) lookups
valid_ids = set(load_valid_ids())  # Not a list
if item_id in valid_ids:
    ...

# ✅ Use local variables in hot loops
def hot_loop(items):
    local_func = expensive_lookup  # Cache the reference
    for item in items:
        local_func(item)
```
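The set-vs-list lookup claim is easy to verify yourself. A small `timeit` sketch (the sizes and iteration counts are arbitrary):

```python
import timeit

# Build one list and one set with identical contents, then time membership tests.
setup = "ids_list = list(range(100_000)); ids_set = set(ids_list)"
list_time = timeit.timeit("99_999 in ids_list", setup=setup, number=1_000)
set_time = timeit.timeit("99_999 in ids_set", setup=setup, number=1_000)
print(f"list: {list_time:.4f}s  set: {set_time:.6f}s")
# Expect the set lookup to be orders of magnitude faster.
```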
## Resources

### Official Documentation

- [Python Docs](https://docs.python.org/3/)
- [PEP Index](https://peps.python.org/)
- [Typing Module](https://docs.python.org/3/library/typing.html)

### Production References

- [Real Python](https://realpython.com/)
- [Python Patterns](https://python-patterns.guide/)
- [High Performance Python (Book)](https://www.oreilly.com/library/view/high-performance-python/9781492055013/)

### Community

- [Python Discord](https://pythondiscord.com/)
- [r/Python](https://reddit.com/r/Python)
- [PyPI - Package Index](https://pypi.org/)

## Next Skills

After mastering Python programming:

- → `sql-databases` - Query and manage relational data
- → `etl-tools` - Build data pipelines with Airflow
- → `big-data` - Scale with Spark and distributed systems
- → `machine-learning` - Apply ML with scikit-learn

---

**Skill Certification Checklist:**

- [ ] Can write type-safe Python with mypy validation
- [ ] Can implement generators for large data processing
- [ ] Can use async/await for concurrent I/O
- [ ] Can write comprehensive unit tests with pytest
- [ ] Can profile and optimize Python performance