---
name: implementing-warehouse-sources
description: Implement and extend PostHog Data warehouse import sources. Use when adding a new source under posthog/temporal/data_imports/sources, adding datasets/endpoints to an existing source, or adding incremental sync, resumable imports, webhook ingestion, pagination, credentials validation, and source tests.
---

# Implementing Data warehouse sources

Use this skill when building or updating Data warehouse sources in `posthog/temporal/data_imports/sources/`.

## Read first

Before coding, read:

- `posthog/temporal/data_imports/sources/source.template` (use the top-of-file TODOs as a starting reference, but verify target files against the current source implementations — the template can drift, e.g. it currently still points at the old `posthog/warehouse/types.py` path instead of `products/data_warehouse/backend/types.py`)
- `posthog/temporal/data_imports/sources/README.md`
- `posthog/temporal/data_imports/sources/SOURCES.md` — inventory of every registered source with its communication method (HTTP / vendor SDK / gRPC / DB protocol / webhook) and tracked-transport state. Skim this first to see how similar sources are wired and what state the source you're touching is in today. **Keep it in sync** — see "Updating SOURCES.md" below.
- `posthog/temporal/data_imports/sources/common/base.py` — base classes (`SimpleSource`, `ResumableSource`, `WebhookSource`) and the `FieldType` union
- `posthog/temporal/data_imports/sources/common/resumable.py` — `ResumableSourceManager`
- `posthog/temporal/data_imports/sources/common/webhook_s3.py` — `WebhookSourceManager`
- One API source with `settings.py` + transport logic (e.g. klaviyo, github). For dependent-resource fan-out (parent→child with `type: "resolve"`), also read `posthog/temporal/data_imports/sources/common/rest_source/__init__.py` and `config_setup.py` (e.g. `process_parent_data_item`, `make_parent_key_name`).
- For webhook-capable sources, read `posthog/temporal/data_imports/sources/stripe/source.py` as the reference implementation.

## Picking the right base class

Every new source **must** inherit from one (or a combination) of these:

- **`SimpleSource[Config]`** — default for straightforward pull-based APIs where each run fully iterates the endpoint.
- **`ResumableSource[Config, ResumableData]`** — **preferred for any new API-backed source whose underlying API supports resumption** (cursor/link-header pagination, time windows, offset tokens, or any other deterministic way to pick back up where we left off). If the API gives us a next-page token, a `Link` header, or a stable time filter, use `ResumableSource`. This lets Temporal resume after heartbeat timeouts without restarting from scratch. The manager persists state to Redis (24h TTL).
- **`WebhookSource[Config]`** — only when the source can push events to us (e.g. Stripe webhook endpoints). Typically combined with `ResumableSource` so the initial backfill is resumable and subsequent deltas come via webhook.

Combine by multiple inheritance when both apply, e.g.:

```python
class StripeSource(
    ResumableSource[StripeSourceConfig, StripeResumeConfig],
    WebhookSource[StripeSourceConfig],
    OAuthMixin,
): ...
```

Rule of thumb:

- Pull-only API, no cursor we can persist → `SimpleSource`.
- Pull-only API with any cursor/next-page/time-filter we can save between runs → `ResumableSource`.
- Source can call us back with change events → add `WebhookSource` on top of whichever pull base fits.

Databases and file-transfer sources (SFTP, S3) stay on `SimpleSource` unless there's a clear reason otherwise.

## End-to-end workflow for a new API source

Follow this order. Each step maps to TODOs in `source.template`.

1. **Survey the source.** Pick the endpoints a user will actually want. Cross-reference:
   - Airbyte: (connector pages often link to source code — useful reference)
   - Fivetran:
   - Stitch:

   Find the official API docs or OpenAPI spec.
   Make sure it's the current version, not a deprecated one.

2. **Bootstrap the source.** Copy the template and wire up the enum/type references:

   ```sh
   mkdir -p posthog/temporal/data_imports/sources/{SOURCE_NAME}
   cp posthog/temporal/data_imports/sources/source.template posthog/temporal/data_imports/sources/{SOURCE_NAME}/source.py
   ```

   Then update the two hand-edited files (the template still lists `posthog/schema.py` too, but that file is regenerated by `pnpm run schema:build` in step 12 — don't maintain it by hand):
   - `ExternalDataSourceType` at `products/data_warehouse/backend/types.py` — follow the existing convention in that file: `ALL_CAPS` with **no underscores** between words (e.g. `ACTIVECAMPAIGN`, `APPLESEARCHADS`), value is `PascalCase`
   - `externalDataSources` at `frontend/src/queries/schema/schema-general.ts` (`lower-kebab-case`)

3. **Pick the base class** (see above) and rename the class / `source_type` return.
4. **Define `get_source_config`** — name, label, caption, docsUrl, iconPath, fields. Use appropriate field types (see below).
5. **Register** the source — add an import line to `posthog/temporal/data_imports/sources/__init__.py` and include it in `__all__`. (The `@SourceRegistry.register` decorator on the class handles runtime registration.)
6. **Run the config generator**: `pnpm run generate:source-configs`. Confirm the new config class appears in `posthog/temporal/data_imports/sources/generated_configs.py`. **Do not edit that file by hand.** Every time you change `get_source_config.fields`, re-run the generator.
7. **Swap the generic `Config` type** in `source.py` for the generated `{Source}SourceConfig` class.
8. **Implement**: `validate_credentials`, `get_schemas`, `source_for_pipeline` (plus `get_resumable_source_manager` / `get_webhook_source_manager` as needed).
9. **Split transport logic.** Put API client, paginator, row normalization, and `SourceResponse` assembly in `{source}.py`.
   Keep endpoint catalog/incremental fields/primary keys/partition defaults in `settings.py`.
10. **Add icon.** Place at `frontend/public/services/{source}.svg` (prefer SVG). If the logo isn't already committed, fetch from [Logo.dev](https://docs.logo.dev/introduction) — **ask the user for the Logo.dev API key**; do not hardcode one. Keep file size reasonable.
11. **Run migrations.** `DEBUG=1 python manage.py makemigrations && DEBUG=1 ./bin/migrate` (only needed if a new enum value triggers a Django migration).
12. **Rebuild schema types**: `pnpm run schema:build`. This updates `posthog/schema.py` from `schema-general.ts` and makes the source appear in frontend dropdowns. Re-run whenever `schema-general.ts` changes.
13. **Release status.** For unfinished work, set `unreleasedSource=True`. Set `releaseStatus="alpha"` for new sources that haven't been extensively tested, `releaseStatus="beta"` once most rough edges are ironed out, and leave `releaseStatus` unset for general availability. For controlled rollout, set `featureFlag="dwh-{source_name}"` (kebab-case). When fully releasing, remove `unreleasedSource`, set `releaseStatus` to the appropriate stage (or omit for GA), and optionally drop the feature flag.
14. **Delete the template TODO comments** before PR.

## Source architecture contract

For API-backed sources, use this split:

- `source.py`: source registration, source form fields, schema list, credential validation, resumable/webhook manager wiring, pipeline handoff.
- `settings.py`: endpoint catalog, incremental fields, primary key, partition defaults.
- `{source}.py`: API client/auth, paginator, request params, row normalization, and `SourceResponse`.

This keeps endpoint behavior declarative and easy to extend.

For REST sources that mix top-level and fan-out endpoints, keep endpoint metadata in `settings.py` and route in `{source}.py` with this priority:

1. endpoint-specific custom iterators (only when required),
2. generic fan-out helper path,
3. top-level endpoint path.

## Source fields (the form the user fills in)

Defined in `get_source_config.fields`. All field types live in `posthog/schema.py` and are unioned as `FieldType` in `posthog/temporal/data_imports/sources/common/base.py`.

- `SourceFieldInputConfig` — basic input (`text`, `email`, `number`, `password`, `textarea`). Rendered as ``.
- `SourceFieldSwitchGroupConfig` — toggle that reveals a sub-group of fields. Use for optional feature blocks.
- `SourceFieldSelectConfig` — dropdown. Options can carry sub-`fields` shown when selected (use for alternative auth methods — e.g. API key vs OAuth).
- `SourceFieldOauthConfig` — OAuth via `Integration` model. See OAuth section.
- `SourceFieldFileUploadConfig` — file upload (JSON). Use `keys=["..."]` allow-list or `"*"`.
- `SourceFieldSSHTunnelConfig` — renders SSH tunnel sub-fields; adds `ssh_tunnel: SSHTunnel` to the config with helpers.

Guidelines:

- Multiple auth methods → `SourceFieldSelectConfig` with child `fields` per option.
- Optional toggles → `SourceFieldSwitchGroupConfig`.
- Confidential fields must use `SourceFieldInputConfigType.PASSWORD`. The serializer derives sensitive vs nonsensitive keys automatically from the field definitions — you do not need to maintain an allow-list elsewhere.

## Implementing `source_for_pipeline`

Return a `SourceResponse` directly. **Do not** use `dlt_source_to_source_response` for new sources — DLT is being removed.

Prefer yielding data in the shape the API returns it. No custom dataclasses, no heavy parsing. Yield either `dict`, `list[dict]` (preferred when possible), or a `pyarrow.Table`. The pipeline buffers and batches for you.

**Don't import or instantiate `Batcher` at the source layer.** The pipeline already runs one (`pipelines/pipeline/pipeline.py`) at the same 5000-row / 200 MiB thresholds.
Yielding raw `dict` / `list[dict]` from your generator is the canonical path — reach for `pyarrow.Table` only when you already have arrow-shaped data (e.g., a ClickHouse adapter). Source-level batching results in double-buffering with no behavioral win.

For pyarrow tables, cap in-memory rows at ~200 MiB or ~5000 rows. Use helpers like `table_from_iterator()` / `table_from_py_list()` from `posthog/temporal/data_imports/pipelines/pipeline/utils.py`.

**URL construction:** use `urllib.parse.urlencode` for query strings. Don't use `requests.Request(...).prepare().url` — `PreparedRequest.url` is typed `Optional[str]` and the typical workaround (`prepared.url or f"..."`) carries an unreachable fallback. `urlencode` is shorter, dependency-free, and produces identical output for ASCII-safe params.

### Resumable source pattern

```python
import dataclasses


@dataclasses.dataclass
class MyResumeConfig:
    next_url: str  # or cursor, offset, time window — whatever the API uses


class MySource(ResumableSource[MySourceConfig, MyResumeConfig]):
    def get_resumable_source_manager(self, inputs: SourceInputs) -> ResumableSourceManager[MyResumeConfig]:
        return ResumableSourceManager[MyResumeConfig](inputs, MyResumeConfig)

    def source_for_pipeline(
        self,
        config: MySourceConfig,
        resumable_source_manager: ResumableSourceManager[MyResumeConfig],
        inputs: SourceInputs,
    ) -> SourceResponse:
        return my_source(..., resumable_source_manager=resumable_source_manager)
```

In the transport function:

```python
resume = manager.load_state() if manager.can_resume() else None
url = resume.next_url if resume else initial_url

while True:
    data = fetch_page(url)
    # yield batch
    next_url = data.get("links", {}).get("next")
    if not next_url:
        break
    manager.save_state(MyResumeConfig(next_url=next_url))
    url = next_url  # advance before the next fetch, otherwise we loop on the same page
```

Save state **after** yielding each batch, not before — so if we crash we re-yield the last batch (merge dedupes on primary key) rather than skipping it.
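
The yield-then-save ordering can be exercised without Redis or a real API. The following is a minimal sketch under stated assumptions: `InMemoryManager` is a hypothetical stand-in that only mirrors the three manager methods used above (`can_resume`, `load_state`, `save_state`), and `FAKE_API`, `fetch_page`, and `iter_rows` are illustrative names, not part of the codebase.

```python
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class PageResumeConfig:
    next_url: str


class InMemoryManager:
    """Hypothetical stand-in for ResumableSourceManager; keeps state in memory only."""

    def __init__(self) -> None:
        self._state: Optional[PageResumeConfig] = None

    def can_resume(self) -> bool:
        return self._state is not None

    def load_state(self) -> Optional[PageResumeConfig]:
        return self._state

    def save_state(self, state: PageResumeConfig) -> None:
        self._state = state


# Fake API (assumption): URL -> body with rows under "data" and a "links.next" pointer.
FAKE_API = {
    "/items?page=1": {"data": [{"id": 1}, {"id": 2}], "links": {"next": "/items?page=2"}},
    "/items?page=2": {"data": [{"id": 3}, {"id": 4}], "links": {"next": "/items?page=3"}},
    "/items?page=3": {"data": [{"id": 5}], "links": {}},
}


def fetch_page(url: str) -> dict:
    return FAKE_API[url]


def iter_rows(manager: InMemoryManager, initial_url: str = "/items?page=1") -> Iterator[list[dict]]:
    resume = manager.load_state() if manager.can_resume() else None
    url = resume.next_url if resume else initial_url
    while True:
        body = fetch_page(url)
        yield body["data"]  # hand the batch to the consumer first
        next_url = body.get("links", {}).get("next")
        if not next_url:
            break
        # Only now record where to pick up. If the run dies before this line,
        # the saved state still points at the current page, so it is re-yielded.
        manager.save_state(PageResumeConfig(next_url=next_url))
        url = next_url
```

If a run is abandoned right after the second batch is yielded, the saved state points at page 2, so a fresh `iter_rows(manager)` starts there and yields page 2 again before page 3. That duplicate is the intended trade-off: a repeated batch is deduplicated by the merge on primary key, whereas a skipped batch would be lost.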

### Webhook source pattern

- Implement `webhook_template` returning a `HogFunctionTemplateDC` that transforms incoming webhook payloads.
- Implement `webhook_resource_map` mapping our schema name → external object type.
- Implement `create_webhook`, `delete_webhook`, `get_external_webhook_info` if the API allows programmatic webhook management. Otherwise return a failed result and provide a `webhookSetupCaption` explaining manual setup.
- Add `webhookFields` to `SourceConfig` for post-setup inputs (e.g. signing secret).
- In `source_for_pipeline`, call `self.get_webhook_source_manager(inputs)` and pass its iterator alongside the pull iterator so a single sync pulls historical + webhook-delivered rows.
- Populate `SourceSchema.supports_webhooks=True` only for endpoints where webhooks are actually viable (usually incremental/append-only ones).

## Outbound HTTP must go through the tracked transport

Every HTTP call from `posthog/temporal/data_imports/sources/**` must go through `make_tracked_session()` (from `posthog.temporal.data_imports.sources.common.http`). The tracked session attaches `team_id`, `source_type`, `external_data_source_id`, `external_data_schema_id`, and `external_data_job_id` to every outbound request's log line and OTel metric, and participates in opt-in sample capture.

- For raw `requests` usage: `make_tracked_session(headers=..., retry=...)` returns a `requests.Session`. Use `session.get/post/...` instead of the module-level `requests.get/...` shortcuts.
- For sources that already go through `rest_source.RESTClient`: it defaults to a tracked session automatically; no change needed.
- For vendor SDKs that accept a session/HTTP-client hook (Stripe `RequestsClient(session=...)`, gspread `authorize(credentials, session=...)`, BigQuery via `AuthorizedSession` + `TrackedHTTPAdapter`), inject one. Reference patterns live in `stripe/stripe.py`, `google_sheets/google_sheets.py`, and `bigquery/bigquery.py`.
- For vendor SDKs with no injection seam (today: `bingads`, `linkedin-api`'s `RestliClient`, anything pure-gRPC), add a `# nosemgrep: data-imports-http-transport-...` pragma with a one-line reason and record the source as `⚠️ Vendor SDK` in `SOURCES.md`.

CI enforces this via `.semgrep/rules/data-imports-http-transport.yaml`. The rule bans direct `requests.Session()`, `requests.(...)`, and `httpx.Client/AsyncClient/` inside `sources/**`. Type-only imports (`from requests import Response`, `from requests.exceptions import HTTPError`) remain allowed.

## Updating SOURCES.md

`posthog/temporal/data_imports/sources/SOURCES.md` is the inventory of every registered source, its communication method, and whether its outbound traffic is tracked. Update it as part of the same PR whenever you:

- **Add a new source** — initially as a Scaffolded entry; move it into the Implemented table once you ship working sync logic.
- **Implement a previously scaffolded source** — move the row into the Implemented table and fill in comm method, primary library, and tracked-transport state.
- **Migrate a vendor SDK** to inject a tracked session — flip the source from `⚠️ Vendor SDK` to `✅`.
- **Switch a source's protocol** — e.g. swap REST for gRPC, add webhook support alongside the pull API, or move from `requests` to a vendor SDK. Update both the comm method and tracked-transport columns.

Keep the entries alphabetical within each table. If you add a partially-tracked source, also append a short "Notes on partially-tracked sources" entry explaining what blocks tracking (no SDK seam, gRPC, etc.).

## Required coding conventions

- Register with `@SourceRegistry.register`.
- Inherit `SimpleSource[GeneratedConfig]` unless resumable/webhook behavior is required.
- API sources should usually return `table_format="delta"` in endpoint resources.
- `primary_keys` are endpoint-specific (declare in `settings.py`, not always `id`). Use composite keys when no single field is unique.
- Add partitioning for new sources where possible:
  - API sources: `partition_mode="datetime"` with a **stable** datetime field.
  - Database sources: `partition_count` and `partition_size`.
  - Pick a partition key that **does not change** — `created_at`, `dateCreated`, `firstSeen`. Never use `updated_at` or `lastSeen`.
- Add `get_non_retryable_errors()` for known permanent failures (401/403, invalid/expired credentials, missing scopes).
- Keep comments minimal and only when intent is not obvious.
- Python imports at the top of the module, not inside functions (unless needed to break circular imports).

## Incremental sync guidance

- **Only set `supports_incremental=True` when the API exposes a server-side timestamp filter** (`_gte`, `since`, `modified_after`, etc.). A "client-side cursor" that fetches every page and skips already-seen rows in Python is **not** incremental — every run still hits every page, so the API cost of an "incremental" sync ends up identical to a full refresh. If the API has no server filter, ship full refresh only.
- If the API supports server-side time filtering, use it and map from `db_incremental_field_last_value`.
- **Honor `inputs.incremental_field`** — that's the user's chosen cursor field from the schema settings. `INCREMENTAL_FIELDS` per-endpoint is the menu of _advertised options_; don't reach into `INCREMENTAL_FIELDS[endpoint][0]` to pick a default and silently override the user's selection.
- **Per-endpoint sort enums vary.** Don't hardcode `?sorting=created_at` (or whatever) globally. Verify each list endpoint's allowed sort values against the API spec **and** with a curl smoke-test against the live API — APIs frequently document one set of options and silently reject another, or use a different timestamp column on certain resources.
- **Pass `?sorting=` explicitly on a stable monotonic field when paginating.** For incremental sources, the request sort must match `SourceResponse.sort_mode` (`"asc"` typically; `"desc"` only when forced by the API — see `stripe/stripe.py`, `github/settings.py`) so the pipeline's cursor watermark advances correctly. For full-refresh sources, an explicit sort prevents page-boundary skips/duplicates if the API's implicit default is unstable or shifts as rows are inserted during the sync.
- If the API only supports cursor pagination, still declare incremental fields if reliable and let merge semantics dedupe.
- `sort_mode="desc"` only if the endpoint truly cannot return ascending. For descending sources, handle `db_incremental_field_earliest_value` to scroll earlier rows before newer ones (see Stripe).
- Default unknown endpoints to full refresh first; enable incremental only after confirming a stable filter field and API ordering semantics.
- Confirm partition keys against response schemas, not endpoint names.

## API behavior verification checklist

Before finalizing endpoint logic, verify from docs **and** with curl against the live API (not just docs — APIs frequently silently ignore unknown params or document outdated enums):

- Response shape: list vs object vs wrapped data (`{"data": [...]}`).
- Pagination: Link header vs body cursor vs offset/page; how next-page termination is signaled.
- Ordering guarantees: ascending/descending/undefined for time fields, and the API's _default_ sort if you don't pass one.
- **Sort enum per endpoint:** which `sorting=` values does each list endpoint accept? Some APIs vary the allowed enum per resource. Confirm with curl that the value you intend to pass returns 200, and probe with a future-date cutoff to confirm whether timestamp filters are honored or silently ignored.
- **Server-side timestamp filter:** does `_gte` / `since` / `modified_after` actually filter, or does the API accept it and ignore it?
  Test by passing a future date and checking whether the row drops out.
- Rate-limit headers (window reset timestamp, concurrent limits).
- Field stability: whether candidate incremental/partition fields can change over time.

If undocumented, keep parsing/merge logic conservative and add a short code comment noting the uncertainty.

## Endpoint inventory workflow

- Build an endpoint inventory before expanding coverage (path, auth scopes, grain, pagination style, primary key shape, incremental candidates).
- Keep it in source-local docs (e.g. `posthog/temporal/data_imports/sources//api_inventory.md`).
- Add endpoints in phases: org-level list endpoints → project-level fan-out → child/fan-out endpoints with bounded pagination.

## Top-level endpoints (org/account level)

- Declare endpoint metadata in `settings.py` (`path`, `primary_key`, `incremental_fields`, `partition_key`, `sort_mode`).
- Build through a single resource config helper; keep transport branches minimal.
- Endpoint params stay declarative (`limit`, required filters).
- Merge write disposition only when incremental semantics are reliable; otherwise full replace.

## Pagination tips

- Some APIs use cursor pagination in `Link` headers — check both `rel="next"` and any results flag.
- When following a full cursor URL from response headers, clear request params in paginator `update_request` to avoid duplicate query params.
- For parent/child fan-out, keep hard page caps per parent resource to avoid unbounded scans.
- Emit structured logs when page caps are reached (include resource name and parent identifiers).

## Retry and throttling strategy

- Use `tenacity` instead of manual retry loops.
- Retry transport failures and retryable status codes (`429`, transient `5xx`).
- Prefer server-provided rate-limit reset headers on `429`; fall back to exponential backoff.
- Bound retries and make them deterministic (`stop_after_attempt`). Preserve clear terminal behavior.
- Keep timeout/retry settings near the top of the module for easy tuning.

## Fan-out endpoints

Fan-out = iterate a parent resource, then query child endpoints per parent.

**Prefer dependent resources for single-hop fan-out.** Use `rest_api_resources` with a parent and child that declares `type: "resolve"` for the parent field. Shared infra (`rest_source/__init__.py`, `config_setup.process_parent_data_item`) paginates the parent and calls the child per parent row. Use `include_from_parent` so child rows carry parent fields (injected as `__` via `make_parent_key_name`).

**Make fan-out declarative.** Add a fan-out config object in `settings.py` (e.g. `DependentEndpointConfig`) with `parent_name`, `resolve_param`, `resolve_field`, `include_from_parent`, optional parent field renames, and optional parent endpoint params. Route single-hop fan-out through a shared helper (e.g. `common/rest_source/fanout.py:build_dependent_resource`).

**Parent field rename mapping belongs in the helper.** Callers should not branch on whether renames exist.

**Per-endpoint pagination/selectors** — `build_dependent_resource` supports endpoint overrides (`parent_endpoint_extra`, `child_endpoint_extra` for `paginator` / `data_selector`, `page_size_param` for non-`limit` size params).

**Path pre-formatting:** `process_parent_data_item` only does `str.format()` with the resolved param. Pre-format static placeholders with `.replace()` before passing to the resource config, so only the resolved placeholder remains.

**Custom iterator only when fan-out is 2+ levels deep.** Reuse the same pagination/retry helpers as elsewhere.

## OAuth configuration

Before implementing OAuth, **check if the integration already exists** — search `posthog/models/integration.py` loosely for the service name before concluding it's new.

If new:

1. **Env vars**. Add to `posthog/settings/integrations.py`:

   ```python
   YOUR_SOURCE_CLIENT_ID = get_from_env("YOUR_SOURCE_CLIENT_ID", "")
   YOUR_SOURCE_CLIENT_SECRET = get_from_env("YOUR_SOURCE_CLIENT_SECRET", "")
   ```

2. **Integration kind**. In `posthog/models/integration.py`:
   - Add to `IntegrationKind` enum.
   - Add to `OauthIntegration.supported_kinds`.
   - Add an `elif kind == "your-source": return OauthConfig(...)` branch in `oauth_config_for_kind()`.
3. **Redirect URI**: `https://localhost:8010/integrations/your-kind/callback` in the external service.
4. List any new env vars in the final handoff so they can be set in all environments.

## Non-retryable errors

Override `get_non_retryable_errors()` to mark errors that should permanently fail instead of retrying:

```python
def get_non_retryable_errors(self) -> dict[str, str | None]:
    return {
        "401 Client Error: Unauthorized for url: https://api.example.com": "Your API key is invalid or expired. Please generate a new key and reconnect.",
        "403 Client Error: Forbidden for url: https://api.example.com": "Your API key does not have the required permissions. Please check the key permissions and try again.",
    }
```

Common cases: 401 Unauthorized, 403 Forbidden, invalid/expired tokens, OAuth tokens needing re-auth.

## `validate_credentials`

Called with `schema_name=None` at source-create (one cheap probe to confirm the token is genuine) and with `schema_name=` from the per-schema `incremental_fields` action (confirm scope for that specific endpoint). If the API distinguishes 401 (bad token) from 403 (valid token, missing scope), **accept 403 at source-create** — users may legitimately only grant scopes for the endpoints they want to sync. Re-raise 403 only when `schema_name` is set. Sync-time 403s are handled separately by `get_non_retryable_errors()`.
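
The 401/403 rule above can be sketched as a small decision function. This is an illustration under stated assumptions, not the base-class signature: `check_credentials` and its `probe` callable (which returns an HTTP status code for a cheap request) are hypothetical names, and returning an `(ok, message)` tuple instead of raising is a simplification chosen for this sketch.

```python
from typing import Callable, Optional


def check_credentials(
    probe: Callable[[Optional[str]], int],
    schema_name: Optional[str] = None,
) -> tuple[bool, Optional[str]]:
    """Map a probe's HTTP status to a validation result.

    `probe` is a hypothetical callable that issues one cheap request
    (scoped to `schema_name` when given) and returns the status code.
    """
    status = probe(schema_name)
    if status == 401:
        # Bad token: always a failure, regardless of which schema is being checked.
        return False, "Invalid or expired API token"
    if status == 403:
        if schema_name is None:
            # Source-create: the token is genuine, it just lacks scope for the
            # probed endpoint. Accept it; the user may only sync other endpoints.
            return True, None
        return False, f"Token is missing the scope required for '{schema_name}'"
    if 200 <= status < 300:
        return True, None
    return False, f"Unexpected status {status} while validating credentials"
```

For example, `check_credentials(lambda _: 403)` passes at source-create, while `check_credentials(lambda _: 403, "invoices")` fails with a scope message for that endpoint.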

## Document required token scopes

If the API issues OAuth scopes or per-resource access tokens, declare every scope the source actually calls so users know what to grant — don't make them grant the full set defensively.

- **OAuth sources:** set `requiredScopes` on `SourceFieldOauthConfig` (space-separated string, matches the OAuth `scope` parameter format). The frontend diffs it against the integration's granted scopes and warns the user with a Reconnect action when any are missing.
- **Non-OAuth sources (PAT, API key):** there's no integration object to inspect, so list scopes in the `caption` instead. Captions render through `LemonMarkdown`, so backticks, bold, and links work.

## Mixins

From `posthog/temporal/data_imports/sources/common/mixins.py`:

- `SSHTunnelMixin` — `with_ssh_tunnel()` context plus `make_ssh_tunnel_func()` for deferred tunnel opening.
- `OAuthMixin` — `get_oauth_integration()` to pull `Integration` from the DB.
- `ValidateDatabaseHostMixin` — `is_database_host_valid()` to block internal VPC IPs (unless SSH tunnel is used).

## Icons

- Prefer SVG over PNG. Keep file size reasonable.
- Place in `frontend/public/services/` and reference as `/static/services/{name}.svg` in `iconPath`.
- If the source logo isn't already in the project, pull via [Logo.dev](https://docs.logo.dev/introduction). **Ask the user for the API key** — do not hardcode one. If the user hasn't provided one, surface that as a blocker rather than committing a placeholder.

## Testing expectations

Add at least two test modules:

- `tests/test__source.py` (source-class level):
  - `source_type`
  - `get_source_config` fields and labels
  - `get_schemas` outputs
  - `validate_credentials` success/failure
  - `source_for_pipeline` argument plumbing
  - for resumable sources: `get_resumable_source_manager` returns a manager bound to the right data class
  - for webhook sources: `create_webhook` / `delete_webhook` / `get_external_webhook_info` behavior, `webhook_resource_map` correctness, `webhook_template` presence
- `tests/test_.py` (transport level):
  - paginator behavior from response headers/body
  - resource generation for incremental vs non-incremental
  - endpoint-specific primary key mapping
  - credential validation status mapping
  - mapper/filter helpers if present
  - fan-out endpoint row format assertions (dict shape + parent identifiers)
  - for dependent-resource fan-out: mock `rest_api_resources`, pass rows with `__` keys to exercise parent-field injection and rename behavior
  - expected return schema checks for each declared endpoint in `settings.py`
  - for resumable sources: resume-from-saved-state path (manager returns state, transport uses it as starting point); state is saved after each batch

Prefer behavior tests over config-shape tests. Avoid brittle assertions on internal config dict structure unless they protect a known regression that cannot be asserted via output behavior.

Use parameterized tests for status codes and edge cases. Lean toward over-covering.
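
A table-driven status-code test might look like the sketch below. It uses only the standard library (`unittest` with `subTest`) as a stand-in for whichever parameterization helper the test suite already uses. `is_retryable` and the exact set of transient `5xx` codes are assumptions made for illustration; the retry guidance only states that `429` and transient `5xx` are retried while `401`/`403` are permanent.

```python
import unittest

# Assumption: which 5xx codes count as "transient" is source-specific.
RETRYABLE_STATUS_CODES = frozenset({429, 500, 502, 503, 504})


def is_retryable(status_code: int) -> bool:
    """Hypothetical transport helper: should this response be retried?"""
    return status_code in RETRYABLE_STATUS_CODES


# One row per case: (label, status code, expected result).
CASES = [
    ("rate_limited", 429, True),
    ("server_error", 500, True),
    ("bad_gateway", 502, True),
    ("service_unavailable", 503, True),
    ("unauthorized", 401, False),
    ("forbidden", 403, False),
    ("not_found", 404, False),
    ("ok", 200, False),
]


class TestIsRetryable(unittest.TestCase):
    def test_status_mapping(self) -> None:
        for label, status_code, expected in CASES:
            # subTest reports each failing row separately instead of stopping at the first.
            with self.subTest(label=label, status_code=status_code):
                self.assertEqual(is_retryable(status_code), expected)
```

Keeping the cases in a table makes it cheap to add an edge case when an API turns out to behave unexpectedly, which fits the "lean toward over-covering" guidance.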

## Implementation checklist

```text
Bootstrapping:
- [ ] Enum added to products/data_warehouse/backend/types.py (ALL_CAPS, no underscores between words)
- [ ] Entry added to frontend/src/queries/schema/schema-general.ts (kebab-case) — `pnpm run schema:build` regenerates posthog/schema.py from this; don't hand-edit posthog/schema.py
- [ ] Source imported in posthog/temporal/data_imports/sources/__init__.py + __all__
- [ ] Class inherits from SimpleSource / ResumableSource / WebhookSource (or combo) — see "Picking the right base class"

Source implementation:
- [ ] Define source fields in get_source_config
- [ ] Implement validate_credentials
- [ ] Implement get_schemas
- [ ] Add endpoint settings (settings.py)
- [ ] Implement transport + paginator ({source}.py)
- [ ] Return SourceResponse with correct primary_keys, partitioning, sort_mode
- [ ] Implement get_resumable_source_manager if ResumableSource
- [ ] Implement webhook methods if WebhookSource
- [ ] Add get_non_retryable_errors for auth/permission errors

Tooling & assets:
- [ ] Icon in frontend/public/services/ (SVG preferred — ask user for Logo.dev key if needed)
- [ ] Run `pnpm run generate:source-configs`
- [ ] Swap generic Config for generated {Source}SourceConfig in source.py
- [ ] Run `pnpm run schema:build`
- [ ] Django migrations run if enum value requires it

Release status:
- [ ] unreleasedSource=True while WIP
- [ ] releaseStatus="alpha" for new sources not yet extensively tested
- [ ] releaseStatus="beta" when most rough edges have been ironed out
- [ ] Omit releaseStatus (or set to "ga") on full release
- [ ] featureFlag="dwh-{source_name}" for controlled rollout
- [ ] Flag removed / unreleasedSource removed on full release

Tests & handoff:
- [ ] Source tests (test__source.py)
- [ ] Transport tests (test_.py)
- [ ] `ruff check . --fix` and `ruff format .`
- [ ] List any new env vars (OAuth client IDs/secrets, etc) in the PR / handoff
```

## Validation and generation workflow

After changing source fields, re-run `pnpm run generate:source-configs` and `pnpm run schema:build`, then the targeted tests for the new source. Run `ruff check . --fix` and `ruff format .` on modified Python files.

## Common pitfalls

- Source not visible in wizard: not registered/imported in `sources/__init__.py`, or `schema:build` not rerun.
- Generated config class still empty: forgot `generate:source-configs` after updating fields.
- Incremental sync misbehaving: wrong field name/type or wrong sort assumptions.
- Endless retries for bad credentials: missing `get_non_retryable_errors`.
- Resumable state never saved: forgot to call `save_state` after yielding a batch; or saved before yield and a crash causes data loss.
- Webhook rows not landing: feature flag `warehouse-source-webhooks` disabled, or schema `is_webhook=False`, or `initial_sync_complete=False`.
- Dependent resource path `KeyError`: pre-format static path placeholders (see Fan-out).
- Silent truncation risk: page caps hit without logs/metrics.
- Drift from refactors: unused function params/helpers left behind after endpoint behavior changes.
- Type drift in endpoint config dicts: use source typing aliases (`Endpoint`, `ClientConfig`, `IncrementalConfig`) to keep static checks precise.
- Partition key instability: picked `updated_at` instead of `created_at`; partitions rewrite on every sync.
- Hardcoded Logo.dev key committed: always ask the user for the key at runtime.
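
As an illustration of the silent-truncation pitfall, here is a minimal sketch of a per-parent page cap that logs when it is hit. Everything named here is hypothetical: `iter_child_pages`, `MAX_PAGES_PER_PARENT`, the `fetch_page(parent_id, cursor)` shape, and the use of the standard `logging` module as a stand-in for the project's structured logger.

```python
import logging
from typing import Callable, Iterator, Optional

logger = logging.getLogger(__name__)

MAX_PAGES_PER_PARENT = 3  # hypothetical cap; tune per endpoint

# fetch_page(parent_id, cursor) -> (rows, next_cursor); next_cursor is None on the last page.
FetchPage = Callable[[str, Optional[str]], tuple[list[dict], Optional[str]]]


def iter_child_pages(
    resource: str,
    parent_id: str,
    fetch_page: FetchPage,
    max_pages: int = MAX_PAGES_PER_PARENT,
) -> Iterator[list[dict]]:
    cursor: Optional[str] = None
    for _ in range(max_pages):
        rows, cursor = fetch_page(parent_id, cursor)
        yield rows
        if cursor is None:
            return  # reached the real end; nothing was truncated
    # The loop ran out while the API still had more pages: record it so the
    # truncation is visible instead of silent.
    logger.warning(
        "page cap reached",
        extra={"resource": resource, "parent_id": parent_id, "max_pages": max_pages},
    )
```

The warning carries the resource name and parent identifier, so a capped scan can be traced back to the specific parent that exceeded the limit.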