Data Pipelines
This document describes the data pipeline architecture using Meltano, dbt, and BigQuery.
Overview
The data pipeline provides:
- Data extraction from operational systems
- Transformation for analytics
- Data warehouse for reporting
- Orchestration with Dagster
Architecture
Meltano Configuration
Extractors
# meltano.yml
plugins:
extractors:
- name: tap-postgres
variant: meltanolabs
config:
host: ${POSTGRES_HOST}
port: 5432
user: ${POSTGRES_USER}
password: ${POSTGRES_PASSWORD}
database: lana
Loaders
plugins:
loaders:
- name: target-bigquery
variant: adswerve
config:
project_id: ${GCP_PROJECT_ID}
dataset_id: lana_raw
location: US
dbt Transformations
Model Structure
dagster/dbt/
├── models/
│ ├── staging/ # Clean raw data
│ │ ├── stg_customers.sql
│ │ ├── stg_facilities.sql
│ │ └── stg_transactions.sql
│ ├── intermediate/ # Business logic
│ │ ├── int_customer_metrics.sql
│ │ └── int_facility_performance.sql
│ └── marts/ # Final reporting
│ ├── fct_disbursals.sql
│ ├── fct_payments.sql
│ └── dim_customers.sql
└── dbt_project.yml
Example Model
-- models/staging/stg_customers.sql
{{ config(materialized='view') }}
select
id as customer_id,
email,
status,
customer_type,
created_at,
updated_at
from {{ source('lana_raw', 'customers') }}
where _sdc_deleted_at is null
Intermediate Model
-- models/intermediate/int_customer_metrics.sql
{{ config(materialized='table') }}
select
c.customer_id,
count(distinct f.facility_id) as total_facilities,
sum(f.principal_amount) as total_principal,
sum(f.outstanding_balance) as total_outstanding,
max(f.created_at) as last_facility_date
from {{ ref('stg_customers') }} c
left join {{ ref('stg_facilities') }} f
on c.customer_id = f.customer_id
group by 1
Dagster Orchestration
Asset Definition
# dagster/assets.py
from dagster import asset
from dagster_dbt import dbt_assets
@dbt_assets(manifest_path=DBT_MANIFEST_PATH)
def dbt_models(context):
yield from build_dbt_assets(context)
@asset(deps=[dbt_models])
def customer_report():
"""Generate daily customer report."""
query = """
SELECT * FROM `project.dataset.dim_customers`
"""
df = bigquery_client.query(query).to_dataframe()
return df
Schedule
from dagster import ScheduleDefinition
daily_pipeline = ScheduleDefinition(
job=etl_job,
cron_schedule="0 2 * * *", # 2 AM daily
)
Data Quality
dbt Tests
# models/schema.yml
version: 2
models:
- name: stg_customers
columns:
- name: customer_id
tests:
- unique
- not_null
- name: email
tests:
- not_null
- unique
- name: status
tests:
- accepted_values:
values: ['ACTIVE', 'INACTIVE', 'SUSPENDED']
Custom Tests
-- tests/assert_positive_balances.sql
select
facility_id,
outstanding_balance
from {{ ref('fct_facilities') }}
where outstanding_balance < 0
Reporting Views
Financial Summary
-- models/marts/financial_summary.sql
select
date_trunc('month', transaction_date) as month,
sum(case when type = 'DEPOSIT' then amount end) as deposits,
sum(case when type = 'WITHDRAWAL' then amount end) as withdrawals,
sum(case when type = 'DISBURSAL' then amount end) as disbursals,
sum(case when type = 'PAYMENT' then amount end) as payments
from {{ ref('fct_transactions') }}
group by 1
order by 1
Access Control
BigQuery Permissions
| Role | Access Level |
|---|---|
| Data Engineer | Full access |
| Analyst | Read marts only |
| Reporting | Read specific views |