--- name: alembic version: 1.0.0 description: Comprehensive Alembic database migration management for customer support systems tags: - alembic - database - migrations - sqlalchemy - postgresql - customer-support - schema-evolution - data-migrations categories: - database - backend - devops context: - customer support operations - ticket management systems - user data management - schema versioning - production deployments dependencies: - alembic>=1.13.0 - sqlalchemy>=2.0.0 - psycopg2-binary>=2.9.0 - pytest>=7.0.0 author: Customer Support Tech Enablement Team --- # Alembic Database Migration Management Skill ## Overview This skill provides comprehensive guidance for managing database migrations using Alembic in customer support environments. It covers everything from initial setup through complex production deployment scenarios, with a focus on maintaining data integrity and minimizing downtime for support operations. ## Core Concepts ### What is Alembic? Alembic is a lightweight database migration tool for use with SQLAlchemy. It provides a way to manage changes to your database schema over time through version-controlled migration scripts. For customer support systems, this means: - **Version Control**: Track all schema changes in your support database - **Reproducibility**: Apply the same migrations across dev, staging, and production - **Rollback Capability**: Safely revert problematic changes - **Team Collaboration**: Merge schema changes from multiple developers - **Data Preservation**: Migrate data during schema transformations ### Migration Lifecycle in Support Systems 1. **Development**: Create migrations locally while developing new features 2. **Testing**: Validate migrations in staging environment 3. **Review**: Code review migration scripts before production 4. **Deployment**: Apply migrations to production with minimal downtime 5. **Monitoring**: Track migration status and handle failures 6. **Rollback**: Revert if issues arise in production ## Installation and Initial Setup ### Installing Alembic ```bash # Install Alembic with PostgreSQL support pip install alembic psycopg2-binary sqlalchemy # Or add to requirements.txt alembic>=1.13.0 sqlalchemy>=2.0.0 psycopg2-binary>=2.9.0 ``` ### Initialize Alembic in Your Project ```bash # Initialize Alembic (creates alembic/ directory and alembic.ini) alembic init alembic # For multiple database support alembic init --template multidb alembic ``` This creates: - `alembic/`: Directory containing migration scripts - `alembic/versions/`: Where individual migration files live - `alembic/env.py`: Migration environment configuration - `alembic.ini`: Alembic configuration file ### Configure Database Connection Edit `alembic.ini` to set your database URL: ```ini # For development sqlalchemy.url = postgresql://user:password@localhost/support_dev # For production (use environment variables) sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASSWORD)s@%(DB_HOST)s/%(DB_NAME)s ``` Better approach - use environment variables in `env.py`: ```python import os from logging.config import fileConfig from sqlalchemy import engine_from_config, pool from alembic import context # Import your models from myapp.models import Base # This is the Alembic Config object config = context.config # Override sqlalchemy.url from environment db_url = os.getenv('DATABASE_URL', 'postgresql://localhost/support_dev') config.set_main_option('sqlalchemy.url', db_url) # Set up target metadata for autogenerate target_metadata = Base.metadata ``` ## Creating Migrations ### Manual Migration Creation Create a migration manually when you need precise control: ```bash # Create empty migration file alembic revision -m "add ticket priority column" ``` This generates a file like `versions/abc123_add_ticket_priority_column.py`: ```python """add ticket priority column Revision ID: abc123 Revises: def456 Create Date: 2025-01-15 10:30:00.000000 """ from alembic import op import sqlalchemy as sa # revision identifiers revision = 'abc123' down_revision = 'def456' branch_labels = None depends_on = None def upgrade() -> None: # Add priority column to tickets table op.add_column('tickets', sa.Column('priority', sa.String(20), nullable=True, server_default='normal') ) # Create index for performance op.create_index('ix_tickets_priority', 'tickets', ['priority']) def downgrade() -> None: # Remove index first op.drop_index('ix_tickets_priority', 'tickets') # Remove column op.drop_column('tickets', 'priority') ``` ### Autogenerate Migrations Let Alembic detect schema changes automatically: ```bash # Generate migration by comparing models to database alembic revision --autogenerate -m "add customer satisfaction table" ``` **Important**: Always review autogenerated migrations! They may miss: - Renamed columns (appears as drop + add) - Changed column types requiring data conversion - Complex constraints - Data migrations Example autogenerated migration: ```python """add customer satisfaction table Revision ID: xyz789 Revises: abc123 Create Date: 2025-01-15 11:00:00.000000 """ from alembic import op import sqlalchemy as sa revision = 'xyz789' down_revision = 'abc123' branch_labels = None depends_on = None def upgrade() -> None: # Auto-generated - review before running! op.create_table( 'customer_satisfaction', sa.Column('id', sa.Integer(), nullable=False), sa.Column('ticket_id', sa.Integer(), nullable=False), sa.Column('rating', sa.Integer(), nullable=False), sa.Column('feedback', sa.Text(), nullable=True), sa.Column('created_at', sa.DateTime(), nullable=False), sa.ForeignKeyConstraint(['ticket_id'], ['tickets.id'], ondelete='CASCADE'), sa.PrimaryKeyConstraint('id') ) op.create_index('ix_satisfaction_ticket_id', 'customer_satisfaction', ['ticket_id']) op.create_index('ix_satisfaction_created_at', 'customer_satisfaction', ['created_at']) def downgrade() -> None: op.drop_index('ix_satisfaction_created_at', 'customer_satisfaction') op.drop_index('ix_satisfaction_ticket_id', 'customer_satisfaction') op.drop_table('customer_satisfaction') ``` ## Data Migrations ### Migrating Data During Schema Changes When you need to transform existing data: ```python """convert ticket status to new enum Revision ID: data001 Revises: xyz789 Create Date: 2025-01-15 12:00:00.000000 """ from alembic import op import sqlalchemy as sa from sqlalchemy.sql import table, column revision = 'data001' down_revision = 'xyz789' def upgrade() -> None: # Create new status column op.add_column('tickets', sa.Column('status_new', sa.String(50), nullable=True) ) # Migrate data using bulk update tickets = table('tickets', column('status', sa.String), column('status_new', sa.String) ) # Map old statuses to new ones status_mapping = { 'open': 'OPEN', 'in_progress': 'IN_PROGRESS', 'pending': 'WAITING_ON_CUSTOMER', 'resolved': 'RESOLVED', 'closed': 'CLOSED' } connection = op.get_bind() for old_status, new_status in status_mapping.items(): connection.execute( tickets.update().where( tickets.c.status == old_status ).values(status_new=new_status) ) # Make new column non-nullable now that data is migrated op.alter_column('tickets', 'status_new', nullable=False) # Drop old column and rename new one op.drop_column('tickets', 'status') op.alter_column('tickets', 'status_new', new_column_name='status') def downgrade() -> None: # Reverse the migration op.add_column('tickets', sa.Column('status_old', sa.String(50), nullable=True) ) tickets = table('tickets', column('status', sa.String), column('status_old', sa.String) ) # Reverse mapping reverse_mapping = { 'OPEN': 'open', 'IN_PROGRESS': 'in_progress', 'WAITING_ON_CUSTOMER': 'pending', 'RESOLVED': 'resolved', 'CLOSED': 'closed' } connection = op.get_bind() for new_status, old_status in reverse_mapping.items(): connection.execute( tickets.update().where( tickets.c.status == new_status ).values(status_old=old_status) ) op.alter_column('tickets', 'status_old', nullable=False) op.drop_column('tickets', 'status') op.alter_column('tickets', 'status_old', new_column_name='status') ``` ### Large Data Migrations with Batching For large tables, process data in batches: ```python """add computed resolution time to tickets Revision ID: data002 Revises: data001 Create Date: 2025-01-15 13:00:00.000000 """ from alembic import op import sqlalchemy as sa from sqlalchemy.sql import table, column, select revision = 'data002' down_revision = 'data001' def upgrade() -> None: # Add new column op.add_column('tickets', sa.Column('resolution_time_seconds', sa.Integer(), nullable=True) ) connection = op.get_bind() tickets = table('tickets', column('id', sa.Integer), column('created_at', sa.DateTime), column('resolved_at', sa.DateTime), column('resolution_time_seconds', sa.Integer) ) # Process in batches to avoid memory issues batch_size = 1000 offset = 0 while True: # Get batch of tickets that need processing batch = connection.execute( select( tickets.c.id, tickets.c.created_at, tickets.c.resolved_at ).where( sa.and_( tickets.c.resolved_at.isnot(None), tickets.c.resolution_time_seconds.is_(None) ) ).limit(batch_size).offset(offset) ).fetchall() if not batch: break # Update batch for row in batch: if row.resolved_at and row.created_at: resolution_time = (row.resolved_at - row.created_at).total_seconds() connection.execute( tickets.update().where( tickets.c.id == row.id ).values(resolution_time_seconds=int(resolution_time)) ) offset += batch_size # Now make column non-nullable for future rows op.alter_column('tickets', 'resolution_time_seconds', nullable=False, server_default='0') def downgrade() -> None: op.drop_column('tickets', 'resolution_time_seconds') ``` ## Running Migrations ### Upgrade Database to Latest ```bash # Upgrade to latest revision (head) alembic upgrade head # See what would be executed (SQL only, don't run) alembic upgrade head --sql # Upgrade one step at a time alembic upgrade +1 # Upgrade to specific revision alembic upgrade abc123 ``` ### Downgrade Database ```bash # Downgrade one revision alembic downgrade -1 # Downgrade to specific revision alembic downgrade abc123 # Downgrade to base (empty database) alembic downgrade base # Generate SQL for downgrade without executing alembic downgrade -1 --sql ``` ### Check Current Status ```bash # Show current database revision alembic current # Show current revision with details alembic current --verbose # Show migration history alembic history # Show history with current revision marked alembic history --indicate-current # Show specific revision range alembic history -r base:head ``` ## Branching and Merging ### Why Branch Migrations? In customer support systems, you might have: - **Feature branches**: New features developed in parallel - **Hotfix branches**: Urgent fixes that can't wait for feature completion - **Team branches**: Multiple teams working on different modules ### Creating a Branch ```bash # Create base for new branch alembic revision -m "create reporting branch" \ --head=base \ --branch-label=reporting \ --version-path=alembic/versions/reporting # Add migration to specific branch alembic revision -m "add report tables" \ --head=reporting@head ``` Example branch structure: ``` base ├── main branch │ ├── abc123: initial schema │ ├── def456: add tickets │ └── ghi789: add users └── reporting branch ├── rep001: create reports table └── rep002: add scheduled reports ``` ### Working with Multiple Branches ```bash # Show all branch heads alembic heads # Show branch points alembic branches # Upgrade specific branch alembic upgrade reporting@head # Upgrade all branches alembic upgrade heads ``` ### Merging Branches When features are ready to merge: ```bash # Merge two branches alembic merge -m "merge reporting into main" \ main@head reporting@head ``` Generated merge migration: ```python """merge reporting into main Revision ID: merge001 Revises: ghi789, rep002 Create Date: 2025-01-15 14:00:00.000000 """ from alembic import op import sqlalchemy as sa revision = 'merge001' down_revision = ('ghi789', 'rep002') # Multiple parents branch_labels = None depends_on = None def upgrade() -> None: # Usually empty for simple merges # Add code if you need to reconcile conflicting changes pass def downgrade() -> None: pass ``` ### Cross-Branch Dependencies When one branch depends on another: ```bash # Create migration that depends on specific revision from another branch alembic revision -m "reporting needs user table" \ --head=reporting@head \ --depends-on=def456 # Revision from main branch ``` ## Testing Migrations ### Unit Testing Migrations ```python # tests/test_migrations.py import pytest from alembic import command from alembic.config import Config from sqlalchemy import create_engine, inspect from sqlalchemy.orm import sessionmaker @pytest.fixture def alembic_config(): """Provide Alembic configuration for testing""" config = Config("alembic.ini") config.set_main_option( "sqlalchemy.url", "postgresql://localhost/support_test" ) return config @pytest.fixture def test_db(alembic_config): """Create test database and apply migrations""" # Create engine engine = create_engine( alembic_config.get_main_option("sqlalchemy.url") ) # Run migrations to head command.upgrade(alembic_config, "head") yield engine # Cleanup - downgrade to base command.downgrade(alembic_config, "base") engine.dispose() def test_migration_creates_tickets_table(test_db): """Test that migrations create expected tables""" inspector = inspect(test_db) tables = inspector.get_table_names() assert 'tickets' in tables assert 'users' in tables assert 'customer_satisfaction' in tables def test_tickets_table_structure(test_db): """Test ticket table has correct columns""" inspector = inspect(test_db) columns = {col['name']: col for col in inspector.get_columns('tickets')} assert 'id' in columns assert 'priority' in columns assert 'status' in columns assert 'created_at' in columns assert 'resolution_time_seconds' in columns # Check column types assert columns['priority']['type'].python_type == str assert columns['status']['type'].python_type == str def test_migration_upgrade_downgrade_cycle(alembic_config): """Test that upgrade -> downgrade -> upgrade works""" # Start at base command.downgrade(alembic_config, "base") # Upgrade to head command.upgrade(alembic_config, "head") # Downgrade one step command.downgrade(alembic_config, "-1") # Upgrade back to head command.upgrade(alembic_config, "head") # Should complete without errors def test_data_migration_preserves_data(test_db): """Test that data migrations don't lose data""" from sqlalchemy.orm import sessionmaker from myapp.models import Ticket Session = sessionmaker(bind=test_db) session = Session() # Insert test data ticket = Ticket( title="Test ticket", status="OPEN", priority="high" ) session.add(ticket) session.commit() ticket_id = ticket.id session.close() # Run a migration that modifies tickets table # (This would be a specific revision) # command.upgrade(alembic_config, "specific_revision") # Verify data still exists session = Session() retrieved = session.query(Ticket).filter_by(id=ticket_id).first() assert retrieved is not None assert retrieved.title == "Test ticket" session.close() ``` ### Integration Testing ```python # tests/test_migration_integration.py import pytest from alembic import command from alembic.config import Config from alembic.script import ScriptDirectory from alembic.runtime.migration import MigrationContext def test_no_pending_migrations(alembic_config, test_db): """Ensure all migrations are applied in test environment""" script = ScriptDirectory.from_config(alembic_config) with test_db.connect() as connection: context = MigrationContext.configure(connection) current_heads = set(context.get_current_heads()) script_heads = set(script.get_heads()) assert current_heads == script_heads, \ f"Database has pending migrations. Current: {current_heads}, Expected: {script_heads}" def test_migration_order_is_valid(alembic_config): """Verify migration chain has no gaps or conflicts""" script = ScriptDirectory.from_config(alembic_config) # Get all revisions revisions = list(script.walk_revisions()) # Check each revision has valid down_revision for revision in revisions: if revision.down_revision is not None: if isinstance(revision.down_revision, tuple): # Merge point for down_rev in revision.down_revision: assert script.get_revision(down_rev) is not None else: assert script.get_revision(revision.down_revision) is not None def test_check_command_detects_drift(alembic_config, test_db): """Test that check command detects schema drift""" # This test verifies that `alembic check` works correctly try: command.check(alembic_config) # If no exception, database matches models assert True except Exception as e: # If exception, there's drift between DB and models pytest.fail(f"Schema drift detected: {e}") ``` ### Testing Migration Performance ```python # tests/test_migration_performance.py import time import pytest from alembic import command def test_migration_completes_within_time_limit(alembic_config): """Ensure migrations complete within acceptable time""" # Downgrade to base command.downgrade(alembic_config, "base") # Time the upgrade start = time.time() command.upgrade(alembic_config, "head") duration = time.time() - start # Assert completes within 60 seconds assert duration < 60, f"Migration took {duration}s, exceeds 60s limit" @pytest.mark.slow def test_data_migration_with_large_dataset(alembic_config, test_db): """Test data migration performance with realistic data volume""" from sqlalchemy.orm import sessionmaker from myapp.models import Ticket Session = sessionmaker(bind=test_db) session = Session() # Create 10,000 test tickets tickets = [ Ticket( title=f"Test ticket {i}", status="OPEN", priority="normal" ) for i in range(10000) ] session.bulk_save_objects(tickets) session.commit() session.close() # Run data migration and measure time start = time.time() command.upgrade(alembic_config, "data002") # Specific data migration duration = time.time() - start # Should process 10k records in reasonable time assert duration < 30, f"Data migration took {duration}s for 10k records" ``` ## CI/CD Integration ### GitHub Actions Workflow ```yaml # .github/workflows/migrations.yml name: Database Migrations on: pull_request: paths: - 'alembic/versions/**' - 'myapp/models/**' - 'alembic.ini' - 'alembic/env.py' push: branches: - main - develop jobs: test-migrations: runs-on: ubuntu-latest services: postgres: image: postgres:15 env: POSTGRES_PASSWORD: postgres POSTGRES_DB: support_test options: >- --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 ports: - 5432:5432 steps: - uses: actions/checkout@v3 - name: Set up Python uses: actions/setup-python@v4 with: python-version: '3.11' - name: Install dependencies run: | pip install -r requirements.txt pip install pytest pytest-cov - name: Run migration tests env: DATABASE_URL: postgresql://postgres:postgres@localhost/support_test run: | # Test upgrade to head alembic upgrade head # Test downgrade to base alembic downgrade base # Test upgrade again alembic upgrade head # Run pytest for migration tests pytest tests/test_migrations.py -v - name: Check for schema drift env: DATABASE_URL: postgresql://postgres:postgres@localhost/support_test run: | alembic check - name: Validate migration history run: | # Check for multiple heads (should be only one) HEADS_COUNT=$(alembic heads | wc -l) if [ "$HEADS_COUNT" -gt 1 ]; then echo "ERROR: Multiple heads detected. Please merge branches." alembic heads exit 1 fi review-migration-sql: runs-on: ubuntu-latest if: github.event_name == 'pull_request' steps: - uses: actions/checkout@v3 - name: Set up Python uses: actions/setup-python@v4 with: python-version: '3.11' - name: Install dependencies run: pip install -r requirements.txt - name: Generate SQL for review run: | # Generate SQL without executing alembic upgrade head --sql > migration.sql - name: Upload SQL artifact uses: actions/upload-artifact@v3 with: name: migration-sql path: migration.sql - name: Comment PR with SQL uses: actions/github-script@v6 with: script: | const fs = require('fs'); const sql = fs.readFileSync('migration.sql', 'utf8'); github.rest.issues.createComment({ issue_number: context.issue.number, owner: context.repo.owner, repo: context.repo.repo, body: `## Migration SQL\n\n\`\`\`sql\n${sql}\n\`\`\`` }); ``` ### Deployment Script ```bash #!/bin/bash # scripts/deploy_migrations.sh set -e # Exit on error echo "Starting database migration deployment..." # Environment variables DB_HOST="${DB_HOST:-localhost}" DB_NAME="${DB_NAME:-support_prod}" DB_USER="${DB_USER:-postgres}" DATABASE_URL="postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}" # Configuration BACKUP_DIR="./backups" TIMESTAMP=$(date +%Y%m%d_%H%M%S) BACKUP_FILE="${BACKUP_DIR}/pre_migration_${TIMESTAMP}.sql" # Create backup directory mkdir -p "$BACKUP_DIR" # 1. Backup database before migration echo "Creating database backup..." pg_dump "$DATABASE_URL" > "$BACKUP_FILE" echo "Backup created: $BACKUP_FILE" # 2. Check current migration status echo "Current migration status:" alembic current # 3. Show pending migrations echo "Pending migrations:" alembic history --verbose | grep -A 5 "head" # 4. Run migrations with timeout echo "Running migrations..." timeout 300 alembic upgrade head || { echo "ERROR: Migration failed or timed out!" echo "Restoring from backup..." psql "$DATABASE_URL" < "$BACKUP_FILE" exit 1 } # 5. Verify migration success echo "Verifying migration status..." CURRENT_REV=$(alembic current | grep "Rev:" | awk '{print $2}') HEAD_REV=$(alembic heads | awk '{print $1}') if [ "$CURRENT_REV" != "$HEAD_REV" ]; then echo "ERROR: Migration incomplete. Current: $CURRENT_REV, Expected: $HEAD_REV" echo "Restoring from backup..." psql "$DATABASE_URL" < "$BACKUP_FILE" exit 1 fi echo "Migration completed successfully!" echo "Current revision: $CURRENT_REV" # 6. Cleanup old backups (keep last 10) echo "Cleaning up old backups..." ls -t "$BACKUP_DIR"/*.sql | tail -n +11 | xargs -r rm echo "Deployment complete!" ``` ## Production Best Practices ### Pre-Deployment Checklist - [ ] Migration tested in development environment - [ ] Migration tested in staging with production-like data - [ ] Migration reviewed by at least one team member - [ ] Downgrade path tested and verified - [ ] Performance impact assessed for large tables - [ ] Database backup plan in place - [ ] Rollback procedure documented - [ ] Maintenance window scheduled (if needed) - [ ] Team notified of deployment - [ ] Monitoring alerts configured ### Zero-Downtime Migrations For critical support systems that can't go offline: **Phase 1: Additive Changes** ```python """add new column (phase 1) Revision ID: zd001 """ def upgrade() -> None: # Add new column as nullable op.add_column('tickets', sa.Column('new_field', sa.String(100), nullable=True) ) def downgrade() -> None: op.drop_column('tickets', 'new_field') ``` **Phase 2: Data Migration (Background)** ```python """populate new column (phase 2) Revision ID: zd002 """ def upgrade() -> None: # Update in small batches during low-traffic periods connection = op.get_bind() batch_size = 100 while True: result = connection.execute( """ UPDATE tickets SET new_field = calculate_value(old_field) WHERE new_field IS NULL LIMIT {batch_size} """.format(batch_size=batch_size) ) if result.rowcount == 0: break # Small delay to reduce database load import time time.sleep(0.1) def downgrade() -> None: connection = op.get_bind() connection.execute("UPDATE tickets SET new_field = NULL") ``` **Phase 3: Make Required** ```python """make new column required (phase 3) Revision ID: zd003 """ def upgrade() -> None: # Now that all rows have values, make it non-nullable op.alter_column('tickets', 'new_field', nullable=False, server_default='default_value' ) def downgrade() -> None: op.alter_column('tickets', 'new_field', nullable=True, server_default=None ) ``` **Phase 4: Remove Old Column (Optional)** ```python """remove old column (phase 4) Revision ID: zd004 """ def upgrade() -> None: op.drop_column('tickets', 'old_field') def downgrade() -> None: op.add_column('tickets', sa.Column('old_field', sa.String(100), nullable=True) ) ``` ### Handling Migration Failures ```python # alembic/env.py additions for error handling from alembic import context import logging logger = logging.getLogger('alembic.env') def run_migrations_online(): """Run migrations in 'online' mode with error handling""" connectable = engine_from_config( config.get_section(config.config_ini_section), prefix='sqlalchemy.', poolclass=pool.NullPool, ) with connectable.connect() as connection: context.configure( connection=connection, target_metadata=target_metadata, transaction_per_migration=True, # Rollback individual migrations compare_type=True, compare_server_default=True ) try: with context.begin_transaction(): context.run_migrations() except Exception as e: logger.error(f"Migration failed: {e}") logger.error("Rolling back transaction...") # Transaction automatically rolled back raise else: logger.info("Migration completed successfully") ``` ## Advanced Configuration ### Custom Migration Template Create custom template for your organization: ```python # alembic/script.py.mako """${message} Revision ID: ${up_revision} Revises: ${down_revision | comma,n} Create Date: ${create_date} Author: ${author if author else 'Support Team'} Jira: ${jira_ticket if jira_ticket else 'N/A'} """ from alembic import op import sqlalchemy as sa ${imports if imports else ""} # revision identifiers, used by Alembic. revision = ${repr(up_revision)} down_revision = ${repr(down_revision)} branch_labels = ${repr(branch_labels)} depends_on = ${repr(depends_on)} def upgrade() -> None: """Apply migration changes""" ${upgrades if upgrades else "pass"} def downgrade() -> None: """Revert migration changes""" ${downgrades if downgrades else "pass"} ``` ### Multi-Database Support For systems with separate databases (e.g., main DB + analytics): ```python # alembic/env.py for multiple databases def run_migrations_online(): """Run migrations for multiple databases""" # Configuration for each database engines = { 'main': { 'url': os.getenv('MAIN_DB_URL'), 'target_metadata': main_metadata }, 'analytics': { 'url': os.getenv('ANALYTICS_DB_URL'), 'target_metadata': analytics_metadata } } for name, config in engines.items(): logger.info(f"Running migrations for {name} database") engine = create_engine(config['url']) with engine.connect() as connection: context.configure( connection=connection, target_metadata=config['target_metadata'], upgrade_token=f"{name}_upgrade", downgrade_token=f"{name}_downgrade" ) with context.begin_transaction(): context.run_migrations(engine_name=name) ``` ## Troubleshooting ### Common Issues and Solutions **Multiple Heads Error** ```bash # Problem: "Multiple heads exist" # Solution: Merge the branches alembic merge heads -m "merge branches" ``` **Migration Out of Sync** ```bash # Problem: Database revision doesn't match migration history # Solution: Stamp database to specific revision alembic stamp head # Or stamp to specific revision alembic stamp abc123 ``` **Failed Migration Cleanup** ```bash # Problem: Migration failed midway # Solution: Manual cleanup # 1. Check current state alembic current # 2. Manually fix database issues psql $DATABASE_URL # 3. Stamp to correct revision alembic stamp previous_working_revision # 4. Try migration again alembic upgrade head ``` **Circular Dependencies** ```bash # Problem: "Circular dependency detected" # Solution: Use depends_on instead of down_revision alembic revision -m "fix circular dependency" \ --head=branch_a@head \ --depends-on=branch_b_revision ``` ## Summary This skill covered comprehensive Alembic usage for customer support systems: 1. **Setup**: Installation, configuration, and initialization 2. **Creating Migrations**: Manual and autogenerated approaches 3. **Data Migrations**: Transforming data during schema changes 4. **Running Migrations**: Upgrade, downgrade, and status commands 5. **Branching**: Managing parallel development streams 6. **Testing**: Unit, integration, and performance testing 7. **CI/CD**: Automation and deployment strategies 8. **Production**: Zero-downtime migrations and best practices 9. **Advanced**: Custom templates and multi-database support 10. **Troubleshooting**: Common issues and solutions Always remember: - Review autogenerated migrations - Test migrations thoroughly before production - Keep backups before major migrations - Plan for rollback scenarios - Monitor migration performance - Document complex migrations For more examples, see EXAMPLES.md in this skill package.