Data Pipeline Architecture
Sable uses a medallion architecture (Bronze → Silver → Gold) to transform raw CSV uploads into analytics-ready data.
Pipeline Flow
```
┌───────────────────────────────────────────────────────────────────┐
│ 1. UPLOAD (Frontend)                                              │
│ User uploads CSV → Supabase Storage → file_upload record          │
│ Status: uploaded, detection_status: pending                       │
└───────────────────────────────────────────────────────────────────┘
                                  │
                                  ▼
┌───────────────────────────────────────────────────────────────────┐
│ 2. DETECTION (file_watcher.py, 10s poll)                          │
│ Identify source_type (btig_pnl) → Extract account/date            │
│ Status: detected, dbt_status: ready                               │
└───────────────────────────────────────────────────────────────────┘
                                  │
                                  ▼
┌───────────────────────────────────────────────────────────────────┐
│ 3. BRONZE LOAD (pipelines/btig_pnl.py)                            │
│ CSV → bronze.pnl_raw (all TEXT columns, 1000-row batches)         │
│ Status: detection_status: complete                                │
└───────────────────────────────────────────────────────────────────┘
                                  │
                                  ▼
┌───────────────────────────────────────────────────────────────────┐
│ 4. JOB CREATION (worker.py, 30s poll)                             │
│ Find orphan files → Group by source_type → Create process_job     │
└───────────────────────────────────────────────────────────────────┘
                                  │
                                  ▼
┌───────────────────────────────────────────────────────────────────┐
│ 5. DBT TRANSFORM (worker.py)                                      │
│ dbt run --select pnl_silver+ --vars '{"process_job_id": "uuid"}'  │
│ Silver (TEXT→NUMERIC) → Gold (enriched, joined)                   │
└───────────────────────────────────────────────────────────────────┘
                                  │
                                  ▼
┌───────────────────────────────────────────────────────────────────┐
│ 6. VERIFICATION (worker.py)                                       │
│ Query gold.pnl_daily_dbt → Confirm data exists                    │
│ Status: completed/failed                                          │
└───────────────────────────────────────────────────────────────────┘
```
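Because each stage updates the upload's status fields, a stuck file can be diagnosed with a quick lookup. A minimal sketch, assuming the tracking table is named `file_upload` as in the diagram (the exact table name and columns are assumptions):

```sql
-- Where is this upload in the pipeline? (table/column names assumed)
SELECT id, status, detection_status, dbt_status
FROM file_upload
WHERE id = 'uuid';
```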
Medallion Layers
Bronze Layer
Schema: bronze
Table: bronze.pnl_raw
- All 32 columns stored as TEXT (preserves original exactly)
- No transformations or validations
- Tracks `file_upload_id` and `process_job_id` for lineage
```sql
SELECT * FROM bronze.pnl_raw WHERE file_upload_id = 'uuid' LIMIT 10;
```
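Since every bronze row carries its lineage IDs, confirming that a load landed is a one-liner:

```sql
-- Row counts per upload; a missing file_upload_id means the load never ran
SELECT file_upload_id, COUNT(*) AS row_count
FROM bronze.pnl_raw
GROUP BY file_upload_id
ORDER BY row_count DESC;
```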
Silver Layer
Schema: silver_intermediate
Table: silver_intermediate.pnl_dbt
- TEXT → proper types (NUMERIC, DATE, etc.)
- Deduplication by unique key
- Validation rules applied
Unique Key: (org_id, account_code, symbol, pnl_date)
```sql
{{ config(
    materialized='incremental',
    unique_key=['org_id', 'account_code', 'symbol', 'pnl_date']
) }}
```
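Beyond the config, the model body casts and dedupes. A minimal sketch, assuming an illustrative `daily_pnl` column in bronze (only the unique-key and lineage columns are documented; a dbt `source()` reference would be more typical than the direct table name used here for brevity):

```sql
-- Sketch of a silver model: cast, scope to the current job, dedupe.
{{ config(
    materialized='incremental',
    unique_key=['org_id', 'account_code', 'symbol', 'pnl_date']
) }}

with typed as (
    select
        org_id,
        account_code,
        symbol,
        pnl_date::date      as pnl_date,    -- TEXT -> DATE
        daily_pnl::numeric  as daily_pnl,   -- TEXT -> NUMERIC (assumed column)
        file_upload_id,
        process_job_id
    from bronze.pnl_raw
    where process_job_id = '{{ var("process_job_id") }}'
),

deduped as (
    select
        *,
        row_number() over (
            partition by org_id, account_code, symbol, pnl_date
            order by file_upload_id
        ) as row_num
    from typed
)

select
    org_id, account_code, symbol, pnl_date,
    daily_pnl, file_upload_id, process_job_id
from deduped
where row_num = 1
```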
Gold Layer
Schema: gold
Tables: gold.pnl_daily_dbt, gold.v_nav_daily, gold.v_capital_daily
- Enriched with entity/account joins
- Derived calculations (return_pct, quantity_change)
- Aggregations and views for reporting
Unique Key: (position_key, pnl_date)
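A sketch of the enrichment pattern, assuming a hypothetical `reference.accounts` join table and `quantity`/`begin_market_value` columns in silver (none of these names are confirmed here, nor is the shape of `position_key`):

```sql
-- Sketch of gold enrichment: join reference data, derive metrics.
select
    s.account_code || ':' || s.symbol              as position_key,  -- assumed key shape
    s.pnl_date,
    a.entity_name,                                                   -- assumed column
    s.daily_pnl,
    s.daily_pnl / nullif(s.begin_market_value, 0)  as return_pct,
    s.quantity - lag(s.quantity) over (
        partition by s.account_code, s.symbol
        order by s.pnl_date
    )                                              as quantity_change
from silver_intermediate.pnl_dbt s
left join reference.accounts a
    on a.account_code = s.account_code;
```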
Key Tables
| Table | Layer | Description |
|---|---|---|
| `bronze.pnl_raw` | Bronze | Raw CSV data as TEXT |
| `silver_intermediate.pnl_dbt` | Silver | Typed and validated |
| `gold.pnl_daily_dbt` | Gold | Enriched daily P&L |
| `gold.v_nav_daily` | Gold | NAV calculations |
| `gold.v_capital_daily` | Gold | Capital with cash flows |
| `gold.v_dietz_daily` | Gold | Modified Dietz returns |
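For example, pulling one day of enriched P&L from the gold table (the date is illustrative):

```sql
SELECT *
FROM gold.pnl_daily_dbt
WHERE pnl_date = '2024-01-31'  -- illustrative date
ORDER BY position_key;
```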
Observability
23 automated tests run after batch uploads settle. A representative subset:
| Test | Severity | Purpose |
|---|---|---|
| `obs_01_data_freshness` | error | Detect stale data |
| `obs_02_bronze_silver_parity` | error | Catch pipeline breaks between layers |
| `obs_04_recent_job_failures` | error | Surface failed processing jobs |
| `obs_09_duplicate_positions` | error | Flag the same position recorded twice |
| `obs_17_pnl_formula_integrity` | error | Validate the P&L formula |
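For illustration, a parity check in the spirit of `obs_02_bronze_silver_parity` might compare per-job row counts; the actual test's SQL may differ, and legitimate silver dedup can reduce counts:

```sql
-- A dbt test fails when it returns rows: here, jobs whose bronze
-- rows never reached silver at all.
with bronze_counts as (
    select process_job_id, count(*) as bronze_rows
    from bronze.pnl_raw
    group by process_job_id
),
silver_counts as (
    select process_job_id, count(*) as silver_rows
    from silver_intermediate.pnl_dbt
    group by process_job_id
)
select b.process_job_id, b.bronze_rows
from bronze_counts b
left join silver_counts s using (process_job_id)
where coalesce(s.silver_rows, 0) = 0;
```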
Dead Man's Switch
Tests run automatically when the pipeline goes idle:

```
Job completes → Reset 30-min timer
        ↓
Scheduler checks every 5 min
        ↓
If idle ≥ 30 min: Run tests
        ↓
Store in sable.test_runs + sable.test_results
```
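To see what the latest run produced, a query along these lines works (the `id`, `created_at`, and `test_run_id` column names are assumptions):

```sql
-- Results from the most recent dead-man's-switch run (schema assumed)
select r.*
from sable.test_results r
join sable.test_runs t on t.id = r.test_run_id
where t.created_at = (select max(created_at) from sable.test_runs);
```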
Common Commands
```bash
cd dbt

# Full pipeline
dbt run --select pnl_silver+

# Single model
dbt run --select gold.pnl_daily_dbt

# Run observability tests
dbt test --select tag:observability

# Check freshness
dbt source freshness
```