--- name: delta-sharing description: Delta Sharing configuration, monitoring, and recipient management for secure cross-organization data sharing. triggers: - delta sharing - data sharing - external sharing - recipient management - share configuration category: collaboration --- # Delta Sharing Skill ## Overview Delta Sharing is an open protocol for secure data sharing across organizations. This skill covers share configuration, recipient management, access control, and usage monitoring. **Key Benefits:** - Secure cross-organization sharing - No data duplication - Real-time data access - Centralized access control - Usage tracking and auditing - Open standard (works with any platform) ## When to Use This Skill Use Delta Sharing when you need to: - Share data with external partners - Enable cross-workspace data access - Distribute data products to consumers - Implement multi-tenant data access - Track data consumption patterns - Maintain centralized governance ## Core Concepts ### 1. Share Configuration **Create and Configure Shares:** ```python from databricks.sdk import WorkspaceClient w = WorkspaceClient() # Create share share = w.shares.create( name="partner_analytics_share", comment="Analytics data for external partners" ) # Add tables to share w.shares.update( name="partner_analytics_share", updates=[ { "action": "ADD", "data_object": { "name": "catalog.schema.customer_metrics", "data_object_type": "TABLE", "comment": "Aggregated customer metrics" } }, { "action": "ADD", "data_object": { "name": "catalog.schema.product_sales", "data_object_type": "TABLE" } } ] ) ``` ### 2. Recipient Management **Create and Manage Recipients:** ```python # Create recipient recipient = w.recipients.create( name="acme_corp", comment="ACME Corporation partner", authentication_type="TOKEN" ) # Get activation URL for recipient activation_url = recipient.activation_url print(f"Share this URL with recipient: {activation_url}") # Grant access to share w.grants.update( securable_type="SHARE", securable_name="partner_analytics_share", changes=[{ "principal": "acme_corp", "add": ["SELECT"] }] ) ``` ### 3. Access Control **Manage Permissions:** ```python # List current permissions permissions = w.grants.get_effective( securable_type="SHARE", full_name="partner_analytics_share" ) # Revoke access w.grants.update( securable_type="SHARE", securable_name="partner_analytics_share", changes=[{ "principal": "acme_corp", "remove": ["SELECT"] }] ) # Grant access to specific tables only w.grants.update( securable_type="TABLE", full_name="catalog.schema.customer_metrics", securable_type="SHARE", securable_name="partner_analytics_share", changes=[{ "principal": "acme_corp", "add": ["SELECT"] }] ) ``` ### 4. Usage Monitoring **Track Share Usage:** ```python def monitor_share_usage(spark, share_name: str, days: int = 7): """Monitor Delta Sharing usage.""" query = f""" SELECT date_trunc('day', request_time) as date, recipient_name, table_name, COUNT(*) as request_count, SUM(rows_returned) as total_rows, SUM(bytes_transferred) as total_bytes FROM system.access.audit WHERE share_name = '{share_name}' AND request_time >= current_date() - INTERVAL {days} DAYS GROUP BY 1, 2, 3 ORDER BY 1 DESC, 4 DESC """ return spark.sql(query) # Get usage metrics usage_df = monitor_share_usage(spark, "partner_analytics_share") usage_df.show() ``` ## Implementation Patterns ### Pattern 1: Multi-Tenant Sharing **Separate Shares per Tenant:** ```python class MultiTenantSharingManager: """Manage multi-tenant data sharing.""" def __init__(self, workspace_client: WorkspaceClient): self.client = workspace_client def setup_tenant_share( self, tenant_name: str, tables: List[str], tenant_email: str ): """Set up complete sharing for a tenant.""" share_name = f"{tenant_name}_share" # Create share self.client.shares.create( name=share_name, comment=f"Data share for {tenant_name}" ) # Add tables for table in tables: self.client.shares.update( name=share_name, updates=[{ "action": "ADD", "data_object": { "name": table, "data_object_type": "TABLE" } }] ) # Create recipient recipient = self.client.recipients.create( name=tenant_name, comment=f"Recipient for {tenant_name}", authentication_type="TOKEN" ) # Grant access self.client.grants.update( securable_type="SHARE", securable_name=share_name, changes=[{ "principal": tenant_name, "add": ["SELECT"] }] ) return { "share_name": share_name, "recipient": tenant_name, "activation_url": recipient.activation_url, "tables": tables } ``` ### Pattern 2: Row-Level Security **Implement RLS for Shared Data:** ```python import dlt from pyspark.sql.functions import * @dlt.table( name="customer_data_with_rls", comment="Customer data with row-level security" ) def customer_data_with_tenant_filter(): """ Apply row-level security for multi-tenant sharing. Each recipient sees only their tenant's data. """ return ( dlt.read("silver_customer_data") .withColumn("tenant_id", col("customer_id").substr(1, 3)) .select( "customer_id", "tenant_id", "customer_name", "email", "created_date" ) ) # Share with partition filtering def create_tenant_specific_share( tenant_id: str, source_table: str, share_name: str ): """ Create share with automatic tenant filtering. Uses partition pruning for efficient filtering. """ w = WorkspaceClient() # Create view with tenant filter view_name = f"{source_table}_{tenant_id}_view" spark.sql(f""" CREATE OR REPLACE VIEW {view_name} AS SELECT * FROM {source_table} WHERE tenant_id = '{tenant_id}' """) # Share the view w.shares.create(name=share_name) w.shares.update( name=share_name, updates=[{ "action": "ADD", "data_object": { "name": view_name, "data_object_type": "VIEW" } }] ) ``` ### Pattern 3: Usage Analytics **Comprehensive Usage Tracking:** ```python class SharingAnalytics: """Analyze Delta Sharing usage patterns.""" def __init__(self, spark): self.spark = spark def get_most_accessed_tables(self, days: int = 30): """Find most accessed shared tables.""" return self.spark.sql(f""" SELECT table_name, COUNT(DISTINCT recipient_name) as unique_recipients, COUNT(*) as total_requests, SUM(rows_returned) as total_rows, AVG(query_duration_ms) as avg_duration_ms FROM system.access.audit WHERE event_type = 'deltaSharing' AND request_time >= current_date() - INTERVAL {days} DAYS GROUP BY table_name ORDER BY total_requests DESC LIMIT 20 """) def get_recipient_activity(self, recipient_name: str, days: int = 7): """Detailed activity for specific recipient.""" return self.spark.sql(f""" SELECT date_trunc('hour', request_time) as hour, table_name, COUNT(*) as requests, SUM(rows_returned) as rows, SUM(bytes_transferred) / 1024 / 1024 as mb_transferred FROM system.access.audit WHERE recipient_name = '{recipient_name}' AND request_time >= current_date() - INTERVAL {days} DAYS GROUP BY 1, 2 ORDER BY 1 DESC """) def detect_anomalies(self, share_name: str): """Detect unusual access patterns.""" return self.spark.sql(f""" WITH daily_stats AS ( SELECT date_trunc('day', request_time) as date, recipient_name, COUNT(*) as daily_requests FROM system.access.audit WHERE share_name = '{share_name}' AND request_time >= current_date() - INTERVAL 30 DAYS GROUP BY 1, 2 ), stats AS ( SELECT recipient_name, AVG(daily_requests) as avg_requests, STDDEV(daily_requests) as stddev_requests FROM daily_stats GROUP BY recipient_name ) SELECT d.date, d.recipient_name, d.daily_requests, s.avg_requests, s.stddev_requests, CASE WHEN d.daily_requests > s.avg_requests + 2 * s.stddev_requests THEN 'ANOMALY_HIGH' WHEN d.daily_requests < s.avg_requests - 2 * s.stddev_requests THEN 'ANOMALY_LOW' ELSE 'NORMAL' END as status FROM daily_stats d JOIN stats s ON d.recipient_name = s.recipient_name WHERE date >= current_date() - INTERVAL 7 DAYS ORDER BY d.date DESC, d.daily_requests DESC """) ``` ### Pattern 4: Automated Share Management **CI/CD for Share Configuration:** ```python """ Automated share management from configuration. """ import yaml from typing import Dict, List, Any class ShareConfigManager: """Manage shares from YAML configuration.""" def __init__(self, workspace_client: WorkspaceClient): self.client = workspace_client def load_config(self, config_path: str) -> Dict[str, Any]: """Load share configuration from YAML.""" with open(config_path, 'r') as f: return yaml.safe_load(f) def apply_config(self, config: Dict[str, Any]): """Apply share configuration.""" for share_config in config.get('shares', []): self._create_or_update_share(share_config) for recipient_config in config.get('recipients', []): self._create_or_update_recipient(recipient_config) for grant_config in config.get('grants', []): self._apply_grants(grant_config) def _create_or_update_share(self, config: Dict[str, Any]): """Create or update a share.""" try: existing = self.client.shares.get(name=config['name']) print(f"Share {config['name']} already exists") except: self.client.shares.create( name=config['name'], comment=config.get('comment', '') ) print(f"Created share: {config['name']}") # Add tables for table in config.get('tables', []): self.client.shares.update( name=config['name'], updates=[{ "action": "ADD", "data_object": { "name": table, "data_object_type": "TABLE" } }] ) ``` **Example Configuration (shares.yaml):** ```yaml shares: - name: partner_analytics_share comment: Analytics data for partners tables: - catalog.analytics.customer_metrics - catalog.analytics.product_performance - catalog.analytics.sales_summary recipients: - name: acme_corp comment: ACME Corporation authentication_type: TOKEN email: data-team@acme.com - name: globex_inc comment: Globex Inc authentication_type: TOKEN email: analytics@globex.com grants: - share: partner_analytics_share recipient: acme_corp permissions: [SELECT] - share: partner_analytics_share recipient: globex_inc permissions: [SELECT] ``` ## Best Practices ### 1. Security - Use token-based authentication - Implement row-level security when needed - Regular audit of access permissions - Monitor for unusual access patterns - Rotate recipient tokens periodically ### 2. Performance - Share aggregated data when possible - Use partition pruning for large tables - Monitor query performance - Implement caching strategies - Limit result set sizes ### 3. Governance - Document all shares and recipients - Maintain share configuration in version control - Regular access reviews - Clear data ownership - Defined SLAs for shared data ### 4. Monitoring ```python # Set up alerts for anomalous usage def check_usage_threshold(spark, share_name: str, threshold_gb: float = 100): """Alert if daily data transfer exceeds threshold.""" query = f""" SELECT SUM(bytes_transferred) / 1024 / 1024 / 1024 as gb_transferred FROM system.access.audit WHERE share_name = '{share_name}' AND date_trunc('day', request_time) = current_date() """ result = spark.sql(query).first() gb_transferred = result['gb_transferred'] or 0 if gb_transferred > threshold_gb: send_alert( f"Share {share_name} transferred {gb_transferred:.2f}GB today, " f"exceeding threshold of {threshold_gb}GB" ) ``` ## Common Pitfalls to Avoid Don't: - Share raw PII without review - Neglect access auditing - Skip documentation - Ignore usage monitoring - Hard-code credentials Do: - Implement data masking for sensitive fields - Regular access audits - Document all shares - Monitor usage patterns - Use secure credential management ## Complete Examples See `/examples/` directory for: - `external_data_sharing.py`: Complete external sharing setup - `multi_tenant_sharing.py`: Multi-tenant implementation ## Related Skills - `data-products`: Share data products - `data-quality`: Ensure shared data quality - `medallion-architecture`: Share gold layer tables - `cicd-workflows`: Automate share management ## References - [Delta Sharing Protocol](https://github.com/delta-io/delta-sharing) - [Databricks Delta Sharing Docs](https://docs.databricks.com/data-sharing/index.html) - [Delta Sharing Security](https://docs.databricks.com/security/delta-sharing.html)