---
name: airflow-dag-patterns
description: Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs.
---

# Apache Airflow DAG Patterns

Production-ready patterns for Apache Airflow including DAG design, operators, sensors, testing, and deployment strategies.

## When to Use This Skill

- Creating data pipeline orchestration with Airflow
- Designing DAG structures and dependencies
- Implementing custom operators and sensors
- Testing Airflow DAGs locally
- Setting up Airflow in production
- Debugging failed DAG runs

## Core Concepts

### 1. DAG Design Principles

| Principle | Description |
|-----------|-------------|
| **Idempotent** | Running the same task twice produces the same result |
| **Atomic** | Each task either succeeds or fails completely |
| **Incremental** | Process only new or changed data per run |
| **Observable** | Logs, metrics, and alerts at every step |

### 2. Task Dependencies

```python
# Linear
task1 >> task2 >> task3

# Fan-out
task1 >> [task2, task3, task4]

# Fan-in
[task1, task2, task3] >> task4

# Diamond: task2 and task3 run in parallel, task4 waits for both
task1 >> task2 >> task4
task1 >> task3 >> task4
```

## Quick Start

```python
# dags/example_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
}

with DAG(
    dag_id='example_etl',
    default_args=default_args,
    description='Example ETL pipeline',
    schedule='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'example'],
    max_active_runs=1,
) as dag:

    start = EmptyOperator(task_id='start')

    def extract_data(**context):
        execution_date = context['ds']  # logical date as 'YYYY-MM-DD'
        # Extract logic here
        return {'records': 1000}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    end = EmptyOperator(task_id='end')

    start >> extract >> end
```

## Patterns

### Pattern 1: TaskFlow API (Airflow 2.0+)

```python
# dags/taskflow_example.py
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id='taskflow_etl',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'taskflow'],
)
def taskflow_etl():
    """ETL pipeline using TaskFlow API"""

    @task()
    def extract(source: str, ds: str = None) -> dict:
        """Extract data from source (Airflow injects `ds` at runtime)"""
        import pandas as pd
        df = pd.read_csv(f's3://bucket/{source}/{ds}.csv')
        # Note: passing full DataFrames through XCom only suits small datasets
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def transform(extracted: dict) -> dict:
        """Transform extracted data"""
        import pandas as pd
        df = pd.DataFrame(extracted['data'])
        df['processed_at'] = datetime.now()
        df = df.dropna()
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def load(transformed: dict, target: str, ds: str = None) -> int:
        """Load data to target"""
        import pandas as pd
        df = pd.DataFrame(transformed['data'])
        df.to_parquet(f's3://bucket/{target}/{ds}.parquet')
        return transformed['rows']

    @task()
    def notify(rows_loaded: int):
        """Send notification"""
        print(f'Loaded {rows_loaded} rows')

    # Define dependencies with XCom passing
    extracted = extract(source='raw_data')
    transformed = transform(extracted)
    loaded = load(transformed, target='processed_data')
    notify(loaded)


# Instantiate the DAG
taskflow_etl()
```
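When a decorated task needs more than one or two context values, `get_current_context()` exposes the full runtime context inside the task body instead of adding each key as a keyword argument. A minimal sketch, assuming the same bucket layout as above (the task name and S3 path are illustrative, not part of the DAG):

```python
# Sketch only: shows context access inside a TaskFlow task.
from airflow.decorators import task
from airflow.operators.python import get_current_context


@task()
def extract_with_context(source: str) -> str:
    """Build an input path from the run's logical date."""
    context = get_current_context()   # full runtime context dict
    ds = context['ds']                # logical date, 'YYYY-MM-DD'
    run_id = context['run_id']        # unique ID of this DAG run
    path = f's3://bucket/{source}/{ds}.csv'
    print(f'Run {run_id} reading {path}')
    return path
```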
### Pattern 2: Dynamic DAG Generation

```python
# dags/dynamic_dag_factory.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Configuration for multiple similar pipelines
PIPELINE_CONFIGS = [
    {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},
    {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},
    {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},
]


def create_dag(config: dict) -> DAG:
    """Factory function to create DAGs from config"""
    dag_id = f"etl_{config['name']}"

    default_args = {
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'dynamic', config['name']],
    )

    with dag:
        def extract_fn(source, **context):
            print(f"Extracting from {source} for {context['ds']}")

        def transform_fn(**context):
            print(f"Transforming data for {context['ds']}")

        def load_fn(table_name, **context):
            print(f"Loading to {table_name} for {context['ds']}")

        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_fn,
            op_kwargs={'source': config['source']},
        )

        transform = PythonOperator(
            task_id='transform',
            python_callable=transform_fn,
        )

        load = PythonOperator(
            task_id='load',
            python_callable=load_fn,
            op_kwargs={'table_name': config['name']},
        )

        extract >> transform >> load

    return dag


# Generate one DAG per config; assigning to globals() lets Airflow discover them
for config in PIPELINE_CONFIGS:
    globals()[f"dag_{config['name']}"] = create_dag(config)
```

### Pattern 3: Branching and Conditional Logic

```python
# dags/branching_example.py
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule


@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def branching_pipeline():

    @task()
    def check_data_quality() -> dict:
        """Check data quality and return metrics"""
        quality_score = 0.95  # Simulated
        return {'score': quality_score, 'rows': 10000}

    def choose_branch(**context) -> str:
        """Determine which branch to execute"""
        ti = context['ti']
        metrics = ti.xcom_pull(task_ids='check_data_quality')
        if metrics['score'] >= 0.9:
            return 'high_quality_path'
        elif metrics['score'] >= 0.7:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'

    quality_check = check_data_quality()

    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )

    high_quality = EmptyOperator(task_id='high_quality_path')
    medium_quality = EmptyOperator(task_id='medium_quality_path')
    low_quality = EmptyOperator(task_id='low_quality_path')

    # Join point - runs after any branch completes
    join = EmptyOperator(
        task_id='join',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join


branching_pipeline()
```

### Pattern 4: Sensors and External Dependencies

```python
# dags/sensor_patterns.py
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.base import PokeReturnValue
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id='sensor_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Wait for a file on S3
    wait_for_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='data-lake',
        bucket_key='raw/{{ ds }}/data.parquet',
        aws_conn_id='aws_default',
        timeout=60 * 60 * 2,   # 2 hours
        poke_interval=60 * 5,  # Check every 5 minutes
        mode='reschedule',     # Free up worker slot while waiting
    )

    # Wait for another DAG to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_etl',
        external_task_id='final_task',
        execution_date_fn=lambda dt: dt,  # Same logical date
        timeout=60 * 60 * 3,
        mode='reschedule',
    )

    # Custom sensor using the @task.sensor decorator
    @task.sensor(poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_api() -> PokeReturnValue:
        """Custom sensor for API availability"""
        import requests
        response = requests.get('https://api.example.com/health', timeout=10)
        is_done = response.status_code == 200
        return PokeReturnValue(is_done=is_done, xcom_value=response.json())

    api_ready = wait_for_api()

    def process_data(**context):
        api_result = context['ti'].xcom_pull(task_ids='wait_for_api')
        print(f"API returned: {api_result}")

    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_for_file, wait_for_upstream, api_ready] >> process
```
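The decorator-based sensor above covers one-off cases; reusable operators and sensors (the `dags/common/operators.py` and `sensors.py` modules in the project structure below) are built by subclassing `BaseOperator` and `BaseSensorOperator`. A minimal sketch, assuming a hypothetical row-count audit and partition check (the table/partition logic is placeholder; only the base-class usage reflects the real Airflow API):

```python
# dags/common/operators.py (sketch) - reusable custom operator and sensor.
# AuditRowCountOperator and PartitionExistsSensor are illustrative names.
from airflow.models.baseoperator import BaseOperator
from airflow.sensors.base import BaseSensorOperator


class AuditRowCountOperator(BaseOperator):
    """Fail the task if the target table has no rows for the partition."""

    # Jinja-templated fields, so callers can pass '{{ ds }}' and other macros
    template_fields = ('table', 'partition')

    def __init__(self, table: str, partition: str, **kwargs):
        super().__init__(**kwargs)
        self.table = table
        self.partition = partition

    def execute(self, context):
        row_count = self._count_rows()  # swap in a real hook/query here
        self.log.info("%s / %s has %d rows", self.table, self.partition, row_count)
        if row_count == 0:
            raise ValueError(f"{self.table} is empty for {self.partition}")
        return row_count  # return value is pushed to XCom automatically

    def _count_rows(self) -> int:
        return 1000  # placeholder; replace with a real query via a hook


class PartitionExistsSensor(BaseSensorOperator):
    """Succeed once the expected partition marker appears."""

    template_fields = ('partition',)

    def __init__(self, partition: str, **kwargs):
        super().__init__(**kwargs)
        self.partition = partition

    def poke(self, context) -> bool:
        # Return True when the condition is met; Airflow re-pokes otherwise
        self.log.info("Checking for partition %s", self.partition)
        return self._partition_exists()

    def _partition_exists(self) -> bool:
        return False  # placeholder; replace with a real existence check
```

In a DAG file these are used like any built-in operator, e.g. `PartitionExistsSensor(task_id='wait_for_partition', partition='{{ ds }}', mode='reschedule')`.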
### Pattern 5: Error Handling and Alerts

```python
# dags/error_handling.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def task_failure_callback(context):
    """Callback on task failure"""
    task_instance = context['task_instance']
    exception = context.get('exception')

    # Send to Slack/PagerDuty/etc.
    message = f"""
    Task Failed!
    DAG: {task_instance.dag_id}
    Task: {task_instance.task_id}
    Execution Date: {context['ds']}
    Error: {exception}
    Log URL: {task_instance.log_url}
    """
    # send_slack_alert(message)
    print(message)


def dag_failure_callback(context):
    """Callback on DAG failure"""
    # Aggregate failures, send summary
    pass


with DAG(
    dag_id='error_handling_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_callback,
    default_args={
        'on_failure_callback': task_failure_callback,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
) as dag:

    def might_fail(**context):
        import random
        if random.random() < 0.3:
            raise ValueError("Random failure!")
        return "Success"

    risky_task = PythonOperator(
        task_id='risky_task',
        python_callable=might_fail,
    )

    def cleanup(**context):
        """Cleanup runs regardless of upstream failures"""
        print("Cleaning up...")

    cleanup_task = PythonOperator(
        task_id='cleanup',
        python_callable=cleanup,
        trigger_rule=TriggerRule.ALL_DONE,  # Run even if upstream fails
    )

    def notify_success(**context):
        """Only runs if all upstream tasks succeeded"""
        print("All tasks succeeded!")

    success_notification = PythonOperator(
        task_id='notify_success',
        python_callable=notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    risky_task >> [cleanup_task, success_notification]
```

### Pattern 6: Testing DAGs

```python
# tests/test_dags.py
import pytest
from airflow.models import DagBag
from airflow.utils.dag_cycle_tester import check_cycle


@pytest.fixture
def dagbag():
    return DagBag(dag_folder='dags/', include_examples=False)


def test_dag_loaded(dagbag):
    """Test that all DAGs load without errors"""
    assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"


def test_dag_structure(dagbag):
    """Test specific DAG structure"""
    dag = dagbag.get_dag('example_etl')
    assert dag is not None
    assert len(dag.tasks) == 3
    assert dag.schedule_interval == '0 6 * * *'


def test_task_dependencies(dagbag):
    """Test task dependencies are correct"""
    dag = dagbag.get_dag('example_etl')
    extract_task = dag.get_task('extract')

    assert 'start' in [t.task_id for t in extract_task.upstream_list]
    assert 'end' in [t.task_id for t in extract_task.downstream_list]


def test_dag_integrity(dagbag):
    """Test DAGs have no cycles (check_cycle raises if one is found)"""
    for dag_id, dag in dagbag.dags.items():
        check_cycle(dag)


# Test individual task logic
def test_extract_function():
    """Unit test for the extract callable"""
    from dags.example_dag import extract_data
    result = extract_data(ds='2024-01-01')
    assert 'records' in result
    assert isinstance(result['records'], int)
```
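For local debugging alongside the pytest checks, Airflow 2.5+ lets a DAG object run itself end-to-end in a single process via `dag.test()`, with no scheduler or workers. A minimal sketch, assuming the `taskflow_etl` DAG from Pattern 1 and an initialized local metadata database:

```python
# Appended to dags/taskflow_example.py (sketch): run the whole DAG with
# `python dags/taskflow_example.py` for fast local iteration.
# Requires Airflow 2.5+ and an initialized metadata DB.
dag_object = taskflow_etl()

if __name__ == "__main__":
    # Executes every task in dependency order, in-process, against the
    # local metadata DB; real connections/variables are used, so point
    # them at test resources first.
    dag_object.test()
```

Depending on the Airflow version, an explicit logical date can be pinned (e.g. `execution_date=` on 2.x) so runs that template `{{ ds }}` stay reproducible.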
## Project Structure

```
airflow/
├── dags/
│   ├── __init__.py
│   ├── common/
│   │   ├── __init__.py
│   │   ├── operators.py     # Custom operators
│   │   ├── sensors.py       # Custom sensors
│   │   └── callbacks.py     # Alert callbacks
│   ├── etl/
│   │   ├── customers.py
│   │   └── orders.py
│   └── ml/
│       └── training.py
├── plugins/
│   └── custom_plugin.py
├── tests/
│   ├── __init__.py
│   ├── test_dags.py
│   └── test_operators.py
├── docker-compose.yml
└── requirements.txt
```

## Best Practices

### Do's

- **Use the TaskFlow API** - Cleaner code, automatic XCom passing
- **Set timeouts** - Prevent zombie tasks
- **Use `mode='reschedule'` for sensors** - Frees up worker slots while waiting
- **Test DAGs** - Unit tests and integration tests
- **Keep tasks idempotent** - Safe to retry

### Don'ts

- **Don't use `depends_on_past=True`** - Creates scheduling bottlenecks
- **Don't hardcode dates** - Use `{{ ds }}` and other macros
- **Don't use global state** - Tasks should be stateless
- **Don't disable catchup blindly** - Understand the backfill implications
- **Don't put heavy logic in the DAG file** - Import it from modules

## Resources

- [Airflow Documentation](https://airflow.apache.org/docs/)
- [Astronomer Guides](https://docs.astronomer.io/learn)
- [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html)