FILE: _proc/00_db_host.html.md # 🏠 Multi-Tenant Setup ## 🎯 Overview The **Host Database** is the backbone of a multi-tenant architecture. It answers the critical questions:
Question Model Purpose
👤 Who is this person? GlobalUser Identity & authentication
🏢 Where is their data? TenantCatalog Database routing
🔑 What can they access? Membership Access control
💳 Are they paying? Subscription Billing status
📋 What happened? HostAuditLog Security audit trail
⚙️ Background work? SystemJob Async operations
------------------------------------------------------------------------ ## 🏗️ Architecture ┌─────────────────────────────────────┐ │ 🏠 HOST DATABASE │ │ (Single source of truth) │ ├─────────────────────────────────────┤ │ 👤 GlobalUser → Identity │ │ 🏢 TenantCatalog → DB Routing │ │ 🔑 Membership → Access Control │ │ 💳 Subscription → Billing │ └───────────┬─────────────────────────┘ │ ┌───────────────────┼───────────────────┐ ▼ ▼ ▼ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ 🗄️ Tenant A │ │ 🗄️ Tenant B │ │ 🗄️ Tenant C │ │ Database │ │ Database │ │ Database │ │ (isolated) │ │ (isolated) │ │ (isolated) │ └───────────────┘ └───────────────┘ └───────────────┘ **Key Principle:** User authenticates once → Host routes to correct tenant → Tenant data is isolated ## 🛠️ Utilities Helper functions used throughout the host database operations. ------------------------------------------------------------------------ source ### gen_id ``` python def gen_id( ): ``` ------------------------------------------------------------------------ source ### timestamp ``` python def timestamp( ): ``` ------------------------------------------------------------------------ source ### get_db_uri ``` python def get_db_uri( db )->str: ``` *Extract SQLAlchemy connection URI from a Database object.* Safely renders the URL with the actual password (required for utilities like [`map_and_upsert`](https://abhisheksreesaila.github.io/fh-saas/utils_polars_mapper.html#map_and_upsert) that create new connections). **Why this is needed:** - `str(db.conn.engine.url)` masks the password as `***` - This causes “password authentication failed” errors - `render_as_string(hide_password=False)` reveals the actual password
Parameter Description
db fastsql/minidataapi Database object or HostDatabase instance
**Returns:** Full connection URI string with password **Example:** ``` python from fh_saas.db_host import get_db_uri, HostDatabase from fh_saas.utils_polars_mapper import map_and_upsert host_db = HostDatabase.from_env() db_uri = get_db_uri(host_db.db) # Now safe to use with map_and_upsert map_and_upsert(df, 'my_table', 'id', db_uri) ```
Function Description
timestamp() 🕐 Returns current UTC time in ISO format
gen_id() 🆔 Generates a unique 32-character hex ID
get_db_uri(db) 🔗 Extract full connection URI with password from Database object
------------------------------------------------------------------------ ## 📦 Core Models These dataclasses define the host database schema. Each model maps to a table with the `core_` or `sys_` prefix. ------------------------------------------------------------------------ ### StripeWebhookEvent ``` python def StripeWebhookEvent( args:VAR_POSITIONAL, kwargs:VAR_KEYWORD ): ``` *Idempotency: Has this webhook already been processed?* Stores processed Stripe event IDs to prevent duplicate handling. Attributes: id: Unique record ID (gen_id) event_id: Stripe event ID (e.g., evt_xxx) event_type: Stripe event type (e.g., customer.subscription.updated) status: Processing result: ‘processed’, ‘failed’, or ‘skipped_out_of_order’ payload_json: Optional JSON snapshot of the event result created_at: ISO timestamp of when the event was recorded ------------------------------------------------------------------------ source ### PricingPlan ``` python def PricingPlan( args:VAR_POSITIONAL, kwargs:VAR_KEYWORD ): ``` *Pricing: Available subscription tiers with Stripe price IDs.* Stores pricing configuration in database for admin-configurable tiers. Each plan can have monthly and/or yearly billing intervals. Attributes: id: Unique plan identifier (e.g., ‘basic’, ‘pro’, ‘enterprise’) name: Display name for UI (e.g., ‘Basic Plan’) description: Plan description for pricing page stripe_price_monthly: Stripe Price ID for monthly billing (price_xxx) stripe_price_yearly: Stripe Price ID for yearly billing (price_xxx) amount_monthly: Monthly price in cents (for display, e.g., 799 = $7.99) amount_yearly: Yearly price in cents (for display, e.g., 7900 = $79.00) currency: ISO currency code (default: ‘usd’) trial_days: Free trial period in days (default: 30) features: JSON array of feature keys enabled for this plan tier_level: Numeric level for feature gating (higher = more access) is_active: Whether plan is available for new subscriptions sort_order: Display order on pricing page created_at: Record creation timestamp Example: \>\>\> plan = PricingPlan( … id=‘pro’, … name=‘Pro Plan’, … stripe_price_monthly=‘price_1234’, … stripe_price_yearly=‘price_5678’, … amount_monthly=1999, … amount_yearly=19900, … tier_level=2, … features=‘\[“api_access”, “exports”, “priority_support”\]’, … ) ------------------------------------------------------------------------ source ### SystemJob ``` python def SystemJob( args:VAR_POSITIONAL, kwargs:VAR_KEYWORD ): ``` *Maintenance: Provisioning & Cleanups* ------------------------------------------------------------------------ source ### HostAuditLog ``` python def HostAuditLog( args:VAR_POSITIONAL, kwargs:VAR_KEYWORD ): ``` *Security: Who changed the system?* ------------------------------------------------------------------------ source ### Subscription ``` python def Subscription( args:VAR_POSITIONAL, kwargs:VAR_KEYWORD ): ``` *Billing: Are they allowed to use the app?* ------------------------------------------------------------------------ source ### Membership ``` python def Membership( args:VAR_POSITIONAL, kwargs:VAR_KEYWORD ): ``` *Router: Which tenants can they access?* ------------------------------------------------------------------------ source ### TenantCatalog ``` python def TenantCatalog( args:VAR_POSITIONAL, kwargs:VAR_KEYWORD ): ``` *Registry: Where is the database?* ------------------------------------------------------------------------ source ### GlobalUser ``` python def GlobalUser( args:VAR_POSITIONAL, kwargs:VAR_KEYWORD ): ``` *Identity: Who is this person?* ### 📝 Model Details
Model Table Name Primary Key Description
GlobalUser core_users id OAuth identity, email, optional password hash, Stripe customer ID
TenantCatalog core_tenants id Maps tenant ID to database URL, tracks plan tier and status
Membership core_memberships id Links users to tenants with roles (owner, admin, member)
Subscription core_subscriptions id Stripe subscription state for billing enforcement
PricingPlan core_pricing_plans id Available subscription tiers with Stripe price IDs
HostAuditLog sys_audit_logs id Immutable security log for compliance
SystemJob sys_jobs id Background task queue for provisioning, cleanup
StripeWebhookEvent core_stripe_webhook_events id Idempotency tracking for processed Stripe webhook events
------------------------------------------------------------------------ ## 🔌 HostDatabase Singleton The [`HostDatabase`](https://abhisheksreesaila.github.io/fh-saas/db_host.html#hostdatabase) class provides a **singleton** connection manager for the host database. ✅ **Why Singleton?** - Single connection pool shared across the application - Consistent transaction management - Easy dependency injection for testing ┌─────────────────────────────────────────────────────┐ │ Application Start │ └─────────────────────┬───────────────────────────────┘ ▼ ┌────────────────────────┐ │ HostDatabase.from_env() │ ← Reads DB_* env vars └────────────┬───────────┘ ▼ ┌────────────────────────┐ │ Creates tables if │ │ they don't exist │ └────────────┬───────────┘ ▼ ┌────────────────────────┐ │ Returns singleton │ ← Same instance everywhere │ instance │ └────────────────────────┘ ------------------------------------------------------------------------ source ### HostDatabase ``` python def HostDatabase( db_url:str=None ): ``` *Singleton connection manager for the host database.* ### 🔧 Methods
Method Description
from_env() 🏭 Factory method - creates instance from environment variables
commit() ✅ Commit the current database transaction
rollback() ↩︎️ Rollback the current transaction on error
reset_instance() 🧪 Reset singleton (testing only)
#### 🌍 Environment Variables for `from_env()`
Variable Default Description
DB_TYPE POSTGRESQL Database type (POSTGRESQL or SQLITE)
DB_USER postgres Database username
DB_PASS (required) Database password
DB_HOST localhost Database host
DB_PORT 5432 Database port
DB_NAME app_host Database name
------------------------------------------------------------------------ source ### HostDatabase.from_env ``` python def from_env( ): ``` *Create HostDatabase from DB\_* environment variables.\* ------------------------------------------------------------------------ source ### HostDatabase.commit ``` python def commit( ): ``` *Commit current transaction.* ------------------------------------------------------------------------ source ### HostDatabase.rollback ``` python def rollback( ): ``` *Rollback current transaction.* ------------------------------------------------------------------------ source ### HostDatabase.reset_instance ``` python def reset_instance( ): ``` *Reset singleton instance (testing only).* ⚠️ Call close() first to release database connections! ------------------------------------------------------------------------ ## 🚀 Quick Start ``` python from fh_saas.db_host import HostDatabase, GlobalUser, gen_id, timestamp # Initialize singleton from environment host_db = HostDatabase.from_env() # Create a user user = GlobalUser( id=gen_id(), email="user@example.com", oauth_id="google_123", created_at=timestamp() ) host_db.global_users.insert(user) host_db.commit() # Query users all_users = host_db.global_users() ``` 💡 **Tip:** The singleton ensures you always get the same connection, so you can call [`HostDatabase.from_env()`](https://abhisheksreesaila.github.io/fh-saas/db_host.html#hostdatabase.from_env) anywhere in your app. ======================================== FILE: _proc/01_db_tenant.html.md # 🗄️ Tenant Databases ## 🎯 Overview Each tenant gets their own **isolated database** containing:
Model Purpose
👤 TenantUser Local user profiles linked to global identity
🔐 TenantPermission Fine-grained resource permissions
⚙️ TenantSettings Tenant-wide configuration
------------------------------------------------------------------------ ## 🏗️ Architecture ┌─────────────────────────────────────────────────────────────────┐ │ 🏠 HOST DATABASE │ │ ┌─────────────┐ │ │ │ TenantCatalog │ ──► Maps tenant_id → database URL │ │ └──────┬──────┘ │ └─────────┼───────────────────────────────────────────────────────┘ │ │ get_or_create_tenant_db(tenant_id) ▼ ┌─────────────────────────────────────────────────────────────────┐ │ 🗄️ TENANT DATABASE │ │ (Isolated per tenant) │ ├─────────────────────────────────────────────────────────────────┤ │ 👤 TenantUser → Local profile (links to GlobalUser.id) │ │ 🔐 TenantPermission → Resource-level access control │ │ ⚙️ TenantSettings → Timezone, currency, feature flags │ │ 🪝 WebhookEvent → Idempotent webhook processing │ │ 🔑 WebhookSecret → HMAC secrets per webhook source │ ├─────────────────────────────────────────────────────────────────┤ │ 📊 [Your App Tables] → Transactions, budgets, etc. │ └─────────────────────────────────────────────────────────────────┘ **Key Principle:** Tenant data is completely isolated → No cross-tenant data leakage possible ## 🔌 Tenant Connection > ⚠️ **Naming Convention**: Use **underscores** (not hyphens) in > `tenant_id` values. > > Database names are derived from `tenant_id` (e.g., `tenant_acme_001` → > `t_tenant_acme_001_db`). PostgreSQL and SQLite identifiers work best > with alphanumeric characters and underscores. > > ✅ Good: `tenant_acme_001`, `tenant_finxplorer_prod` ❌ Avoid: > `tenant-acme-001`, `tenant.finxplorer.prod` ``` python from nbdev.showdoc import show_doc ``` [`get_or_create_tenant_db()`](https://abhisheksreesaila.github.io/fh-saas/db_tenant.html#get_or_create_tenant_db) handles the full lifecycle: 1. 🔍 **Check** if tenant exists in host database 2. 🗄️ **Create** physical database if new (PostgreSQL) 3. 📝 **Register** tenant in [`TenantCatalog`](https://abhisheksreesaila.github.io/fh-saas/db_host.html#tenantcatalog) 4. 🔌 **Return** connection to tenant database
Parameter Description
tenant_id Unique tenant identifier (from Membership)
tenant_name Optional display name (defaults to tenant_id)
**Returns:** `Database` connection to the tenant’s isolated database ------------------------------------------------------------------------ source ### get_or_create_tenant_db ``` python def get_or_create_tenant_db( tenant_id:str, tenant_name:str=None ): ``` *Get or create a tenant database connection by tenant ID.* ⚠️ IMPORTANT: Caller is responsible for closing the returned Database connection by calling `db.conn.close()` and `db.engine.dispose()` when done. ------------------------------------------------------------------------ ## 📦 Core Tenant Models These models provide the infrastructure every tenant needs. Your app-specific models (transactions, budgets, etc.) build on top of these. ------------------------------------------------------------------------ source ### TenantSettings ``` python def TenantSettings( args:VAR_POSITIONAL, kwargs:VAR_KEYWORD ): ``` *Tenant-wide configuration and feature flags.* ------------------------------------------------------------------------ source ### TenantPermission ``` python def TenantPermission( args:VAR_POSITIONAL, kwargs:VAR_KEYWORD ): ``` *Fine-grained resource permission for a tenant user.* ------------------------------------------------------------------------ source ### TenantUser ``` python def TenantUser( args:VAR_POSITIONAL, kwargs:VAR_KEYWORD ): ``` *Local user profile linked to GlobalUser in host database.* ### 📝 Model Details
Model Table Name Primary Key Description
TenantUser core_tenant_users id Links to GlobalUser.id, stores local role & preferences
TenantPermission core_permissions id Resource + action permissions (RBAC)
TenantSettings core_settings id Timezone, currency, feature flags
------------------------------------------------------------------------ ## 🔧 Schema Initialization [`init_tenant_core_schema()`](https://abhisheksreesaila.github.io/fh-saas/db_tenant.html#init_tenant_core_schema) creates all infrastructure tables in a tenant database: tenant_db = get_or_create_tenant_db("tenant_abc") tables = init_tenant_core_schema(tenant_db) # Access tables via returned dict tables['tenant_users'].insert(user) tables['settings'].insert(settings) **Returns:** Dictionary of table accessors for all core models ------------------------------------------------------------------------ source ### init_tenant_core_schema ``` python def init_tenant_core_schema( tenant_db:Database ): ``` *Create all core tenant tables and return table accessors.* ------------------------------------------------------------------------ ## 🚀 Quick Start ``` python from fh_saas.db_tenant import get_or_create_tenant_db, init_tenant_core_schema, TenantUser from fh_saas.db_host import gen_id, timestamp # Get or create tenant database tenant_db = get_or_create_tenant_db("tenant_abc123", "Acme Corp") # Initialize core schema tables = init_tenant_core_schema(tenant_db) # Add a tenant user (linked to GlobalUser.id) user = TenantUser( id="global_user_xyz", # Must match GlobalUser.id display_name="John Doe", local_role="admin", created_at=timestamp() ) tables['tenant_users'].insert(user) tenant_db.conn.commit() ``` 💡 **Tip:** The `id` field in [`TenantUser`](https://abhisheksreesaila.github.io/fh-saas/db_tenant.html#tenantuser) **must match** the `GlobalUser.id` from the host database to maintain identity linking. ------------------------------------------------------------------------ ## 🔐 Role-Based Access Control (RBAC) Every tenant has a **three-tier role system** for controlling access: ### Role Hierarchy
Role Level Description
admin 3 Full access to all resources and settings
editor 2 Can view and modify data, but not settings
viewer 1 Read-only access to data
### Role Assignment Rules 1. **Tenant owner** (from host `Membership.role='owner'`) → automatically gets `admin` role 2. **Other users** → assigned explicitly by admin via `TenantUser.local_role` 3. **No defaults** → admins must explicitly assign roles when inviting users ### TenantUser.local_role The `local_role` field in [`TenantUser`](https://abhisheksreesaila.github.io/fh-saas/db_tenant.html#tenantuser) stores the user’s role within the tenant: ``` python # Example: Admin adds a new user as editor new_user = TenantUser( id=global_user_id, # Must match GlobalUser.id display_name="Jane Doe", local_role="editor", # Assigned by admin created_at=timestamp() ) tables['tenant_users'].insert(new_user) ``` ### Fine-Grained Permissions (Optional) For advanced use cases, the `core_permissions` table allows resource-level control: ``` python # Example: Grant user edit access to transactions only permission = TenantPermission( id=gen_id(), user_id=tenant_user.id, resource="transactions", action="edit", granted=True, created_at=timestamp() ) tables['permissions'].insert(permission) ```
Field Description
resource What the permission applies to (e.g., “transactions”, “budgets”)
action What action is allowed (e.g., “view”, “edit”, “delete”)
granted True = allowed, False = explicitly denied
> 💡 **Tip:** Most apps only need the `local_role` field. Use > `core_permissions` when you need per-resource control (e.g., “User X > can view transactions but not budgets”). ======================================== FILE: _proc/15_utils_db.html.md # 🗄️ Table Management ## 🎯 Overview
Function Purpose
register_table Create table from dataclass model
register_tables Create multiple tables atomically
drop_table Drop table if exists
create_index Create index with dialect-specific SQL
drop_index Drop index if exists
list_tables List all tables in database
list_indexes List indexes for a table
## 📋 Table Registration
Function Purpose
register_table Create single table from dataclass
register_tables Create multiple tables atomically
drop_table Drop table if exists
**Example:** ``` python class Transaction: id: str amount: float date: str category: str = None tenant_db = get_or_create_tenant_db("tenant_123") transactions = register_table(tenant_db, Transaction, "transactions") # Use table object for CRUD transactions.insert(Transaction(id="t1", amount=100.0, date="2024-01-01")) ``` ------------------------------------------------------------------------ source ### register_table ``` python def register_table( tenant_db:Database, model_class:Type, table_name:str, pk:str='id' ): ``` *Create a table from a dataclass model if it doesn’t exist (atomic).* ------------------------------------------------------------------------ source ### register_tables ``` python def register_tables( tenant_db:Database, models:List )->Dict: ``` *Create multiple tables atomically (all succeed or all rollback).* **Parameters:** - `models`: List of tuples `[(ModelClass, table_name, pk), ...]` **Returns:** Dict mapping table names to table objects `{table_name: table_object}` **Example:** ``` python class Transaction: id: str; amount: float; date: str class Connection: id: str; provider: str; status: str tables = register_tables(tenant_db, [ (Transaction, "transactions", "id"), (Connection, "connections", "id"), ]) tables["transactions"].insert(...) ``` ------------------------------------------------------------------------ source ### drop_table ``` python def drop_table( tenant_db:Database, table_name:str )->None: ``` *Drop a table if it exists (atomic operation).* ## 📇 Index Management
Function Purpose
create_index Create index with dialect-specific SQL
create_indexes Create multiple indexes atomically
drop_index Drop index if exists
**Parameters for [`create_index`](https://abhisheksreesaila.github.io/fh-saas/utils_db.html#create_index):**
Parameter Description
table_name Name of the table
columns List of column names to index
unique If True, creates UNIQUE index (default: False)
index_name Custom name (auto-generates idx_{table}_{cols} if None)
**Example:** ``` python # Simple index create_index(tenant_db, "transactions", ["date"]) # Composite unique index create_index(tenant_db, "transactions", ["account_id", "external_id"], unique=True) # Custom name create_index(tenant_db, "transactions", ["category"], index_name="idx_txn_cat") ``` ------------------------------------------------------------------------ source ### create_index ``` python def create_index( tenant_db:Database, table_name:str, columns:List, unique:bool=False, index_name:str=None )->None: ``` *Create an index on a table if it doesn’t exist (atomic operation).* ------------------------------------------------------------------------ source ### create_indexes ``` python def create_indexes( tenant_db:Database, indexes:List )->None: ``` *Create multiple indexes atomically (all succeed or all rollback).* **Parameters:** - `indexes`: List of tuples `[(table_name, columns, unique, index_name), ...]` - `index_name` can be `None` for auto-generated names **Example:** ``` python create_indexes(tenant_db, [ ("transactions", ["date"], False, None), ("transactions", ["account_id", "external_id"], True, "idx_txn_unique"), ("connections", ["provider"], False, None), ]) ``` ------------------------------------------------------------------------ source ### drop_index ``` python def drop_index( tenant_db:Database, index_name:str, table_name:str=None )->None: ``` *Drop an index if it exists (atomic operation).* ## 🔍 Schema Introspection
Function Purpose
table_exists Check if a table exists
**Example:** ``` python if not table_exists(tenant_db, "transactions"): transactions = register_table(tenant_db, Transaction, "transactions") ``` ------------------------------------------------------------------------ source ### table_exists ``` python def table_exists( tenant_db:Database, table_name:str )->bool: ``` *Check if a table exists in the database.* ## Usage Example Complete workflow: Define dataclass models → Get tenant DB → Register tables → Create indexes ======================================== FILE: _proc/04_utils_auth.html.md # 🔐 Authentication ------------------------------------------------------------------------ ## ⏰ Sliding Session Configuration Configure session timeout behavior with sliding expiry.
Parameter Default Description
max_age 3600 Session expires after this many seconds of inactivity
sliding True Refresh session on each request
absolute_max None Optional hard limit regardless of activity (e.g., 86400 for 24h)
secure True HTTPS-only cookies in production
same_site "lax" SameSite cookie policy for CSRF protection
### How Sliding Sessions Work Current (Fixed Timeout): Login at 10:00 → Expires at 11:00 (regardless of activity) User active at 10:55 → STILL logged out at 11:00 ❌ Sliding Timeout: Login at 10:00 → Expires at 11:00 User active at 10:55 → Expires at 11:55 ✓ User active at 11:50 → Expires at 12:50 ✓ User idle for 1 hour → Logged out ✓ ------------------------------------------------------------------------ ## 🔄 Sliding Session Middleware Custom middleware that extends Starlette’s SessionMiddleware to refresh cookie `max_age` on each authenticated request. ------------------------------------------------------------------------ source ### SessionConfig ``` python def SessionConfig( max_age:int=3600, sliding:bool=True, absolute_max:int=None, secure:bool=True, same_site:str='lax', http_only:bool=True )->None: ``` *Configuration for sliding session behavior.* Attributes: max_age: Session expires after this many seconds of inactivity. Default: 3600 (1 hour) sliding: If True, refresh session expiry on each request. Default: True absolute_max: Optional hard limit in seconds regardless of activity. Default: None secure: If True, cookie only sent over HTTPS. Default: True same_site: SameSite cookie policy (‘lax’, ‘strict’, ‘none’). Default: ‘lax’ http_only: If True, cookie not accessible via JavaScript. Default: True Example: \>\>\> config = SessionConfig() \# 1 hour inactivity timeout, sliding enabled \>\>\> config = SessionConfig(max_age=1800) \# 30 min inactivity timeout \>\>\> config = SessionConfig(max_age=3600, absolute_max=86400) \# 1h sliding, 24h hard limit ------------------------------------------------------------------------ source ### SlidingSessionMiddleware ``` python def SlidingSessionMiddleware( app, secret_key:str, session_config:SessionConfig=None, session_cookie:str='session', path:str='/' ): ``` *Session middleware with sliding expiry.* Extends Starlette’s SessionMiddleware to refresh the session cookie max_age on each request, implementing sliding session expiry. Sessions expire after `max_age` seconds of INACTIVITY, not from login time. Args: app: ASGI application secret_key: Secret key for signing cookies session_config: SessionConfig instance for timeout settings session_cookie: Cookie name. Default: ‘session’ path: Cookie path. Default: ‘/’ Example: \>\>\> from starlette.applications import Starlette \>\>\> app = Starlette() \>\>\> config = SessionConfig(max_age=3600) \# 1 hour inactivity \>\>\> app = SlidingSessionMiddleware(app, secret_key=‘…’, session_config=config) ------------------------------------------------------------------------ source ### create_session_middleware ``` python def create_session_middleware( secret_key:str, session_config:SessionConfig=None, session_cookie:str='session' )->SlidingSessionMiddleware: ``` *Factory to create SlidingSessionMiddleware for FastHTML apps.* Args: secret_key: Secret key for signing session cookies (required) session_config: SessionConfig instance. Default: SessionConfig.default() session_cookie: Cookie name. Default: ‘session’ Returns: Configured SlidingSessionMiddleware instance Example: \>\>\> from fasthtml.common import FastHTML \>\>\> \>\>\> \# Create app WITHOUT default session middleware \>\>\> app = FastHTML(sess_cls=None) \# Disable default sessions \>\>\> \>\>\> \# Add sliding session middleware \>\>\> config = SessionConfig(max_age=3600) \# 1 hour inactivity \>\>\> app = create_session_middleware(‘your-secret-key’, config)(app) \>\>\> \>\>\> \# Or wrap during app creation (recommended) \>\>\> middleware = create_session_middleware(‘your-secret-key’, config) \>\>\> \# Then configure FastHTML to use it Note: FastHTML’s default SessionMiddleware needs to be disabled first. See integration documentation for patterns. ## 🎯 Overview
Category Functions Purpose
⏰ Sliding Sessions SessionConfig, SlidingSessionMiddleware, create_session_middleware Sliding session expiry (inactivity timeout)
🛡️ Beforeware create_auth_beforeware Protect routes, auto-setup tenant DB
🔑 OAuth Client get_google_oauth_client Initialize Google OAuth
🔒 CSRF generate_oauth_state, verify_oauth_state Prevent session hijacking
👤 Users create_or_get_global_user, get_user_membership, verify_membership User & membership management
🏗️ Provisioning provision_new_user Auto-create tenant for new users
📋 Session create_user_session, get_current_user, clear_session Session management
🚦 Routing auth_redirect, route_user_after_login, require_tenant_access Authorization & routing
🌐 Handlers handle_login_request, handle_oauth_callback, handle_logout Route implementations
------------------------------------------------------------------------ ## 🏗️ Architecture ┌─────────────────────────────────────────────────────────────────┐ │ OAuth Authentication Flow │ ├─────────────────────────────────────────────────────────────────┤ │ 1. User clicks "Login with Google" │ │ 2. Generate CSRF state token → store in session │ │ 3. Redirect to Google OAuth │ │ 4. Google authenticates → redirects with code + state │ │ 5. Verify CSRF state matches session │ │ 6. Exchange code for user info │ │ 7. Create/get GlobalUser in host DB │ │ 8. Check membership OR auto-provision new tenant │ │ 9. Create session → redirect to dashboard │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ Tenant Model │ ├─────────────────────────────────────────────────────────────────┤ │ Many Users → One Tenant (many-to-one) │ │ Each user belongs to exactly ONE tenant │ │ Each tenant can have MANY users │ │ New users auto-create their own tenant │ └─────────────────────────────────────────────────────────────────┘ ------------------------------------------------------------------------ ## 📚 Quick Reference ### Security Features
Feature Protection
CSRF State Token Prevents session hijacking attacks
Membership Validation Ensures cross-tenant isolation
Audit Logging Tracks all authentication events
### Token Expiry
Token Type Expiry Behavior
Google OAuth 1 hour User must re-login (refresh tokens: future)
Session (sliding) 1 hour inactivity Refreshes on each request via SlidingSessionMiddleware
> 💡 **Sliding Sessions**: Use > [`SlidingSessionMiddleware`](https://abhisheksreesaila.github.io/fh-saas/utils_auth.html#slidingsessionmiddleware) > or > [`create_session_middleware()`](https://abhisheksreesaila.github.io/fh-saas/utils_auth.html#create_session_middleware) > to keep users logged in while active. Sessions only expire after > configured inactivity period. ``` python from nbdev.showdoc import show_doc ``` ------------------------------------------------------------------------ ## 🎭 Role-Based Access Control Control access based on user roles within the tenant. ### Role Hierarchy
Role Level Automatic For
admin 3 Tenant owners
editor 2
viewer 1
### Key Functions
Function Purpose
has_min_role Check if user meets minimum role requirement
require_role Route decorator for role-based protection
get_user_role Derive effective role from session + tenant DB
------------------------------------------------------------------------ source ### has_min_role ``` python def has_min_role( user:dict, required_role:str )->bool: ``` *Check if user meets the minimum role requirement.* Args: user: User dict with ‘role’ field (from request.state.user) required_role: Minimum role needed (‘admin’, ‘editor’, ‘viewer’) Returns: True if user’s role \>= required_role in hierarchy Example: \>\>\> user = {‘role’: ‘editor’} \>\>\> has_min_role(user, ‘viewer’) \# True - editor \> viewer \>\>\> has_min_role(user, ‘admin’) \# False - editor \< admin ------------------------------------------------------------------------ source ### get_user_role ``` python def get_user_role( session:dict, tenant_db:Database=None )->str: ``` *Derive effective role from session and tenant database.* Rules: 1. Tenant owner (session\[‘tenant_role’\] == ‘owner’) → ‘admin’ 2. System admin → ‘admin’ 3. Otherwise → lookup TenantUser.local_role from tenant DB 4. Fallback → None (user must be explicitly assigned a role) Args: session: User session dict tenant_db: Tenant database connection (optional) Returns: Effective role string: ‘admin’, ‘editor’, ‘viewer’, or None ------------------------------------------------------------------------ source ### require_role ``` python def require_role( min_role:str ): ``` *Decorator to protect routes with minimum role requirement.* Args: min_role: Minimum role required (‘admin’, ‘editor’, ‘viewer’) Returns: Decorator that checks request.state.user\[‘role’\] Returns 403 Forbidden if user lacks required role Example: \>\>\> @app.get(‘/admin/settings’) \>\>\> @require_role(‘admin’) \>\>\> def admin_settings(request): … return “Admin only content” >>> @app.get('/reports') >>> @require_role('viewer') # All authenticated users >>> def view_reports(request): ... return "Reports" ------------------------------------------------------------------------ ## ⚡ Session Caching For HTMX-heavy apps with many partial requests, the beforeware can cache auth data in the session to avoid database queries on every request. ### Configuration
Parameter Default Description
session_cache False Enable caching user dict in session
session_cache_ttl 300 Cache TTL in seconds (5 minutes)
### How It Works Request arrives │ ▼ Check session cache │ ├── Cache valid? → Use cached user data (0 DB queries) │ └── Cache miss/expired? → Query DB → Update cache ### Cache Invalidation
Event Action
Logout Automatic (session cleared)
Role change Call invalidate_auth_cache(session)
TTL expiry Automatic refresh on next request
------------------------------------------------------------------------ source ### invalidate_auth_cache ``` python def invalidate_auth_cache( session:dict ): ``` *Clear the auth cache from session.* Call this when: - User role or permissions change - User is added/removed from tenant - Admin changes user’s local_role Args: session: User session dict Example: \>\>\> \# After admin changes user role \>\>\> tenant_user.local_role = ‘editor’ \>\>\> tables\[‘tenant_users’\].update(tenant_user) \>\>\> invalidate_auth_cache(session) \# Force fresh lookup ### Usage Example ``` python from fh_saas.utils_auth import invalidate_auth_cache # Enable caching (recommended for HTMX apps) app = FastHTML( before=create_auth_beforeware( session_cache=True, session_cache_ttl=300 # 5 minutes ) ) # After changing user role, invalidate their cache @app.post('/admin/users/{user_id}/role') def update_role(request, user_id: str, new_role: str): tables = request.state.tables user = tables['tenant_users'].get(user_id) user.local_role = new_role tables['tenant_users'].update(user) # If changing own role, invalidate cache if user_id == request.state.user['user_id']: invalidate_auth_cache(request.session) return "Role updated" ``` ### Usage in Routes ``` python from fh_saas.utils_auth import require_role, has_min_role # Option 1: Decorator for route-level protection @app.get('/admin/users') @require_role('admin') def manage_users(request): return "Admin-only user management" @app.get('/dashboard') @require_role('viewer') # All roles can access def dashboard(request): return "Dashboard for all users" # Option 2: Inline check for conditional logic @app.get('/data') def view_data(request): user = request.state.user if has_min_role(user, 'admin'): return "All data with admin controls" elif has_min_role(user, 'editor'): return "Data with edit buttons" else: return "Read-only data view" ``` ### Role Derivation Flow ┌─────────────────────────────────────────────────────────────────┐ │ Role Derivation (in beforeware) │ ├─────────────────────────────────────────────────────────────────┤ │ 1. Check session['tenant_role'] │ │ └── If 'owner' → effective role = 'admin' │ │ │ │ 2. Check session['is_sys_admin'] │ │ └── If True → effective role = 'admin' │ │ │ │ 3. Lookup TenantUser.local_role in tenant DB │ │ └── Returns assigned role or None │ │ │ │ 4. Attach to request.state.user['role'] │ │ └── Used by require_role() and has_min_role() │ └─────────────────────────────────────────────────────────────────┘ ------------------------------------------------------------------------ ## 🛡️ Auth Beforeware Protect routes by checking for authenticated sessions with auto tenant DB setup.
Function Purpose
create_auth_beforeware Factory to create route protection middleware
> 💡 **Use case**: Pass to FastHTML’s `before=` parameter for app-wide > authentication ------------------------------------------------------------------------ source ### create_auth_beforeware ``` python def create_auth_beforeware( redirect_path:str='/login', session_key:str='user_id', skip:list=None, include_defaults:bool=True, setup_tenant_db:bool=True, schema_init:Callable=None, session_cache:bool=False, session_cache_ttl:int=300, require_subscription:bool=False, subscription_redirect:str=None, grace_period_days:int=3, session_config:SessionConfig=None ): ``` *Create Beforeware that checks for authenticated session and sets up request.state.* Args: redirect_path: Where to redirect unauthenticated users session_key: Session key for user ID skip: List of regex patterns to skip auth include_defaults: Include default skip patterns setup_tenant_db: Auto-setup tenant database on request.state schema_init: Optional callback to initialize tables dict. Signature: (tenant_db: Database) -\> dict\[str, Table\] Result stored in request.state.tables session_cache: Enable caching user dict in session to reduce DB queries. Recommended for HTMX-heavy apps. Default: False session_cache_ttl: Cache TTL in seconds. Default: 300 (5 minutes) require_subscription: If True, check for active subscription and return 402 if not found. Default: False subscription_redirect: Optional URL to redirect if subscription required. If None, returns 402 Payment Required response. grace_period_days: Days to allow access after payment failure (default: 3) session_config: Optional SessionConfig for absolute timeout enforcement. Note: Sliding expiry requires SlidingSessionMiddleware. This parameter only enforces absolute_max if configured. Returns: Beforeware instance for FastHTML apps Sets on request.state: - user: dict with user_id, email, tenant_id, role, is_owner - tenant_id: str - tenant_db: Database connection - tables: dict of Table objects (if schema_init provided) - subscription: Subscription object (if require_subscription enabled) Example: \>\>\> \# Basic usage \>\>\> beforeware = create_auth_beforeware() >>> # With session caching for HTMX apps >>> beforeware = create_auth_beforeware( ... session_cache=True, ... session_cache_ttl=300 ... ) >>> # With schema initialization >>> def get_app_tables(db): ... return {'users': db.create(User, pk='id')} >>> beforeware = create_auth_beforeware(schema_init=get_app_tables) >>> # With subscription requirement >>> beforeware = create_auth_beforeware( ... require_subscription=True, ... subscription_redirect='/pricing' ... ) ------------------------------------------------------------------------ ## 🔑 OAuth Client
Function Purpose
get_google_oauth_client Initialize Google OAuth client from env vars
> ⚠️ **Required env vars**: `GOOGLE_CLIENT_ID`, `GOOGLE_CLIENT_SECRET` ------------------------------------------------------------------------ source ### get_google_oauth_client ``` python def get_google_oauth_client( ): ``` *Initialize Google OAuth client with credentials from environment.* ------------------------------------------------------------------------ ## 🔒 CSRF Protection Prevent session hijacking via state token validation.
Function Purpose
generate_oauth_state Create random UUID for CSRF protection
verify_oauth_state Validate callback state matches session
### Attack Without CSRF Protection 1. Attacker initiates OAuth → gets auth code 2. Attacker sends victim: yourapp.com/auth/callback?code=ATTACKER_CODE 3. Victim clicks → session created for attacker's account 4. Victim enters data → attacker sees it all > 🛡️ **Solution**: State token generated at login, verified at callback ------------------------------------------------------------------------ source ### verify_oauth_state ``` python def verify_oauth_state( session:dict, callback_state:str ): ``` *Verify OAuth callback state matches stored session state (CSRF protection).* ------------------------------------------------------------------------ source ### generate_oauth_state ``` python def generate_oauth_state( ): ``` *Generate cryptographically secure random state token for CSRF protection.* ------------------------------------------------------------------------ ## 👤 User Management
Function Purpose
create_or_get_global_user Create or retrieve user from host DB
get_user_membership Get user’s active tenant membership
verify_membership Validate user has access to tenant
------------------------------------------------------------------------ source ### verify_membership ``` python def verify_membership( host_db:HostDatabase, user_id:str, tenant_id:str )->bool: ``` *Verify user has active membership for specific tenant.* ------------------------------------------------------------------------ source ### get_user_membership ``` python def get_user_membership( host_db:HostDatabase, user_id:str ): ``` *Get single active membership for user.* ------------------------------------------------------------------------ source ### create_or_get_global_user ``` python def create_or_get_global_user( host_db:HostDatabase, oauth_id:str, email:str, oauth_info:dict=None ): ``` *Create or retrieve GlobalUser from host database.* ------------------------------------------------------------------------ ## 🏗️ Auto-Provisioning Create tenant infrastructure for first-time users.
Function Purpose
provision_new_user Create tenant DB, catalog entry, membership, and TenantUser
### Provisioning Steps 1. Create physical tenant database (PostgreSQL/SQLite) 2. Register tenant in host catalog 3. Create membership (user → tenant, role='owner') 4. Create TenantUser profile (local_role='admin') 5. Initialize core tenant schema 6. Log audit event > 💡 **Future**: Insert payment screen before step 1 ------------------------------------------------------------------------ source ### provision_new_user ``` python def provision_new_user( host_db:HostDatabase, global_user:GlobalUser )->str: ``` *Auto-provision new tenant for first-time user.* ------------------------------------------------------------------------ ## 📋 Session Management
Function Purpose
create_user_session Populate session after successful OAuth
get_current_user Extract user info from session
clear_session Clear all session data (logout)
------------------------------------------------------------------------ source ### clear_session ``` python def clear_session( session:dict ): ``` *Clear all session data (logout).* ------------------------------------------------------------------------ source ### get_current_user ``` python def get_current_user( session:dict )->dict | None: ``` *Extract current user info from session.* Returns: dict with keys: user_id, email, tenant_id, tenant_role, is_sys_admin Note: The ‘role’ and ‘is_owner’ fields are added by create_auth_beforeware after deriving the effective role from TenantUser.local_role. Access via request.state.user\[‘role’\] in routes. ------------------------------------------------------------------------ source ### create_user_session ``` python def create_user_session( session:dict, global_user:GlobalUser, membership:Membership ): ``` *Create authenticated session after successful OAuth login.* Sets session keys for user identity and tracking: - user_id, email, tenant_id, tenant_role, is_sys_admin: Identity - login_at: Timestamp when user logged in - session_started_at: Unix timestamp for absolute session timeout tracking ------------------------------------------------------------------------ ## 🚦 Route Helpers
Function Purpose
auth_redirect HTMX-aware redirect to login page
route_user_after_login Determine redirect URL based on user type
require_tenant_access Get tenant DB with membership validation
------------------------------------------------------------------------ source ### auth_redirect ``` python def auth_redirect( request, redirect_url:str='/login' ): ``` *HTMX-aware redirect for authentication flows.* When HTMX makes a partial request and receives a standard redirect (302/303), it follows the redirect and swaps the response into the target element. This causes the login page to appear inside the partial content area. This function detects HTMX requests and uses the `HX-Redirect` header to trigger a full page navigation instead. Args: request: Starlette request object redirect_url: URL to redirect to (default: ‘/login’) Returns: Response with appropriate redirect mechanism Example: `python @app.get('/dashboard') def dashboard(request): if not get_current_user(request.session): return auth_redirect(request) return render_dashboard()` ------------------------------------------------------------------------ source ### require_tenant_access ``` python def require_tenant_access( request_or_session ): ``` *Get tenant database with membership validation.* ------------------------------------------------------------------------ source ### route_user_after_login ``` python def route_user_after_login( global_user:GlobalUser, membership:Membership=None )->str: ``` *Determine redirect URL based on user type and membership.* ## 🌐 OAuth Route Handlers Complete OAuth 2.0 flow handlers:
Function Purpose
handle_login_request Initiate OAuth with CSRF protection
handle_oauth_callback Process provider response
handle_logout Clear session and redirect
------------------------------------------------------------------------ source ### handle_logout ``` python def handle_logout( session ): ``` *Clear session and redirect to login page.* ------------------------------------------------------------------------ source ### handle_oauth_callback ``` python def handle_oauth_callback( code:str, state:str, request, session ): ``` *Complete OAuth flow: CSRF verify → user info → provision → session → redirect.* ------------------------------------------------------------------------ source ### handle_login_request ``` python def handle_login_request( request, session ): ``` *Generate Google OAuth URL with CSRF state protection.* ======================================== FILE: _proc/06_utils_log.html.md # 📊 Logging ## 🎯 Overview
Function Purpose
configure_logging One-time setup at app startup
------------------------------------------------------------------------ ## 📋 Environment Variables
Variable Default Description
FH_SAAS_LOG_LEVEL WARNING DEBUG, INFO, WARNING, ERROR
FH_SAAS_LOG_FILE (none) Path to log file (optional)
## ⚙️ configure_logging ``` python from nbdev.showdoc import show_doc ``` ------------------------------------------------------------------------ source ### configure_logging ``` python def configure_logging( log_file:str=None, level:str=None, max_bytes:int=10000000, backup_count:int=5 ): ``` *Configure logging for all fh_saas modules - call once at app startup.* ## 📖 Usage Examples ### 1. App Startup (call once in main.py) ``` python from fh_saas.utils_log import configure_logging # Option A: Use environment variables (recommended for production) # Set FH_SAAS_LOG_LEVEL=INFO and FH_SAAS_LOG_FILE=./logs/app.log in .env configure_logging() # Option B: Explicit - console only configure_logging(level='INFO') # Option C: Console + rotating file configure_logging(level='INFO', log_file='./logs/app.log') ``` ### 2. In Any fh_saas Module (already done) Every module in fh_saas declares a logger at the top: ``` python # fh_saas/utils_oauth.py import logging logger = logging.getLogger(__name__) def handle_login_request(request, session): logger.debug('Login request initiated') # ... do work ... logger.info(f'OAuth complete, redirecting to {redirect_url}') ``` ### 3. Where Do Logs Go?
Configuration Console File
configure_logging()
configure_logging(log_file='app.log')
No configure_logging() call ❌ silent
### 4. Log Levels
Level When to Use Example
DEBUG Detailed diagnostic logger.debug('Checking user session')
INFO Normal operations logger.info('User logged in')
WARNING Unexpected but not failing logger.warning('Token expiring soon')
ERROR Operation failed logger.error('Auth failed', exc_info=True)
### 5. Real Example from fh_saas ``` python # In your FastHTML app from fasthtml.common import * from fh_saas.utils_log import configure_logging from fh_saas.utils_oauth import handle_login_request, handle_oauth_callback # Configure logging ONCE at startup configure_logging(level='INFO', log_file='./logs/app.log') app = FastHTML() @app.get('/login') def login(request, session): return handle_login_request(request, session) # Logs automatically @app.get('/auth/callback') def callback(code: str, state: str, request, session): return handle_oauth_callback(code, state, request, session) # Logs automatically ``` Output in console and file: 2026-01-10 14:30:00 | fh_saas.utils_oauth | DEBUG | Login request initiated 2026-01-10 14:30:05 | fh_saas.utils_oauth | DEBUG | OAuth callback received 2026-01-10 14:30:05 | fh_saas.utils_oauth | INFO | OAuth complete, redirecting to /dashboard ======================================== FILE: _proc/07_utils_api.html.md # 🌐 HTTP Client ## 🎯 Overview
Category Functions Purpose
🔄 Client AsyncAPIClient Async HTTP with retry
🔑 Auth bearer_token_auth, api_key_auth, oauth_token_auth Auth header generators
------------------------------------------------------------------------ ## 🏗️ Architecture ┌─────────────────────────────────────────────────────────────────┐ │ Retry Logic Flow │ ├─────────────────────────────────────────────────────────────────┤ │ Request → Response │ │ ↓ │ │ Status 429 or 500+ → Retry (exponential: 2s, 4s, 8s) │ │ ↓ │ │ Status 400-499 (except 429) → Fail immediately │ │ ↓ │ │ Network error → Retry │ │ ↓ │ │ Max 3 attempts → Raise exception │ └─────────────────────────────────────────────────────────────────┘ ## 🔄 AsyncAPIClient
Method Purpose
AsyncAPIClient Async HTTP client with retry
.request() Execute HTTP request with retry
.get_json() GET request returning JSON
------------------------------------------------------------------------ source ### AsyncAPIClient ``` python def AsyncAPIClient( base_url:str, auth_headers:dict=None, timeout:int=30 ): ``` *Async HTTP client with retry logic for external API integrations.* ------------------------------------------------------------------------ source ### AsyncAPIClient.\_\_init\_\_ ``` python def __init__( base_url:str, auth_headers:dict=None, timeout:int=30 ): ``` *Initialize client with base URL, optional auth headers, and timeout.* ------------------------------------------------------------------------ source ### AsyncAPIClient.request ``` python def request( method:str, endpoint:str, params:dict=None, json:dict=None, headers:dict=None )->httpx.Response: ``` *Execute HTTP request with automatic retry on 429/500+ errors.* ------------------------------------------------------------------------ source ### AsyncAPIClient.get_json ``` python def get_json( endpoint:str, params:dict=None )->Dict[str, Any]: ``` *Convenience GET that returns parsed JSON.* ## 🔑 Auth Helpers
Function Purpose
bearer_token_auth Bearer token header
api_key_auth API key header (customizable)
oauth_token_auth OAuth 2.0 access token
------------------------------------------------------------------------ source ### bearer_token_auth ``` python def bearer_token_auth( token:str )->dict: ``` *Generate Bearer token authentication header.* ------------------------------------------------------------------------ source ### api_key_auth ``` python def api_key_auth( api_key:str, header_name:str='X-API-Key' )->dict: ``` *Generate API key header with customizable header name.* ------------------------------------------------------------------------ source ### oauth_token_auth ``` python def oauth_token_auth( access_token:str )->dict: ``` *Generate OAuth 2.0 access token header (alias for bearer_token_auth).* ======================================== FILE: _proc/08_utils_graphql.html.md # 🔮 GraphQL Client ## 🎯 Overview
Method Purpose
GraphQLClient GraphQL client with async generator pagination
.from_url() NEW - Create client from URL with optional bearer token
.execute() NEW - Unified method for queries and mutations
.execute_query() Execute single query (returns full response)
.execute_mutation() Execute mutation (alias for execute_query)
.fetch_pages_relay() NEW - Fetch all pages from Relay-style pagination
.fetch_pages_generator() Stream paginated data (memory-efficient)
execute_graphql() NEW - One-liner function for simple queries
------------------------------------------------------------------------ ## 🏗️ Architecture ┌─────────────────────────────────────────────────────────────────┐ │ Streaming Pagination Flow │ ├─────────────────────────────────────────────────────────────────┤ │ Page 1: query($cursor: null) → yield [batch] → process │ │ Page 2: query($cursor: xyz) → yield [batch] → process │ │ Page N: query($cursor: abc) → yield [batch] → process │ │ │ │ ⚠️ CRITICAL: Never accumulates full dataset in memory │ │ ✅ Memory: O(page_size) not O(total_records) │ └─────────────────────────────────────────────────────────────────┘ ## 🔮 GraphQLClient
Method Purpose
.from_url() NEW - Create client from URL + token
.execute() NEW - Unified query/mutation (returns data only)
.execute_query() Execute single GraphQL query (full response)
.execute_mutation() Execute GraphQL mutation
.fetch_pages_relay() NEW - Auto-accumulate Relay pagination
.fetch_pages_generator() Async generator for paginated data (streaming)
------------------------------------------------------------------------ ### 🚀 Quick Start Examples #### One-Liner Query ``` python from fh_saas.utils_graphql import execute_graphql # NEW: No client management needed result = await execute_graphql( url="https://api.example.com/graphql", query="query { users { id name } }", bearer_token="your-token" ) ``` #### Convenience Constructor ``` python from fh_saas.utils_graphql import GraphQLClient # NEW: Direct from URL async with GraphQLClient.from_url( url="https://api.example.com/graphql", bearer_token="your-token" ) as gql: result = await gql.execute("query { users { id name } }") ``` #### Relay Pagination (Auto-Accumulate) ``` python # NEW: Fetch all pages automatically async with GraphQLClient.from_url(url, bearer_token=token) as gql: all_items = await gql.fetch_pages_relay( query=''' query($first: Int, $after: String, $filter: Filter) { transactionsConnection(first: $first, after: $after, filter: $filter) { edges { node { id amount } } pageInfo { hasNextPage endCursor } } } ''', connection_path="transactionsConnection", variables={"filter": {"status": "completed"}}, page_size=100, max_pages=50 # Safety limit ) print(f"Fetched {len(all_items)} total items") ``` #### Legacy: Manual Client Management ``` python from fh_saas.utils_api import AsyncAPIClient, bearer_token_auth from fh_saas.utils_graphql import GraphQLClient # Still supported for advanced use cases async with AsyncAPIClient( 'https://api.example.com/graphql', auth_headers=bearer_token_auth('TOKEN') ) as http_client: client = GraphQLClient(http_client) result = await client.execute_query( query='{ users { id name } }' ) ``` #### Memory-Efficient Streaming (Generator) ``` python # For very large datasets - process batches without accumulating async with GraphQLClient.from_url(url, bearer_token=token) as gql: async for batch in gql.fetch_pages_generator( query_template=''' query($cursor: String) { users(after: $cursor, first: 1000) { nodes { id name email } pageInfo { hasNextPage endCursor } } } ''', variables={'cursor': None}, items_path=['data', 'users', 'nodes'], cursor_path=['data', 'users', 'pageInfo', 'endCursor'], has_next_path=['data', 'users', 'pageInfo', 'hasNextPage'] ): print(f"Processing batch of {len(batch)} records") # Process immediately - never load full dataset ``` ------------------------------------------------------------------------ source ### GraphQLClient ``` python def GraphQLClient( api_client:AsyncAPIClient, # Initialized AsyncAPIClient instance endpoint:str='', # GraphQL endpoint path (default: '' for base URL) ): ``` *GraphQL client with streaming pagination for memory-efficient data fetching.* ------------------------------------------------------------------------ source ### GraphQLClient.\_\_init\_\_ ``` python def __init__( api_client:AsyncAPIClient, # Initialized AsyncAPIClient instance endpoint:str='', # GraphQL endpoint path (default: '' for base URL) ): ``` *Initialize self. See help(type(self)) for accurate signature.* ------------------------------------------------------------------------ source ### GraphQLClient.execute_query ``` python def execute_query( query:str, # GraphQL query string variables:dict=None, # Query variables )->Dict[str, Any]: ``` *Execute a single GraphQL query and return the JSON response.* ------------------------------------------------------------------------ source ### GraphQLClient.execute_mutation ``` python def execute_mutation( mutation:str, # GraphQL mutation string variables:dict=None, # Mutation variables )->Dict[str, Any]: ``` *Execute a GraphQL mutation (alias for execute_query).* ------------------------------------------------------------------------ source ### GraphQLClient.fetch_pages_generator ``` python def fetch_pages_generator( query_template:str, # GraphQL query with $variables placeholders variables:dict, # Initial variables (must include cursor key) items_path:list[str], # JSONPath to list of items (e.g., ['data', 'users', 'nodes']) cursor_path:list[str], # JSONPath to next cursor (e.g., ['data', 'users', 'pageInfo', 'endCursor']) has_next_path:list[str]=None, # Optional path to hasNextPage boolean (if None, checks cursor != None) cursor_var:str='cursor', # Variable name for cursor in query (default: 'cursor') )->AsyncGenerator[List[Dict], None]: ``` *Stream paginated GraphQL data page-by-page using async generator.* ------------------------------------------------------------------------ source ### GraphQLClient.fetch_pages_relay ``` python def fetch_pages_relay( query:str, # GraphQL query with $first and $after variables connection_path:str, # Dot-notation path to connection object (e.g., "transactionsConnection") variables:dict | None=None, # Base variables for the query (excluding first/after) page_size:int=100, # Number of items per page max_pages:int | None=None, # Maximum pages to fetch (None for unlimited) )->list[dict]: ``` *Fetch all pages from a Relay-style paginated GraphQL query.* Example query structure: query($first: Int, $after: String, $filter: TransactionFilter) { transactionsConnection(first: $first, after: $after, filter: $filter) { edges { node { id amount } } pageInfo { hasNextPage endCursor } } } Returns: List of all nodes from all pages ------------------------------------------------------------------------ source ### GraphQLClient.execute ``` python def execute( query:str, # GraphQL query or mutation string variables:dict=None, # Query/mutation variables )->Dict[str, Any]: ``` *Execute a GraphQL query or mutation and return the data portion.* This is a unified method that works for both queries and mutations. Returns only the ‘data’ portion of the response for convenience. ------------------------------------------------------------------------ source ### GraphQLClient.from_url ``` python def from_url( cls, url:str, # GraphQL endpoint URL bearer_token:str | None=None, # Optional bearer token for Authorization header headers:dict | None=None, # Optional additional headers ): ``` *Create GraphQL client directly from URL with optional bearer token.* Usage: async with GraphQLClient.from_url(url, bearer_token=token) as gql: result = await gql.execute(query) ### Streaming Pagination Details **Why use generators?** For large datasets (millions of rows), accumulating all data in memory causes OOM errors. The async generator yields batches instead. **Parameters:**
Parameter Description
query_template GraphQL query with $cursor variable
variables Initial variables dict (e.g., {'cursor': None})
items_path Path to data list in response (e.g., ['data', 'users', 'nodes'])
cursor_path Path to next cursor (e.g., ['data', 'users', 'pageInfo', 'endCursor'])
has_next_path Optional path to hasNextPage flag
cursor_var Cursor variable name in query (default: 'cursor')
**Example with database insert:** ``` python async for batch in client.fetch_pages_generator( query_template=''' query($cursor: String) { users(after: $cursor, first: 1000) { nodes { id name } pageInfo { hasNextPage endCursor } } } ''', variables={'cursor': None}, items_path=['data', 'users', 'nodes'], cursor_path=['data', 'users', 'pageInfo', 'endCursor'], has_next_path=['data', 'users', 'pageInfo', 'hasNextPage'] ): # Process each batch immediately df = pl.DataFrame(batch) df.write_database('staging_table', connection=conn) ``` ------------------------------------------------------------------------ source ### execute_graphql ``` python def execute_graphql( url:str, # GraphQL endpoint URL query:str, # GraphQL query string variables:dict | None=None, # Optional query variables bearer_token:str | None=None, # Optional bearer token for Authorization header headers:dict | None=None, # Optional additional headers )->dict: ``` *Execute a single GraphQL query without managing client lifecycle.* This is a convenience function for one-off queries. For multiple queries, use GraphQLClient.from_url() to reuse the connection. Usage: result = await execute_graphql( url=“https://api.example.com/graphql”, query=“query { users { id name } }”, bearer_token=“your-token” ) Raises: ValueError: If the response contains GraphQL errors ======================================== FILE: _proc/14_utils_webhook.html.md # 🪝 Webhooks ## 🎯 Overview
Function Purpose
verify_webhook_signature HMAC-SHA256 signature verification
check_idempotency Prevent duplicate processing
log_webhook_event Persist event to database
process_webhook Main orchestration function
handle_webhook_request FastHTML route handler
------------------------------------------------------------------------ ## 🏗️ Architecture ┌─────────────────────────────────────────────────────────────────┐ │ Webhook Processing Flow │ ├─────────────────────────────────────────────────────────────────┤ │ Request → Verify Signature → Check Idempotency → Log Event │ │ ↓ ✅ ✅ (not duplicate) │ │ Execute Handler → Update Status → Return Response │ └─────────────────────────────────────────────────────────────────┘ ## 🔐 Signature Verification
Function Purpose
verify_webhook_signature HMAC-SHA256 with timing-safe comparison
``` python from nbdev.showdoc import * ``` ------------------------------------------------------------------------ source ### verify_webhook_signature ``` python def verify_webhook_signature( payload:str, # Raw request body as string signature:str, # Signature from header (format: "sha256=") secret:Optional=None, # Secret key, defaults to WEBHOOK_SECRET env var )->bool: ``` *Verify HMAC-SHA256 signature for webhook payload. Returns True if valid.* ## 🔄 Idempotency
Function Purpose
check_idempotency Check if event already processed
------------------------------------------------------------------------ source ### check_idempotency ``` python def check_idempotency( db:Database, # Tenant database connection idempotency_key:str, # Unique key for this webhook event )->bool: ``` *Check if webhook event already processed. Returns True if duplicate.* ## 📝 Event Logging
Function Purpose
log_webhook_event Persist event to database
update_webhook_status Update status after processing
------------------------------------------------------------------------ source ### log_webhook_event ``` python def log_webhook_event( db:Database, # Tenant database connection webhook_id:str, # Unique webhook ID source:str, # Source system (e.g., "stripe", "github") event_type:str, # Event type (e.g., "payment.success") payload:Dict, # Full webhook payload signature:str, # Request signature idempotency_key:str, # Idempotency key status:str='pending', # Status: pending, processing, completed, failed ): ``` *Log webhook event to database* ## Update Webhook Status ------------------------------------------------------------------------ source ### update_webhook_status ``` python def update_webhook_status( db:Database, # Tenant database connection webhook_id:str, # Webhook ID to update status:str, # New status error_message:Optional=None, # Optional error message ): ``` *Update webhook event status and processed timestamp* ## ⚡ Process Webhook
Function Purpose
process_webhook Main orchestration function
handle_webhook_request FastHTML route handler
------------------------------------------------------------------------ source ### process_webhook ``` python def process_webhook( db:Database, # Tenant database connection webhook_id:str, # Unique webhook ID source:str, # Source system event_type:str, # Event type payload:Dict, # Webhook payload signature:str, # Request signature idempotency_key:str, # Idempotency key raw_body:str, # Raw request body for signature verification handler:Callable, # App-specific webhook handler function secret:Optional=None, # Optional webhook secret )->Dict: ``` *Process webhook with verification, idempotency, and custom handler execution* ``` python from nbdev.showdoc import show_doc ``` ------------------------------------------------------------------------ source ### verify_webhook_signature ``` python def verify_webhook_signature( payload:str, # Raw request body as string signature:str, # Signature from header (format: "sha256=") secret:Optional=None, # Secret key, defaults to WEBHOOK_SECRET env var )->bool: ``` *Verify HMAC-SHA256 signature for webhook payload. Returns True if valid.* ------------------------------------------------------------------------ source ### check_idempotency ``` python def check_idempotency( db:Database, # Tenant database connection idempotency_key:str, # Unique key for this webhook event )->bool: ``` *Check if webhook event already processed. Returns True if duplicate.* ------------------------------------------------------------------------ source ### log_webhook_event ``` python def log_webhook_event( db:Database, # Tenant database connection webhook_id:str, # Unique webhook ID source:str, # Source system (e.g., "stripe", "github") event_type:str, # Event type (e.g., "payment.success") payload:Dict, # Full webhook payload signature:str, # Request signature idempotency_key:str, # Idempotency key status:str='pending', # Status: pending, processing, completed, failed ): ``` *Log webhook event to database* ------------------------------------------------------------------------ source ### update_webhook_status ``` python def update_webhook_status( db:Database, # Tenant database connection webhook_id:str, # Webhook ID to update status:str, # New status error_message:Optional=None, # Optional error message ): ``` *Update webhook event status and processed timestamp* ------------------------------------------------------------------------ source ### process_webhook ``` python def process_webhook( db:Database, # Tenant database connection webhook_id:str, # Unique webhook ID source:str, # Source system event_type:str, # Event type payload:Dict, # Webhook payload signature:str, # Request signature idempotency_key:str, # Idempotency key raw_body:str, # Raw request body for signature verification handler:Callable, # App-specific webhook handler function secret:Optional=None, # Optional webhook secret )->Dict: ``` *Process webhook with verification, idempotency, and custom handler execution* ------------------------------------------------------------------------ source ### handle_webhook_request ``` python def handle_webhook_request( request, # FastHTML request object db:Database, # Tenant database instance source:str, # Webhook source identifier handler:Callable, # App-specific handler function signature_header:str='X-Webhook-Signature', # Header containing signature idempotency_header:str='X-Idempotency-Key', # Header containing idempotency key event_type_field:str='type', # Field in payload containing event type secret:Optional=None, # Optional webhook secret )->tuple: # Returns (response_dict, status_code) ``` *FastHTML route handler for webhook requests.* Example: @app.post(‘/webhooks/stripe’) async def stripe_webhook(request): return await handle_webhook_request( request=request, db=get_tenant_db(request), source=‘stripe’, handler=handle_stripe_event, signature_header=‘X-Stripe-Signature’, run_in_background=True ) ======================================== FILE: _proc/02_utils_sql.html.md # 🗃️ SQL Helpers ## 🎯 Overview This module provides a **batteries-included SQL toolkit** for building database-heavy applications:
Category Functions Purpose
🔍 Query Registry run_id, validate_params Execute queries by ID from centralized registries
➕ Insert-Only insert_only, bulk_insert_only Insert new records, skip conflicts
🔄 Upsert upsert, bulk_upsert Insert or update existing records
📝 CRUD get_by_id, update_record, delete_record, bulk_delete Standard database operations
🔧 Utilities with_transaction, paginate_sql, batch_execute Transaction management & helpers
💰 Money to_cents, from_cents Currency conversion for int-only storage
------------------------------------------------------------------------ ## 🏗️ Architecture ┌─────────────────────────────────────────────────────────────────┐ │ SQL Utilities Layer │ ├─────────────────────────────────────────────────────────────────┤ │ 🔍 Query Registry │ Execute queries by ID │ │ ➕ Insert Operations │ insert_only, bulk_insert_only │ │ 🔄 Upsert Operations │ upsert, bulk_upsert │ │ 📝 CRUD Operations │ get, update, delete (single/bulk) │ │ 🔧 Utilities │ transactions, pagination, batching │ │ 💰 Money Helpers │ cents ↔ dollars conversion │ ├─────────────────────────────────────────────────────────────────┤ │ DB_TYPE Detection: PostgreSQL / SQLite (auto-switch syntax) │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ fastsql Database Connection │ └─────────────────────────────────────────────────────────────────┘ ------------------------------------------------------------------------ ## 📚 Quick Reference ### Conflict Handling
Operation On Conflict Use Case
insert_only Skip (DO NOTHING) Append-only logs, idempotent imports
upsert Update existing Sync external data, refresh caches
### Performance Tips
Scenario Recommended Why
Single record insert_only, upsert Simple, immediate
10+ records bulk_* variants 10-100x faster
100+ records batch_execute Commits per batch, memory-safe
Multi-table changes with_transaction All-or-nothing commits
------------------------------------------------------------------------ ## 🔍 Database Type Detection Automatically detects PostgreSQL vs SQLite from `DB_TYPE` environment variable to generate appropriate SQL syntax. ------------------------------------------------------------------------ source ### get_db_type ``` python def get_db_type( ): ``` *Get database type from environment variable* ------------------------------------------------------------------------ ## 🎯 Query Registry Execute queries by ID from a centralized registry with automatic parameter validation.
Function Purpose
run_id Execute a registered query by its ID
validate_params Ensure all :param placeholders have values
------------------------------------------------------------------------ source ### run_id ``` python def run_id( db:Database, registry:Dict, query_id:str, params:Optional=None )->Any: ``` *Execute a query by ID from a query registry.* ------------------------------------------------------------------------ source ### validate_params ``` python def validate_params( sql:str, params:Dict )->None: ``` *Validate that all required parameters are provided* ------------------------------------------------------------------------ ## ➕ Insert-Only Insert new records **only if they don’t exist** — conflicts are silently skipped.
Function Records SQL Generated
insert_only Single ON CONFLICT DO NOTHING (PG) / INSERT OR IGNORE (SQLite)
bulk_insert_only Multiple Same, with batch execution
> 💡 **Use case**: Idempotent imports, append-only audit logs, webhook > deduplication ------------------------------------------------------------------------ source ### bulk_insert_only ``` python def bulk_insert_only( db:Database, table_name:str, records:List, conflict_cols:List, auto_commit:bool=True )->None: ``` *Insert multiple records, skipping conflicts (optimized batch operation).* ------------------------------------------------------------------------ source ### insert_only ``` python def insert_only( db:Database, table_name:str, record:Dict, conflict_cols:List, auto_commit:bool=True )->None: ``` *Insert a single record only if it doesn’t exist (ignores conflicts).* ------------------------------------------------------------------------ ## 🔄 Upsert Insert new records **or update existing ones** on conflict.
Function Records SQL Generated
upsert Single ON CONFLICT DO UPDATE (PG) / INSERT OR REPLACE (SQLite)
bulk_upsert Multiple Same, with batch execution
> 💡 **Use case**: Syncing external API data, refreshing caches, user > settings ------------------------------------------------------------------------ source ### bulk_upsert ``` python def bulk_upsert( db:Database, table_name:str, records:List, conflict_cols:List, update_cols:Optional=None, auto_commit:bool=True )->None: ``` *Insert or update multiple records (optimized batch operation).* ------------------------------------------------------------------------ source ### upsert ``` python def upsert( db:Database, table_name:str, record:Dict, conflict_cols:List, update_cols:Optional=None, auto_commit:bool=True )->None: ``` *Insert a record or update if it exists (upsert).* ------------------------------------------------------------------------ ## 📝 CRUD Standard Create, Read, Update, Delete operations.
Function Operation Description
get_by_id Read Fetch single record by primary key
update_record Update Modify record fields by ID
delete_record Delete Remove single record by ID
bulk_delete Delete Remove multiple records by ID list
------------------------------------------------------------------------ source ### bulk_delete ``` python def bulk_delete( db:Database, table_name:str, id_list:List, id_col:str='id', auto_commit:bool=True )->None: ``` *Delete multiple records by ID list.* ------------------------------------------------------------------------ source ### delete_record ``` python def delete_record( db:Database, table_name:str, id_value:Any, id_col:str='id', auto_commit:bool=True )->None: ``` *Delete a single record by ID.* ------------------------------------------------------------------------ source ### update_record ``` python def update_record( db:Database, table_name:str, id_value:Any, id_col:str='id', auto_commit:bool=True, updates:VAR_KEYWORD )->None: ``` *Update a single record by ID.* ------------------------------------------------------------------------ source ### get_by_id ``` python def get_by_id( db:Database, table_name:str, id_value:Any, id_col:str='id' )->Any: ``` *Get a single record by ID.* ------------------------------------------------------------------------ ## 🔧 Utilities Transaction management, pagination, and batch processing.
Function Purpose
with_transaction Context manager for atomic operations
paginate_sql Add LIMIT/OFFSET to queries
batch_execute Process large lists in memory-safe chunks
------------------------------------------------------------------------ source ### batch_execute ``` python def batch_execute( db:Database, operation_func, items:List, batch_size:int=100 )->None: ``` *Execute an operation on items in batches with commits after each batch.* ------------------------------------------------------------------------ source ### paginate_sql ``` python def paginate_sql( sql:str, page:int, page_size:int )->str: ``` *Add LIMIT/OFFSET pagination to a SQL query.* ------------------------------------------------------------------------ source ### with_transaction ``` python def with_transaction( db:Database ): ``` *Context manager for safe transaction handling with auto-rollback on error.* ------------------------------------------------------------------------ ## 💰 Money Helpers Convert between dollar amounts and integer cents for database storage. > ⚠️ **Why cents?** fastsql only supports `int` and `str` types — > storing money as cents avoids floating-point precision issues.
Function Direction Example
to_cents "$150.00"15000 Store in DB
from_cents 15000"$150.00" Display to user
------------------------------------------------------------------------ source ### to_cents ``` python def to_cents( dollars:str | float | None )->int | None: ``` *Convert dollar amount to integer cents for database storage.* ------------------------------------------------------------------------ source ### from_cents ``` python def from_cents( cents:int | None )->str: ``` *Convert integer cents to formatted dollar string for display.* ======================================== FILE: _proc/03_utils_bgtsk.html.md # ⚡ Background Tasks ## 🎯 Overview
Category Components Purpose
📦 Model TenantJob Tenant-level job record with retry support
⚡ Manager BackgroundTaskManager Submit, execute, and track background tasks
------------------------------------------------------------------------ ## 🏗️ Architecture ┌─────────────────────────────────────────────────────────────────┐ │ FastHTML Route │ │ return response, bg_task ←─────────────────────┐ │ └─────────────────────────────────────────────────────────────────┘ │ │ ▼ │ ┌─────────────────────────────────────────────────────────────────┐ │ BackgroundTaskManager │ ├─────────────────────────────────────────────────────────────────┤ │ submit() → Create job + return BackgroundTask │ │ _execute_with_retry() → Run with auto-retry on failure │ │ get_job() → Check job status │ │ list_jobs() → Query jobs by type/status │ ├─────────────────────────────────────────────────────────────────┤ │ Retry Logic: 2^n seconds backoff (2s, 4s, 8s) │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ Tenant Database │ │ └── tenant_jobs table │ └─────────────────────────────────────────────────────────────────┘ ------------------------------------------------------------------------ ## 📚 Quick Reference ### Job Status Flow pending → running → completed ↑ │ └────────┴──→ failed (after max retries) ### Usage Pattern ``` python # In your FastHTML route manager = BackgroundTaskManager(tenant_db) job_id, bg_task = manager.submit("sync_data", my_sync_func, user_id="123") return response, bg_task # Starlette runs bg_task after response ``` ------------------------------------------------------------------------ ## 📦 TenantJob Model Tracks background jobs at the tenant level with retry support.
Field Type Description
id str Unique job identifier
job_type str Job category (e.g., “sync”, “email”)
status str pending / running / completed / failed
payload str JSON kwargs passed to task function
result str JSON result on completion
error_log str Stack trace on failure
retry_count int Current retry attempt
max_retries int Max attempts before marking failed
------------------------------------------------------------------------ source ### TenantJob ``` python def TenantJob( args:VAR_POSITIONAL, kwargs:VAR_KEYWORD ): ``` *Tenant-level background job with retry support.* ------------------------------------------------------------------------ ## ⚡ BackgroundTaskManager Submit and track background tasks with automatic retry logic.
Method Purpose
submit Create job and return BackgroundTask for Starlette
get_job Get job status and details by ID
list_jobs Query jobs with optional type/status filters
> 💡 **Use case**: Long-running operations like syncing data, sending > emails, or processing uploads ------------------------------------------------------------------------ source ### BackgroundTaskManager ``` python def BackgroundTaskManager( db:Database ): ``` *Lightweight background task manager for tenant-level operations.* ------------------------------------------------------------------------ source ### BackgroundTaskManager.list_jobs ``` python def list_jobs( job_type:Optional=None, status:Optional=None, limit:int=100 )->list: ``` *List jobs with optional filtering.* ------------------------------------------------------------------------ source ### BackgroundTaskManager.get_job ``` python def get_job( job_id:str )->TenantJob: ``` *Get job status and details.* ------------------------------------------------------------------------ source ### BackgroundTaskManager.submit ``` python def submit( job_type:str, task_func:Callable, max_retries:int=3, task_kwargs:VAR_KEYWORD )->tuple: ``` *Submit a new background task for execution.* ======================================== FILE: _proc/09_utils_polars_mapper.html.md # 🐻‍❄️ Data Transforms ## 🎯 Overview
Function Purpose
map_and_upsert Bulk JSON→DB upsert via staging table
apply_schema Type conversions (dates, booleans, numbers)
------------------------------------------------------------------------ ## 🏗️ Architecture ┌─────────────────────────────────────────────────────────────────┐ │ Staging Table Pattern │ ├─────────────────────────────────────────────────────────────────┤ │ 1. df.write_database(staging_table) → fast bulk INSERT │ │ 2. INSERT ... ON CONFLICT SELECT FROM staging → vectorized │ │ 3. DROP TABLE staging → cleanup │ │ │ │ ⚡ 10-100x faster than row-by-row upserts │ └─────────────────────────────────────────────────────────────────┘ ## 📥 Map & Upsert
Parameter Description
df Polars DataFrame from JSON
table_name Target database table (must exist)
key_col Primary key for ON CONFLICT resolution
db_uri Database connection string
column_map Optional rename map {json_col: db_col}
unnest_cols Optional list of nested columns to flatten
type_map Optional type casting map {col: pl.DataType}
**Returns:** `int` - Number of rows affected by the upsert operation **Staging Table Pattern (10-100x faster than row-by-row):** 1. Write to temporary staging table (fast bulk insert) 2. Execute `INSERT ... ON CONFLICT` (database-native, vectorized) 3. Drop staging table (cleanup) **Type Casting with `type_map`:** When API data contains `None` values, Polars may infer incorrect types (e.g., `String` instead of `Int64`). This causes PostgreSQL type mismatch errors like: column "balance" is of type integer but expression is of type text Use `type_map` to explicitly cast columns before writing: ``` python rows = map_and_upsert( df=df, table_name='accounts', key_col='id', db_uri=db_uri, type_map={ 'balance_current': pl.Int64, 'balance_available': pl.Int64, 'balance_limit': pl.Int64 } ) print(f"Upserted {rows} rows") ``` **Basic Example:** ``` python import polars as pl # JSON from API json_data = [ {'user_id_val': 1, 'ABC_1': 'Alice', 'extra': 'ignore'}, {'user_id_val': 2, 'ABC_1': 'Bob', 'extra': 'ignore'} ] df = pl.DataFrame(json_data) rows_affected = map_and_upsert( df=df, table_name='users', key_col='user_id', db_uri='sqlite:///app.db', column_map={'user_id_val': 'user_id', 'ABC_1': 'name'} ) print(f"Upserted {rows_affected} users") ``` ------------------------------------------------------------------------ source ### map_and_upsert ``` python def map_and_upsert( df:pl.DataFrame, # The raw Polars DataFrame from JSON table_name:str, # Target database table name key_col:str, # Primary key column for conflict resolution db_uri:str, # SQLAlchemy connection string (e.g., 'sqlite:///db.db' or 'postgresql://...') column_map:dict=None, # Optional rename map {json_key: db_col} unnest_cols:list[str]=None, # List of Struct columns to flatten type_map:dict=None, # Optional type casting map {col_name: pl.DataType} )->int: ``` *Map JSON data to database columns and upsert using staging table pattern.* **Returns:** Number of rows affected by the upsert operation **Type Casting:** When columns have `None` values, Polars may infer incorrect types (e.g., String instead of Int64). This causes PostgreSQL type mismatch errors. Use `type_map` to explicitly cast columns before writing to the database. ## 🔧 Schema Transformations
Parameter Description
df Polars DataFrame
type_map Dict mapping column names to Polars dtypes
**Supported conversions:**
Type Handling
pl.Date Parses YYYY-MM-DD strings
pl.Datetime Parses datetime strings
pl.Boolean Converts "true"/"false" strings
Other Uses cast() (works for numeric types)
**Example:** ``` python df = pl.DataFrame({ 'created_at': ['2024-01-15', '2024-01-16'], 'is_active': ['true', 'false'], 'amount': ['123.45', '678.90'] }) df = apply_schema(df, { 'created_at': pl.Date, 'is_active': pl.Boolean, 'amount': pl.Float64 }) ``` ------------------------------------------------------------------------ source ### apply_schema ``` python def apply_schema( df:pl.DataFrame, # Input DataFrame type_map:dict, # Column name -> Polars dtype (e.g., {'created_at': pl.Date, 'is_active': pl.Boolean}) )->pl.DataFrame: ``` *Apply explicit type conversions to DataFrame columns.* ======================================== FILE: _proc/16_utils_stripe.html.md # 💳 Stripe Utilities ## 🎯 Overview
Category Functions Purpose
⚙️ Configuration StripeConfig, StripeConfig.from_env() Configure Stripe with env vars or explicit values
💳 Service StripeService Unified API for checkouts, subscriptions, webhooks
🛒 Checkout create_subscription_checkout, create_one_time_checkout Generate Stripe checkout sessions
🔄 Subscriptions get_subscription, cancel_subscription, change_plan Manage active subscriptions
🪝 Webhooks verify_signature, handle_event Process Stripe webhook events
🔐 Access Control get_active_subscription, has_active_subscription, require_active_subscription Gate features by payment status
📊 Feature Gating check_feature_access Control features by plan tier
🛤️ Route Helpers create_webhook_route, create_checkout_route, create_portal_route FastHTML route factories
------------------------------------------------------------------------ ## 🏗️ Architecture ┌─────────────────────────────────────────────────────────────────┐ │ Payment Flow │ ├─────────────────────────────────────────────────────────────────┤ │ 1. User clicks "Subscribe" or "Buy" │ │ 2. StripeService.create_*_checkout() → Stripe Session │ │ 3. User completes payment on Stripe │ │ 4. Stripe sends webhook → handle_event() │ │ 5. Subscription saved to core_subscriptions │ │ 6. Access control checks subscription status │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ Access Control │ ├─────────────────────────────────────────────────────────────────┤ │ require_active_subscription(tenant_id) │ │ ├─ status = 'active' → ✅ Access granted │ │ ├─ status = 'trialing' → ✅ Access granted │ │ ├─ status = 'past_due' + within grace → ⚠️ Access granted │ │ └─ Otherwise → 🚫 402 Payment Required │ └─────────────────────────────────────────────────────────────────┘ ------------------------------------------------------------------------ ## 🌍 Environment Variables
Variable Required Description
STRIPE_SECRET_KEY Stripe secret API key (sk_live_* or sk_test_*)
STRIPE_WEBHOOK_SECRET Webhook endpoint signing secret (whsec_*)
STRIPE_MONTHLY_PRICE_ID ⚠️ Pre-created monthly price ID (price_*)
STRIPE_YEARLY_PRICE_ID ⚠️ Pre-created yearly price ID (price_*)
STRIPE_BASE_URL ⚠️ Application base URL for callbacks
> ⚠️ Price IDs are required for subscription checkouts. Create them in > Stripe Dashboard first. ------------------------------------------------------------------------ ``` python from nbdev.showdoc import show_doc ``` ## ⚙️ Configuration Configure Stripe integration with sensible defaults and environment variable support. ------------------------------------------------------------------------ source ### StripeConfig ``` python def StripeConfig( secret_key:str, webhook_secret:str=None, monthly_price_id:str=None, yearly_price_id:str=None, trial_days:int=30, grace_period_days:int=3, base_url:str='http://localhost:5001', success_path:str='/payment-success', cancel_path:str='/settings/payment', allow_promotions:bool=True, is_development:bool=False, feature_tiers:Dict= )->None: ``` *Configuration for Stripe integration.* Supports both subscription and one-time payment modes with configurable trial periods, grace periods, and callback URLs. Attributes: secret_key: Stripe secret API key webhook_secret: Webhook signing secret for signature verification monthly_price_id: Pre-created Stripe price ID for monthly subscriptions yearly_price_id: Pre-created Stripe price ID for yearly subscriptions trial_days: Number of days for free trial (default: 30) grace_period_days: Days to allow access after payment failure (default: 3) base_url: Application base URL for redirect callbacks success_path: Path for successful payment redirect cancel_path: Path for canceled payment redirect allow_promotions: Enable Stripe promotion codes in checkout is_development: Enable development mode (skip signature verification) Example: \>\>\> config = StripeConfig.from_env() \>\>\> service = StripeService(config) ------------------------------------------------------------------------ source ### StripeConfig.from_env ``` python def from_env( )->StripeConfig: ``` *Create StripeConfig from environment variables.* Reads STRIPE\_\* environment variables with sensible defaults. Returns: Configured StripeConfig instance Raises: ValueError: If STRIPE_SECRET_KEY is not set Example: \>\>\> \# Set environment variables first \>\>\> os.environ\[‘STRIPE_SECRET_KEY’\] = ‘sk_test\_…’ \>\>\> config = StripeConfig.from_env() ------------------------------------------------------------------------ ## 💳 Stripe Service Unified service class for all Stripe operations: checkouts, subscriptions, and webhooks. ------------------------------------------------------------------------ source ### StripeService ``` python def StripeService( config:StripeConfig, host_db:HostDatabase=None ): ``` *Unified Stripe service for payments, subscriptions, and webhooks.* Consolidates checkout creation, subscription management, and webhook handling into a single service with consistent patterns. Attributes: config: StripeConfig instance api: FastStripe API wrapper host_db: HostDatabase for subscription persistence Example: \>\>\> config = StripeConfig.from_env() \>\>\> service = StripeService(config) \>\>\> checkout = service.create_subscription_checkout( … plan_type=‘monthly’, … tenant_id=‘tnt_123’, … user_email=‘user@example.com’ … ) \>\>\> print(checkout.url) \# Redirect user here ------------------------------------------------------------------------ source ### StripeService.create_subscription_checkout ``` python def create_subscription_checkout( plan_type:str, tenant_id:str, user_email:str, metadata:Dict=None )->Any: ``` *Create a subscription checkout session with trial.* Args: plan_type: ‘monthly’ or ‘yearly’ tenant_id: Tenant ID to associate subscription with user_email: Customer email for Stripe metadata: Additional metadata to store with subscription Returns: Stripe Checkout Session with ‘url’ and ‘id’ attributes Raises: ValueError: If plan_type is invalid or price ID not configured Example: \>\>\> checkout = service.create_subscription_checkout( … plan_type=‘monthly’, … tenant_id=‘tnt_abc123’, … user_email=‘user@example.com’ … ) \>\>\> return RedirectResponse(checkout.url) ------------------------------------------------------------------------ source ### StripeService.create_one_time_checkout ``` python def create_one_time_checkout( amount_cents:int, product_name:str, tenant_id:str, user_email:str, currency:str='usd', metadata:Dict=None )->Any: ``` *Create a one-time payment checkout session.* Args: amount_cents: Payment amount in cents (e.g., 1999 for $19.99) product_name: Name displayed on checkout tenant_id: Tenant ID for record keeping user_email: Customer email for Stripe currency: ISO currency code (default: ‘usd’) metadata: Additional metadata to store Returns: Stripe Checkout Session with ‘url’ and ‘id’ attributes Example: \>\>\> checkout = service.create_one_time_checkout( … amount_cents=4999, … product_name=‘Premium Report’, … tenant_id=‘tnt_abc123’, … user_email=‘user@example.com’ … ) \>\>\> return RedirectResponse(checkout.url) ------------------------------------------------------------------------ source ### StripeService.create_customer_portal_session ``` python def create_customer_portal_session( customer_id:str, return_url:str=None )->Any: ``` *Create a Stripe Customer Portal session for self-service billing.* Args: customer_id: Stripe customer ID (cus\_\*) return_url: URL to redirect after portal session (default: base_url) Returns: Portal session with ‘url’ attribute Example: \>\>\> portal = service.create_customer_portal_session(‘cus_123’) \>\>\> return RedirectResponse(portal.url) ------------------------------------------------------------------------ source ### StripeService.cancel_subscription ``` python def cancel_subscription( subscription_id:str, at_period_end:bool=True )->Any: ``` *Cancel a subscription.* Args: subscription_id: Stripe subscription ID at_period_end: If True, cancel at end of billing period (default) If False, cancel immediately Returns: Updated Stripe Subscription object ------------------------------------------------------------------------ source ### StripeService.handle_event ``` python def handle_event( event:Dict )->Dict: ``` *Route webhook event to appropriate handler.* Handles both subscription and one-time payment events. Args: event: Parsed Stripe event dict Returns: Dict with ‘status’ (‘success’, ‘warning’, ‘error’, ‘ignored’) and ‘message’ describing the result ------------------------------------------------------------------------ ## 💰 Pricing Plans Database-backed pricing tiers for multi-tier subscription management. Store your Stripe price IDs in the database for admin-configurable pricing without code changes. ------------------------------------------------------------------------ source ### get_pricing_plans ``` python def get_pricing_plans( host_db:HostDatabase=None, active_only:bool=True )->List: ``` *Get all pricing plans from the database.* Retrieves pricing plan configurations stored in the `core_pricing_plans` table. Plans define available subscription tiers with their Stripe price IDs, amounts, features, and display settings. Args: host_db: Optional HostDatabase instance. Uses from_env() if not provided. active_only: If True (default), only returns plans where is_active=True. Set to False to include inactive/archived plans. Returns: List of PricingPlan objects sorted by sort_order, then by tier_level. Empty list if no plans are configured. Example: \>\>\> \# Get all active plans for pricing page \>\>\> plans = get_pricing_plans() \>\>\> for plan in plans: … print(f”{plan.name}: ${plan.amount_monthly/100}/mo”) Basic Plan: $7.99/mo Pro Plan: $19.99/mo Enterprise Plan: $49.99/mo >>> # Get specific plan for checkout >>> plans = get_pricing_plans() >>> pro_plan = next((p for p in plans if p.id == 'pro'), None) >>> if pro_plan: ... price_id = pro_plan.stripe_price_monthly >>> # Setting up plans (typically in admin or migration) >>> from fh_saas.db_host import HostDatabase, PricingPlan, gen_id, timestamp >>> host_db = HostDatabase.from_env() >>> >>> plans_data = [ ... { ... 'id': 'basic', ... 'name': 'Basic Plan', ... 'description': 'Essential features for individuals', ... 'stripe_price_monthly': 'price_basic_monthly_xxx', ... 'stripe_price_yearly': 'price_basic_yearly_xxx', ... 'amount_monthly': 799, # $7.99 ... 'amount_yearly': 7990, # $79.90 (save ~17%) ... 'tier_level': 1, ... 'features': '["basic_reports", "email_support"]', ... 'sort_order': 1, ... }, ... { ... 'id': 'pro', ... 'name': 'Pro Plan', ... 'description': 'Advanced features for teams', ... 'stripe_price_monthly': 'price_pro_monthly_xxx', ... 'stripe_price_yearly': 'price_pro_yearly_xxx', ... 'amount_monthly': 1999, # $19.99 ... 'amount_yearly': 19990, # $199.90 ... 'tier_level': 2, ... 'features': '["basic_reports", "advanced_analytics", "api_access", "priority_support"]', ... 'sort_order': 2, ... }, ... { ... 'id': 'enterprise', ... 'name': 'Enterprise Plan', ... 'description': 'Full platform access with custom solutions', ... 'stripe_price_monthly': 'price_ent_monthly_xxx', ... 'stripe_price_yearly': 'price_ent_yearly_xxx', ... 'amount_monthly': 4999, # $49.99 ... 'amount_yearly': 49990, # $499.90 ... 'tier_level': 3, ... 'features': '["all_features", "dedicated_support", "custom_integrations", "sla"]', ... 'sort_order': 3, ... }, ... ] >>> >>> for plan_data in plans_data: ... plan = PricingPlan(**plan_data, created_at=timestamp()) ... host_db.pricing_plans.insert(plan) >>> host_db.commit() Note: - Create your Stripe Products and Prices in the Stripe Dashboard first - Copy the price IDs (price_xxx) into your database records - The `tier_level` field is used by [`check_feature_access()`](https://abhisheksreesaila.github.io/fh-saas/utils_stripe.html#check_feature_access) for feature gating - The `features` field should be a JSON array of feature keys ------------------------------------------------------------------------ source ### get_pricing_plan ``` python def get_pricing_plan( plan_id:str, host_db:HostDatabase=None )->Optional: ``` *Get a specific pricing plan by ID.* Args: plan_id: The plan identifier (e.g., ‘basic’, ‘pro’, ‘enterprise’) host_db: Optional HostDatabase instance Returns: PricingPlan object if found, None otherwise Example: \>\>\> plan = get_pricing_plan(‘pro’) \>\>\> if plan: … checkout = service.create_subscription_checkout( … price_id=plan.stripe_price_monthly, … … … ) ------------------------------------------------------------------------ ## 🔐 Access Control Functions to gate features based on subscription status with grace period support. ------------------------------------------------------------------------ source ### get_active_subscription ``` python def get_active_subscription( tenant_id:str, host_db:HostDatabase=None, grace_period_days:int=3 )->Optional: ``` *Get active subscription for a tenant with grace period support.* Returns subscription if: - Status is ‘active’ or ‘trialing’ - Status is ‘past_due’ but within grace period from current_period_end Args: tenant_id: Tenant ID to check host_db: Optional HostDatabase, uses from_env() if not provided grace_period_days: Days to allow access after payment failure (default: 3) Returns: Active Subscription object, or None if no valid subscription Example: \>\>\> sub = get_active_subscription(‘tnt_123’) \>\>\> if sub: … print(f”Active until {sub.current_period_end}“) ------------------------------------------------------------------------ source ### has_active_subscription ``` python def has_active_subscription( tenant_id:str, host_db:HostDatabase=None, grace_period_days:int=3 )->bool: ``` *Check if tenant has an active subscription.* Args: tenant_id: Tenant ID to check host_db: Optional HostDatabase grace_period_days: Days to allow access after payment failure Returns: True if tenant has valid subscription, False otherwise Example: \>\>\> if has_active_subscription(‘tnt_123’): … show_premium_features() ------------------------------------------------------------------------ source ### require_active_subscription ``` python def require_active_subscription( tenant_id:str, host_db:HostDatabase=None, grace_period_days:int=3, redirect_url:str=None )->Optional: ``` *Require active subscription, returning 402 if not found.* Use in route handlers to gate premium features. Args: tenant_id: Tenant ID to check host_db: Optional HostDatabase grace_period_days: Days to allow access after payment failure redirect_url: Optional URL to redirect instead of 402 Returns: None if subscription is valid (allow access) Response(402) or RedirectResponse if subscription required Example: \>\>\> @app.get(‘/premium-feature’) \>\>\> def premium_feature(request): … error = require_active_subscription(request.state.tenant_id) … if error: … return error … return render_premium_content() ------------------------------------------------------------------------ source ### get_subscription_status ``` python def get_subscription_status( tenant_id:str, host_db:HostDatabase=None, grace_period_days:int=3 )->Dict: ``` *Get detailed subscription status for UI display.* Args: tenant_id: Tenant ID to check host_db: Optional HostDatabase grace_period_days: Days for grace period calculation Returns: Dict with subscription details for UI: - has_subscription: bool - status: str (‘active’, ‘trialing’, ‘past_due’, ‘canceled’, ‘none’) - plan_tier: str or None - is_trial: bool - trial_ends_at: str (ISO) or None - current_period_end: str (ISO) or None - days_remaining: int or None - in_grace_period: bool - cancel_at_period_end: bool Example: \>\>\> status = get_subscription_status(‘tnt_123’) \>\>\> if status\[‘is_trial’\]: … show_trial_banner(status\[‘days_remaining’\]) ------------------------------------------------------------------------ ## 📊 Feature Gating Control access to features based on subscription plan tier. ------------------------------------------------------------------------ source ### check_feature_access ``` python def check_feature_access( tenant_id:str, feature:str, feature_tiers:Dict=None, host_db:HostDatabase=None, grace_period_days:int=3 )->bool: ``` *Check if tenant’s plan tier allows access to a feature.* Args: tenant_id: Tenant ID to check feature: Feature name to check access for feature_tiers: Optional mapping of feature -\> required tier. Defaults to {‘advanced_analytics’: ‘yearly’, …} host_db: Optional HostDatabase grace_period_days: Days for grace period Returns: True if tenant’s plan allows the feature, False otherwise Example: \>\>\> if check_feature_access(‘tnt_123’, ‘advanced_analytics’): … return render_analytics() \>\>\> else: … return render_upgrade_prompt() ------------------------------------------------------------------------ source ### require_feature_access ``` python def require_feature_access( tenant_id:str, feature:str, feature_tiers:Dict=None, host_db:HostDatabase=None, redirect_url:str=None )->Optional: ``` *Require feature access, returning 403 if not allowed.* Args: tenant_id: Tenant ID to check feature: Feature name to check feature_tiers: Optional feature -\> tier mapping host_db: Optional HostDatabase redirect_url: Optional URL to redirect instead of 403 Returns: None if access allowed, Response(403) or redirect otherwise Example: \>\>\> @app.get(‘/analytics’) \>\>\> def analytics(request): … error = require_feature_access( … request.state.tenant_id, … ‘advanced_analytics’, … redirect_url=‘/upgrade’ … ) … if error: … return error … return render_analytics() ------------------------------------------------------------------------ ## 🛤️ Route Helpers Ready-to-use FastHTML route factories for Stripe integration. ------------------------------------------------------------------------ source ### create_webhook_route ``` python def create_webhook_route( app, service:StripeService, path:str='/stripe/webhook' ): ``` *Create a POST route handler for Stripe webhooks.* Args: app: FastHTML application instance service: Configured StripeService instance path: URL path for webhook endpoint Returns: The registered route handler Example: \>\>\> from fh_saas.utils_stripe import StripeService, StripeConfig, create_webhook_route \>\>\> config = StripeConfig.from_env() \>\>\> service = StripeService(config) \>\>\> create_webhook_route(app, service) \>\>\> \# Webhook now available at POST /stripe/webhook ------------------------------------------------------------------------ source ### create_subscription_checkout_route ``` python def create_subscription_checkout_route( app, service:StripeService, path:str='/checkout/{plan_type}' ): ``` *Create a GET route for subscription checkout redirect.* Expects authenticated user with tenant_id in request.state. Args: app: FastHTML application instance service: Configured StripeService instance path: URL path pattern with {plan_type} placeholder Returns: The registered route handler Example: \>\>\> create_subscription_checkout_route(app, service) \>\>\> \# Checkout available at GET /checkout/monthly or /checkout/yearly ------------------------------------------------------------------------ source ### create_one_time_checkout_route ``` python def create_one_time_checkout_route( app, service:StripeService, products:Dict, path:str='/buy/{product_id}' ): ``` *Create a GET route for one-time payment checkout.* Args: app: FastHTML application instance service: Configured StripeService instance products: Dict mapping product_id to {‘name’: str, ‘amount_cents’: int} path: URL path pattern with {product_id} placeholder Returns: The registered route handler Example: \>\>\> products = { … ‘report’: {‘name’: ‘Premium Report’, ‘amount_cents’: 4999}, … ‘credits’: {‘name’: ‘100 Credits’, ‘amount_cents’: 1999}, … } \>\>\> create_one_time_checkout_route(app, service, products) \>\>\> \# Checkout available at GET /buy/report or /buy/credits ------------------------------------------------------------------------ source ### create_portal_route ``` python def create_portal_route( app, service:StripeService, path:str='/billing-portal' ): ``` *Create a GET route for Stripe Customer Portal redirect.* Args: app: FastHTML application instance service: Configured StripeService instance path: URL path for portal endpoint Returns: The registered route handler Example: \>\>\> create_portal_route(app, service) \>\>\> \# Portal available at GET /billing-portal ------------------------------------------------------------------------ ## 🏭 Service Factory Singleton pattern for easy service access across the application. ------------------------------------------------------------------------ source ### reset_stripe_service ``` python def reset_stripe_service( ): ``` *Reset singleton instance (for testing only).* ------------------------------------------------------------------------ source ### get_stripe_service ``` python def get_stripe_service( config:StripeConfig=None )->StripeService: ``` *Get or create singleton StripeService instance.* Args: config: Optional StripeConfig. Uses from_env() on first call if not provided. Returns: StripeService singleton instance Example: \>\>\> service = get_stripe_service() \>\>\> checkout = service.create_subscription_checkout(…) ------------------------------------------------------------------------ ## 🚀 Quick Start ### 1. Set Environment Variables ``` bash # Required export STRIPE_SECRET_KEY="sk_test_..." export STRIPE_WEBHOOK_SECRET="whsec_..." # Create prices in Stripe Dashboard, then set IDs export STRIPE_MONTHLY_PRICE_ID="price_..." export STRIPE_YEARLY_PRICE_ID="price_..." # Application URL export STRIPE_BASE_URL="https://yourapp.com" ``` ### 2. Initialize Service & Routes ``` python from fasthtml.common import FastHTML from fh_saas.utils_stripe import ( get_stripe_service, create_webhook_route, create_subscription_checkout_route, create_portal_route, ) app = FastHTML() service = get_stripe_service() # Register routes create_webhook_route(app, service) create_subscription_checkout_route(app, service) create_portal_route(app, service) # Now available: # POST /stripe/webhook - Stripe webhook handler # GET /checkout/monthly - Monthly subscription checkout # GET /checkout/yearly - Yearly subscription checkout # GET /billing-portal - Customer self-service portal ``` ### 3. Gate Premium Features ``` python from fh_saas.utils_stripe import require_active_subscription @app.get('/premium') def premium_feature(request): error = require_active_subscription(request.state.tenant_id) if error: return error # 402 Payment Required return render_premium_content() ``` ### 4. Integrate with Auth Beforeware ``` python from fh_saas.utils_auth import create_auth_beforeware # Optional: Add require_subscription=True to beforeware # See utils_auth for integration examples ``` ======================================== FILE: _proc/17_utils_migrate.html.md # utils_migrate See [utils_migrate.py](../fh_saas/utils_migrate.py) for the source code. ======================================== FILE: _proc/05_utils_email.html.md # 📧 Email Sending ## 🎯 Overview
Category Functions Purpose
⚙️ Config get_smtp_config Load SMTP settings from env
📁 Templates get_template_path, load_template Manage markdown templates
✉️ Sending send_email, send_batch_emails Core send functions
🎁 Convenience send_welcome_email, send_invitation_email, send_password_reset_email Pre-built templates
------------------------------------------------------------------------ ## 🏗️ Architecture ┌─────────────────────────────────────────────────────────────────┐ │ Email Sending Flow │ ├─────────────────────────────────────────────────────────────────┤ │ 1. Load SMTP config from environment variables │ │ 2. Load markdown template from templates/ directory │ │ 3. Substitute template variables (user_name, etc.) │ │ 4. Convert markdown to HTML via markdown_merge │ │ 5. Send via SMTP (Azure, SendGrid, AWS SES, etc.) │ └─────────────────────────────────────────────────────────────────┘ ------------------------------------------------------------------------ ## 📋 Environment Variables
Variable Required Default Description
SMTP_HOST - SMTP server hostname
SMTP_PORT 587 SMTP server port
SMTP_USER - Auth username
SMTP_PASSWORD - Auth password
SMTP_MAIL_FROM - Sender email
SMTP_STARTTLS True Use STARTTLS
SMTP_SSL False Use SSL (disables TLS)
## ⚙️ SMTP Configuration
Function Purpose
get_smtp_config Load SMTP config from environment vars
------------------------------------------------------------------------ source ### get_smtp_config ``` python def get_smtp_config( )->Dict[str, Any]: ``` *Load SMTP configuration from environment variables.* ## 📁 Template Management **Built-in Templates:** We ship 3 production-ready markdown templates: - `welcome.md` - New user onboarding - `invitation.md` - Invite users to tenant - `password_reset.md` - Password recovery **Custom Templates:** You can provide your own templates by passing `custom_template_path` to any email function.
Function Purpose
get_template_path Resolve path to template file (built-in or custom)
load_template Read template content as string
------------------------------------------------------------------------ source ### load_template ``` python def load_template( template_name:str, # Template basename (e.g., 'welcome') custom_template_path:Optional[str | Path]=None, # Custom path overrides package templates )->str: ``` *Load markdown email template as string.* ------------------------------------------------------------------------ source ### get_template_path ``` python def get_template_path( template_name:str, # Template basename (e.g., 'welcome', 'invitation') custom_template_path:Optional[str | Path]=None, # Custom path overrides package templates )->Path: ``` *Get absolute path to email template file.* ## ✉️ Email Sending
Function Purpose
send_email Send single email with template
send_batch_emails Send to multiple recipients
------------------------------------------------------------------------ source ### send_email ``` python def send_email( to_email:str, # Recipient email address to_name:str, # Recipient display name subject:str, # Email subject line template_name:str, # Template name: 'welcome', 'invitation', 'password_reset' template_vars:Dict[str, str], # Variables to substitute in template test:bool=False, # If True, prints email instead of sending smtp_config:Optional[Dict[str, Any]]=None, # Custom SMTP config (defaults to env vars) custom_template_path:Optional[str | Path]=None, # Custom template path )->Dict[str, Any]: ``` *Send single email using markdown template with variable substitution.* ## 📦 Batch Sending
Function Purpose
send_batch_emails Send personalized emails to multiple recipients
------------------------------------------------------------------------ source ### send_batch_emails ``` python def send_batch_emails( recipients:List[Dict[str, str]], # List of dicts with 'email' and 'name' keys subject:str, # Email subject line template_name:str, # Template name: 'welcome', 'invitation', 'password_reset' template_vars_list:List[Dict[str, str]], # List of variable dicts, one per recipient test:bool=False, # If True, prints emails instead of sending pause:float=0.2, # Seconds between emails (rate limiting) smtp_config:Optional[Dict[str, Any]]=None, # Custom SMTP config custom_template_path:Optional[str | Path]=None, # Custom template path )->List[Dict[str, Any]]: ``` *Send personalized emails to multiple recipients.* ## 🎁 Convenience Functions Pre-built functions for common email types. All support `custom_template_path` for using your own templates.
Function Template Purpose
send_welcome_email welcome.md New user onboarding
send_invitation_email invitation.md Invite to tenant
send_password_reset_email password_reset.md Password recovery
------------------------------------------------------------------------ source ### send_welcome_email ``` python def send_welcome_email( to_email:str, # Recipient email address to_name:str, # Recipient display name user_name:str, # User's name for personalization tenant_name:str, # Tenant/organization name dashboard_url:str, # URL to user's dashboard test:bool=False, # If True, prints email instead of sending custom_template_path:Optional[str | Path]=None, # Custom welcome.md template path )->Dict[str, Any]: ``` *Send welcome email to new user. Template vars: {user_name}, {tenant_name}, {dashboard_url}, {to_email}* ------------------------------------------------------------------------ source ### send_invitation_email ``` python def send_invitation_email( to_email:str, # Recipient email address to_name:str, # Recipient display name inviter_name:str, # Name of person sending invitation tenant_name:str, # Tenant/organization name invitation_url:str, # URL to accept invitation test:bool=False, # If True, prints email instead of sending custom_template_path:Optional[str | Path]=None, # Custom invitation.md template path )->Dict[str, Any]: ``` *Send invitation email. Template vars: {inviter_name}, {tenant_name}, {invitation_url}, {to_email}* ------------------------------------------------------------------------ source ### send_password_reset_email ``` python def send_password_reset_email( to_email:str, # Recipient email address to_name:str, # Recipient display name user_name:str, # User's name for personalization reset_url:str, # Secure password reset URL test:bool=False, # If True, prints email instead of sending custom_template_path:Optional[str | Path]=None, # Custom password_reset.md template path )->Dict[str, Any]: ``` *Send password reset email. Template vars: {user_name}, {reset_url}, {to_email}* ## 📝 Template Customization ### Built-in Templates The package ships with 3 production-ready templates in `fh_saas/templates/`:
Template Variables
welcome.md {user_name}, {tenant_name}, {dashboard_url}, {to_email}
invitation.md {inviter_name}, {tenant_name}, {invitation_url}, {to_email}
password_reset.md {user_name}, {reset_url}, {to_email}
### Creating Custom Templates 1. Copy a built-in template as a starting point 2. Modify the markdown and keep variable names in `{brackets}` 3. Pass `custom_template_path='/path/to/template.md'` to any function Templates use **markdown_merge** format - standard markdown with `{variable}` placeholders. ======================================== FILE: _proc/11_utils_blog.html.md # 📝 Blog Publishing ## 🎯 Overview
Class Purpose
PostLoader Load markdown posts from filesystem
MarkdownEngine Render markdown to SEO-friendly HTML
## 📂 PostLoader
Method Purpose
.load_posts() Load all posts sorted by date
.get_post() Get single post by slug
------------------------------------------------------------------------ source ### PostLoader ``` python def PostLoader( posts_dir:str, # Directory containing .md files ): ``` *Load and parse markdown blog posts from filesystem* ------------------------------------------------------------------------ source ### PostLoader.load_posts ``` python def load_posts( )->List[Dict]: ``` *Load all markdown posts from directory.* Returns list of post dicts sorted by date (newest first). Each post contains: title, date, slug, body, categories, author, series. Example: \`\`\`python loader = PostLoader(‘blog/posts’) posts = loader.load_posts() for post in posts: print(f"{post['title']} - {post['slug']}") ``` ------------------------------------------------------------------------ source ### PostLoader.get_post ``` python def get_post( slug:str )->Optional[Dict]: # URL slug (e.g., 'my-post') ``` *Get single post by slug.* Example: `python post = loader.get_post('bg0010') if post: print(post['title'])` ## 🎨 MarkdownEngine
Method Purpose
.render() Convert markdown to HTML
------------------------------------------------------------------------ source ### MarkdownEngine ``` python def MarkdownEngine( ): ``` *Render markdown to HTML with SEO extensions* ------------------------------------------------------------------------ source ### MarkdownEngine.render ``` python def render( content:str )->str: # Markdown content ``` *Convert markdown to HTML.* Returns HTML string with proper semantic tags for SEO. Example: ```python engine = MarkdownEngine() html = engine.render('# Hello This is **bold**.’) print(html) \#

Hello

This is bold.

``` ------------------------------------------------------------------------ source ### MarkdownEngine.get_toc ``` python def get_toc( )->str: ``` *Get table of contents HTML from last render.* Must call render() first. Returns empty string if no headings. Example: ```python engine = MarkdownEngine() html = engine.render('# Title ## Section 1 ## Section 2’) toc = engine.get_toc() print(toc) # ``` ## FastHTML Integration Example Basic pattern for server-side rendering with FastHTML: ``` python from fasthtml.common import * from fh_saas.utils_blog import PostLoader, MarkdownEngine app = FastHTML() loader = PostLoader('blog/posts') engine = MarkdownEngine() @app.get('/blog') def blog_index(): posts = loader.load_posts() return Titled('Blog', *[Article( H2(A(p['title'], href=f"/blog/{p['slug']}")), P(p['description']), Small(p['date'].strftime('%Y-%m-%d') if p['date'] else '') ) for p in posts] ) @app.get('/blog/{slug}') def blog_post(slug: str): post = loader.get_post(slug) if not post: return 'Post not found', 404 html_content = engine.render(post['body']) toc = engine.get_toc() return Titled(post['title'], Article( NotStr(toc), # Table of contents NotStr(html_content) # Rendered markdown ) ) ``` ======================================== FILE: _proc/12_utils_seo.html.md # 🔍 SEO & Sitemaps ## 🎯 Overview
Function Purpose
generate_head_tags Meta tags for social sharing
generate_sitemap_xml XML sitemap for crawlers
generate_json_ld Structured data (Article schema)
## 🏷️ Meta Tags
Function Purpose
generate_head_tags OpenGraph, Twitter Card, canonical
------------------------------------------------------------------------ source ### generate_head_tags ``` python def generate_head_tags( title:str, # Page title description:str, # Page description (150-160 chars optimal) url:str, # Canonical URL image_url:Optional[str]=None, # OpenGraph image URL article_published:Optional[datetime]=None, # Publication date article_modified:Optional[datetime]=None, # Last modified date author:Optional[str]=None, # Author name )->List[tuple]: ``` *Generate meta tags for SEO.* Returns list of (tag_name, attributes_dict) tuples for FastHTML components. Includes standard, OpenGraph, and Twitter Card tags. Example: \`\`\`python from fasthtml.common import \* tags = generate_head_tags( title='My Blog Post', description='Learn about Python', url='https://example.com/blog/my-post', image_url='https://example.com/image.jpg' ) # Use in FastHTML app @app.get('/blog/my-post') def post(): return Html( Head( *[Meta(**attrs) if tag == 'meta' else Link(**attrs) for tag, attrs in tags] ), Body('...') ) ``` ## 🗺️ Sitemap
Function Purpose
generate_sitemap_xml XML sitemap for search crawlers
------------------------------------------------------------------------ source ### generate_sitemap_xml ``` python def generate_sitemap_xml( posts:List[Dict], # List of posts from PostLoader base_url:str, # Base URL (e.g., 'https://example.com') blog_path:str='/blog', # Blog path prefix )->str: ``` *Generate XML sitemap for blog posts.* Returns sitemap XML string with proper structure and lastmod dates. Example: \`\`\`python from fasthtml.common import \* from fh_saas.utils_blog import PostLoader app = FastHTML() loader = PostLoader('blog/posts') @app.get('/sitemap.xml') def sitemap(): posts = loader.load_posts() xml = generate_sitemap_xml( posts=posts, base_url='https://example.com', blog_path='/blog' ) return Response(xml, media_type='application/xml') ``` ## rss feed generator Generate RSS 2.0 feed for blog subscribers. ------------------------------------------------------------------------ source ### generate_rss_xml ``` python def generate_rss_xml( posts:List[Dict], # List of posts from PostLoader blog_title:str, # Blog title blog_description:str, # Blog description base_url:str, # Base URL blog_path:str='/blog', # Blog path prefix )->str: ``` *Generate RSS 2.0 feed for blog posts.* Returns RSS XML string for feed readers (Feedly, etc.). Example: \`\`\`python from fasthtml.common import \* from fh_saas.utils_blog import PostLoader app = FastHTML() loader = PostLoader('blog/posts') @app.get('/rss.xml') def rss(): posts = loader.load_posts()[:20] # Latest 20 posts xml = generate_rss_xml( posts=posts, blog_title='My Blog', blog_description='Tech tutorials and insights', base_url='https://example.com' ) return Response(xml, media_type='application/xml') ``` ## FastHTML Integration Example Complete SEO setup with FastHTML: ``` python from fasthtml.common import * from fh_saas.utils_blog import PostLoader, MarkdownEngine from fh_saas.utils_seo import generate_head_tags, generate_sitemap_xml, generate_rss_xml app = FastHTML() loader = PostLoader('blog/posts') engine = MarkdownEngine() @app.get('/blog/{slug}') def blog_post(slug: str): post = loader.get_post(slug) if not post: return 'Post not found', 404 # Generate SEO tags tags = generate_head_tags( title=post['title'], description=post['description'] or post['body'][:160], url=f"https://example.com/blog/{slug}", image_url=post.get('image'), article_published=post['date'], author=post.get('author') ) # Render markdown html_content = engine.render(post['body']) return Html( Head( Title(post['title']), *[Meta(**attrs) if tag == 'meta' else Link(**attrs) for tag, attrs in tags] ), Body( Article(NotStr(html_content)) ) ) @app.get('/sitemap.xml') def sitemap(): posts = loader.load_posts() xml = generate_sitemap_xml(posts, 'https://example.com') return Response(xml, media_type='application/xml') @app.get('/rss.xml') def rss(): posts = loader.load_posts()[:20] xml = generate_rss_xml( posts=posts, blog_title='My Blog', blog_description='Tech tutorials', base_url='https://example.com' ) return Response(xml, media_type='application/xml') ``` ======================================== FILE: _proc/13_utils_workflow.html.md # 🔧 Workflow Engine ## 🎯 Overview
Class Purpose
Workflow Execute callable steps sequentially
## 🔧 Workflow Class
Method Purpose
Workflow(steps) Initialize with list of callables
.execute() Run all steps sequentially
``` python from nbdev.showdoc import * ``` ------------------------------------------------------------------------ source ### Workflow ``` python def Workflow( steps:List ): ``` *Execute a list of callables sequentially. Minimal wrapper for code readability.* ------------------------------------------------------------------------ source ### Workflow.execute ``` python def execute( ): ``` *Execute all steps in order* ========================================