Data Pipeline Architecture

Sable uses a medallion architecture (Bronze → Silver → Gold) to transform raw CSV uploads into analytics-ready data.

Pipeline Flow

1. UPLOAD (Frontend)
   User uploads CSV → Supabase Storage → file_upload record
   Status: uploaded, detection_status: pending
        ↓
2. DETECTION (file_watcher.py, 10s poll)
   Identify source_type (btig_pnl) → Extract account/date
   Status: detected, dbt_status: ready
        ↓
3. BRONZE LOAD (pipelines/btig_pnl.py)
   CSV → bronze.pnl_raw (all TEXT columns, batch 1000)
   Status: detection_status: complete
        ↓
4. JOB CREATION (worker.py, 30s poll)
   Find orphan files → Group by source_type → Create process_job
        ↓
5. DBT TRANSFORM (worker.py)
   dbt run --select pnl_silver+ --vars '{"process_job_id": "uuid"}'
   Silver (TEXT → NUMERIC) → Gold (enriched, joined)
        ↓
6. VERIFICATION (worker.py)
   Query gold.pnl_daily_dbt → Confirm data exists
   Status: completed/failed
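
To make step 4 concrete, the job-creation poll conceptually looks for "orphan" files: uploads whose detection has finished but that are not yet attached to a process_job. The query below is only a sketch of that idea; the sable.file_uploads table and its column names are assumptions, not the actual schema used by worker.py.

-- Conceptual sketch of the step-4 orphan-file lookup (table/column names assumed)
SELECT id, source_type
FROM sable.file_uploads                 -- hypothetical table name
WHERE dbt_status = 'ready'              -- set by detection in step 2
  AND process_job_id IS NULL            -- not yet grouped into a process_job
ORDER BY source_type, created_at;       -- hypothetical created_at column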

Medallion Layers

Bronze Layer

Schema: bronze
Table: bronze.pnl_raw

  • All 32 columns stored as TEXT (preserves the original values exactly)
  • No transformations or validations
  • Tracks file_upload_id and process_job_id for lineage

SELECT * FROM bronze.pnl_raw WHERE file_upload_id = 'uuid' LIMIT 10;
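
As a rough sketch of what "all TEXT plus lineage" looks like in practice, the DDL below shows the shape of the table. It is not the actual definition: only file_upload_id, process_job_id, and the all-TEXT convention come from this page; the example source column names are assumptions.

-- Shape sketch only, not the real DDL for bronze.pnl_raw
CREATE TABLE bronze.pnl_raw (
    file_upload_id  UUID,   -- lineage back to the file_upload record
    process_job_id  UUID,   -- lineage to the process_job that transformed it
    account_code    TEXT,   -- example source column (name assumed)
    symbol          TEXT,   -- example source column (name assumed)
    pnl_date        TEXT    -- even dates stay TEXT in Bronze
    -- ...remaining source columns, all TEXT
);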

Silver Layer

Schema: silver_intermediate
Table: silver_intermediate.pnl_dbt

  • TEXT → proper types (NUMERIC, DATE, etc.)
  • Deduplication by unique key
  • Validation rules applied

Unique Key: (org_id, account_code, symbol, pnl_date)

{{ config(
    materialized='incremental',
    unique_key=['org_id', 'account_code', 'symbol', 'pnl_date']
) }}
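
The config above comes from the model; the query below is only a sketch of the Silver pattern (cast, validate, deduplicate), not the actual silver_intermediate.pnl_dbt code. The measure and timestamp column names are assumptions, it assumes the unique-key columns are already present in Bronze, and the real model would reference Bronze through dbt's source()/ref() rather than a hard-coded table name.

-- Pattern sketch only: cast TEXT to typed columns, then keep one row per unique key
WITH ranked AS (
    SELECT
        org_id,
        account_code,
        symbol,
        CAST(pnl_date AS DATE)      AS pnl_date,
        CAST(daily_pnl AS NUMERIC)  AS daily_pnl,  -- hypothetical measure column
        ROW_NUMBER() OVER (
            PARTITION BY org_id, account_code, symbol, pnl_date
            ORDER BY loaded_at DESC                -- hypothetical load timestamp
        ) AS rn
    FROM bronze.pnl_raw
    WHERE pnl_date IS NOT NULL                     -- example validation rule
)
SELECT * FROM ranked WHERE rn = 1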

Gold Layer

Schema: gold
Tables: gold.pnl_daily_dbt, gold.v_nav_daily, gold.v_capital_daily

  • Enriched with entity/account joins
  • Derived calculations (return_pct, quantity_change)
  • Aggregations and views for reporting

Unique Key: (position_key, pnl_date)
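
To illustrate the kind of derived calculation listed above, the sketch below computes quantity_change as a day-over-day difference using a window function. It is an illustration of the pattern only, not the real gold.pnl_daily_dbt logic, and it assumes position_key and a quantity column are already available upstream.

-- Pattern sketch for a derived column such as quantity_change (not the real model)
SELECT
    position_key,
    pnl_date,
    quantity,
    quantity - LAG(quantity) OVER (
        PARTITION BY position_key
        ORDER BY pnl_date
    ) AS quantity_change
FROM silver_intermediate.pnl_dbt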

Key Tables

| Table                       | Layer  | Description             |
|-----------------------------|--------|-------------------------|
| bronze.pnl_raw              | Bronze | Raw CSV data as TEXT    |
| silver_intermediate.pnl_dbt | Silver | Typed and validated     |
| gold.pnl_daily_dbt          | Gold   | Enriched daily P&L      |
| gold.v_nav_daily            | Gold   | NAV calculations        |
| gold.v_capital_daily        | Gold   | Capital with cash flows |
| gold.v_dietz_daily          | Gold   | Modified Dietz returns  |

Observability

23 automated tests run after batch uploads settle. Key tests include:

| Test                         | Severity | Purpose                |
|------------------------------|----------|------------------------|
| obs_01_data_freshness        | error    | Stale data detection   |
| obs_02_bronze_silver_parity  | error    | Pipeline breaks        |
| obs_04_recent_job_failures   | error    | Failed processing      |
| obs_09_duplicate_positions   | error    | Same position twice    |
| obs_17_pnl_formula_integrity | error    | P&L formula validation |
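
For a sense of what these checks look like, the query below sketches a duplicate-position check in the spirit of obs_09: a dbt test fails when its query returns rows, so any (position_key, pnl_date) appearing more than once is flagged. This is a sketch of the idea, not the project's actual test.

-- Sketch in the spirit of obs_09_duplicate_positions (not the actual test):
-- returning any rows here would fail the test
SELECT position_key, pnl_date, COUNT(*) AS occurrences
FROM gold.pnl_daily_dbt
GROUP BY position_key, pnl_date
HAVING COUNT(*) > 1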

Dead Man's Switch

Tests run automatically when the pipeline is idle:

  1. A job completes → the 30-minute idle timer resets
  2. The scheduler checks every 5 minutes
  3. If the pipeline has been idle for ≥ 30 minutes, the tests run (idle check sketched below)
  4. Results are stored in sable.test_runs and sable.test_results
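
The idle check itself boils down to "has any job finished in the last 30 minutes?". The query below is a conceptual sketch of that check; the sable.process_jobs table and completed_at column are assumptions, and the real scheduler in worker.py may track the timer differently.

-- Conceptual idle check (table and column names assumed, not the actual scheduler logic)
SELECT COUNT(*) = 0 AS pipeline_is_idle
FROM sable.process_jobs
WHERE completed_at >= NOW() - INTERVAL '30 minutes';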

Common Commands

# Work from the dbt project directory
cd dbt

# Full pipeline
dbt run --select pnl_silver+

# Single model
dbt run --select gold.pnl_daily_dbt

# Run observability tests
dbt test --select tag:observability

# Check freshness
dbt source freshness