Структура проекта
Прежде всего — структура репозитория. Это первое, что видит собеседующий, и от неё зависит, разберётся ли он в проекте.
nyc-taxi-pipeline/
├── README.md
├── architecture.png
├── docker-compose.yml
├── .gitignore
├── .env.example
├── ingest/
│ ├── __init__.py
│ ├── ingester.py
│ └── requirements.txt
├── dbt/
│ ├── dbt_project.yml
│ ├── profiles.yml
│ ├── models/
│ │ ├── staging/
│ │ │ ├── _stg.yml
│ │ │ └── stg_trips.sql
│ │ ├── intermediate/
│ │ │ └── int_trips_clean.sql
│ │ └── marts/
│ │ ├── _marts.yml
│ │ ├── dim_date.sql
│ │ ├── dim_location.sql
│ │ ├── dim_vendor.sql
│ │ ├── dim_payment_type.sql
│ │ └── fct_trips.sql
│ ├── tests/
│ │ └── test_no_future_trips.sql
│ └── seeds/
│ └── taxi_zones.csv
├── streamlit/
│ ├── app.py
│ └── requirements.txt
├── airflow/ (обязательно)
│ └── dags/
│ └── nyc_taxi_dag.py
└── tests/
└── test_ingester.py
Чёткое разделение по компонентам помогает читателю быстро найти нужное.
Слой 1: Python ingester
Задача — скачивать NYC TLC Parquet файлы и грузить в DWH.
# ingest/ingester.py
import os
import logging
import requests
import duckdb
from pathlib import Path
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
NYC_BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"
DATA_DIR = Path("/tmp/nyc_taxi")
DATA_DIR.mkdir(parents=True, exist_ok=True)
def download_month(year_month: str) -> Path:
"""Скачать parquet за year_month (например '2024-01')."""
url = f"{NYC_BASE_URL}/yellow_tripdata_{year_month}.parquet"
local_path = DATA_DIR / f"yellow_tripdata_{year_month}.parquet"
if local_path.exists():
log.info("file_already_exists", extra={"path": str(local_path)})
return local_path
log.info("downloading", extra={"url": url, "target": str(local_path)})
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
with open(local_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
log.info("download_complete", extra={
"path": str(local_path),
"size_mb": local_path.stat().st_size / 1024 / 1024
})
return local_path
def load_to_duckdb(parquet_path: Path, year_month: str, db_path: str = "data/warehouse.duckdb"):
"""Загрузить parquet в staging-таблицу. Idempotent по year_month."""
conn = duckdb.connect(db_path)
# 1. Создать staging-таблицу если нет
conn.execute("""
CREATE TABLE IF NOT EXISTS stg_trips_raw (
VendorID INTEGER,
tpep_pickup_datetime TIMESTAMP,
tpep_dropoff_datetime TIMESTAMP,
passenger_count DOUBLE,
trip_distance DOUBLE,
RatecodeID DOUBLE,
store_and_fwd_flag VARCHAR,
PULocationID INTEGER,
DOLocationID INTEGER,
payment_type INTEGER,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
congestion_surcharge DOUBLE,
airport_fee DOUBLE,
year_month VARCHAR
)
""")
# 2. Идемпотентно: DELETE для year_month, потом INSERT
conn.execute("DELETE FROM stg_trips_raw WHERE year_month = ?", [year_month])
# 3. Load parquet в staging
conn.execute(f"""
INSERT INTO stg_trips_raw
SELECT *, ? AS year_month FROM read_parquet('{parquet_path}')
""", [year_month])
row_count = conn.execute(
"SELECT COUNT(*) FROM stg_trips_raw WHERE year_month = ?",
[year_month]
).fetchone()[0]
log.info("loaded", extra={"year_month": year_month, "row_count": row_count})
conn.close()
def ingest(year_month: str):
"""Главная функция: download + load."""
log.info("ingest_start", extra={"year_month": year_month})
parquet_path = download_month(year_month)
load_to_duckdb(parquet_path, year_month)
log.info("ingest_complete", extra={"year_month": year_month})
if __name__ == "__main__":
import sys
year_month = sys.argv[1] if len(sys.argv) > 1 else "2024-01"
ingest(year_month)
Ключевые свойства:
- Идемпотентный: при повторном запуске за тот же month — DELETE+INSERT перезаписывает.
- Логирует с контекстом — JSON-style logs с
year_month,row_count. - Простой: одна функция
ingest(year_month). Можно вызвать из CLI или из Airflow.
Слой 2: dbt staging
dbt layers: staging, intermediate, marts — та же структура, что в реализации капстонаStaging — типизация и cleanup имён колонок.
-- dbt/models/staging/stg_trips.sql
{{ config(materialized='view') }}
SELECT
VendorID AS vendor_id,
tpep_pickup_datetime AS pickup_datetime,
tpep_dropoff_datetime AS dropoff_datetime,
passenger_count AS passenger_count,
trip_distance AS trip_distance,
PULocationID AS pickup_location_id,
DOLocationID AS dropoff_location_id,
payment_type AS payment_type_id,
fare_amount AS fare_amount,
tip_amount AS tip_amount,
tolls_amount AS tolls_amount,
total_amount AS total_amount,
year_month AS year_month,
-- Generated trip_id (no unique ID in source)
CAST(ROW_NUMBER() OVER (ORDER BY tpep_pickup_datetime, VendorID, PULocationID)
AS BIGINT) AS trip_id
FROM stg_trips_raw
# dbt/models/staging/_stg.yml
version: 2
models:
- name: stg_trips
description: "Staging для NYC Yellow Taxi trips — типизированные колонки, snake_case"
columns:
- name: trip_id
description: "Generated unique trip ID"
tests:
- unique
- not_null
- name: pickup_datetime
tests:
- not_null
- name: year_month
tests:
- not_null
Слой 3: dbt intermediate
Intermediate — фильтрация невалидных записей.
-- dbt/models/intermediate/int_trips_clean.sql
{{ config(materialized='ephemeral') }}
SELECT *
FROM {{ ref('stg_trips') }}
WHERE total_amount >= 0
AND fare_amount >= 0
AND trip_distance >= 0
AND pickup_datetime <= CURRENT_TIMESTAMP
AND dropoff_datetime >= pickup_datetime
AND passenger_count > 0
ephemeral — модель не материализуется в таблицу, а инлайнится в downstream через CTE. Подходит для intermediate, который используется только в одной mart-модели.
Слой 4: dbt marts (dimensions)
-- dbt/models/marts/dim_payment_type.sql
{{ config(materialized='table') }}
SELECT * FROM (
VALUES
(1, 'Credit card'),
(2, 'Cash'),
(3, 'No charge'),
(4, 'Dispute'),
(5, 'Unknown'),
(6, 'Voided trip')
) AS t(payment_type_id, payment_type_name)
-- dbt/models/marts/dim_location.sql
{{ config(materialized='table') }}
SELECT
LocationID AS location_id,
Borough AS borough,
Zone AS zone,
service_zone
FROM {{ ref('taxi_zones') }} -- seeds/taxi_zones.csv
-- dbt/models/marts/dim_date.sql
{{ config(materialized='table') }}
WITH date_spine AS (
SELECT DATE '2009-01-01' + (n || ' days')::INTERVAL AS full_date
FROM (SELECT generate_series(0, 365 * 20) AS n) t
)
SELECT
CAST(strftime(full_date, '%Y%m%d') AS INTEGER) AS date_key,
full_date,
EXTRACT(year FROM full_date) AS year,
EXTRACT(month FROM full_date) AS month,
EXTRACT(day FROM full_date) AS day,
EXTRACT(dow FROM full_date) AS day_of_week,
strftime(full_date, '%Y-%m') AS year_month
FROM date_spine
Слой 5: dbt marts (fact)
Главная таблица — fct_trips. Incremental по year_month.
-- dbt/models/marts/fct_trips.sql
{{ config(
materialized='incremental',
incremental_strategy='delete+insert',
unique_key='year_month'
) }}
SELECT
trip_id,
pickup_datetime,
dropoff_datetime,
-- FK to dimensions
CAST(strftime(pickup_datetime::DATE, '%Y%m%d') AS INTEGER) AS pickup_date_key,
pickup_location_id,
dropoff_location_id,
vendor_id,
payment_type_id,
-- Measures
passenger_count,
trip_distance,
fare_amount,
tip_amount,
tolls_amount,
total_amount,
-- Derived
DATEDIFF('second', pickup_datetime, dropoff_datetime) AS trip_duration_seconds,
year_month
FROM {{ ref('int_trips_clean') }}
{% if is_incremental() %}
WHERE year_month IN ({{ var('months_to_load', '\'2024-01\'') }})
{% endif %}
# dbt/models/marts/_marts.yml
version: 2
models:
- name: fct_trips
description: "Fact table: one row = one trip"
columns:
- name: trip_id
tests:
- unique
- not_null
- name: pickup_datetime
tests:
- not_null
- name: total_amount
tests:
- dbt_utils.expression_is_true:
expression: ">= 0"
- name: pickup_location_id
tests:
- relationships:
to: ref('dim_location')
field: location_id
- name: payment_type_id
tests:
- relationships:
to: ref('dim_payment_type')
field: payment_type_id
Слой 6: Custom tests
-- dbt/tests/test_no_future_trips.sql
SELECT * FROM {{ ref('fct_trips') }}
WHERE pickup_datetime > CURRENT_TIMESTAMP
-- dbt/tests/test_dropoff_after_pickup.sql
SELECT * FROM {{ ref('fct_trips') }}
WHERE dropoff_datetime < pickup_datetime
Каждый — SQL-запрос. Если возвращает строки, тест fail.
Слой 7: Streamlit dashboard
# streamlit/app.py
import streamlit as st
import duckdb
import pandas as pd
import plotly.express as px
st.set_page_config(page_title="NYC Taxi Dashboard", layout="wide")
st.title("NYC Yellow Taxi Analytics")
conn = duckdb.connect("data/warehouse.duckdb", read_only=True)
# Sidebar filter
year_months = conn.execute(
"SELECT DISTINCT year_month FROM fct_trips ORDER BY year_month"
).fetchdf()["year_month"].tolist()
selected_month = st.sidebar.selectbox("Month", year_months, index=len(year_months)-1)
# Metric 1: KPI cards
kpis = conn.execute(f"""
SELECT
COUNT(*) AS total_trips,
SUM(total_amount) AS total_revenue,
AVG(trip_distance) AS avg_distance,
AVG(total_amount) AS avg_fare
FROM fct_trips
WHERE year_month = '{selected_month}'
""").fetchone()
col1, col2, col3, col4 = st.columns(4)
col1.metric("Total trips", f"{kpis[0]:,}")
col2.metric("Revenue", f"${kpis[1]:,.0f}")
col3.metric("Avg distance", f"{kpis[2]:.2f} mi")
col4.metric("Avg fare", f"${kpis[3]:.2f}")
# Chart 1: trips by day
daily = conn.execute(f"""
SELECT pickup_datetime::DATE AS date, COUNT(*) AS trips
FROM fct_trips
WHERE year_month = '{selected_month}'
GROUP BY 1 ORDER BY 1
""").fetchdf()
st.plotly_chart(px.bar(daily, x="date", y="trips", title="Trips by day"))
# Chart 2: top pickup zones
top_zones = conn.execute(f"""
SELECT z.zone, COUNT(*) AS trips
FROM fct_trips f JOIN dim_location z
ON f.pickup_location_id = z.location_id
WHERE year_month = '{selected_month}'
GROUP BY z.zone ORDER BY trips DESC LIMIT 10
""").fetchdf()
st.plotly_chart(px.bar(top_zones, x="trips", y="zone",
orientation="h", title="Top 10 pickup zones"))
Один файл, всё локально, работает за минуту. На pet-project — идеально.
Слой 8: Docker Compose
Связать всё локально:
# docker-compose.yml
version: "3.8"
services:
ingester:
build: ./ingest
volumes:
- ./data:/app/data
environment:
- YEAR_MONTH=${YEAR_MONTH:-2024-01}
command: python ingester.py ${YEAR_MONTH:-2024-01}
dbt:
image: python:3.12-slim
volumes:
- ./data:/app/data
- ./dbt:/app/dbt
working_dir: /app/dbt
command: bash -c "pip install dbt-duckdb && dbt run && dbt test"
streamlit:
build: ./streamlit
ports:
- "8501:8501"
volumes:
- ./data:/app/data
command: streamlit run app.py --server.address 0.0.0.0
Запуск:
docker-compose up ingester # загрузка данных
docker-compose up dbt # трансформации
docker-compose up streamlit # dashboard на localhost:8501
Junior tip: docker-compose даёт reproducibility — собеседующий запустит твой проект одной командой. Это сильно повышает шанс, что он реально посмотрит на код, а не закроет вкладку.
Оркестрация (обязательно): Airflow DAG
Оркестрация — обязательный слой капстона, а не бонус. Один Python-скрипт, дёргающий шаги по очереди, не показывает того, что от джуна ждут на собеседовании: что ты умеешь описать pipeline как граф задач, который сам перезапускает упавшее, ждёт готовности данных и переигрывает прошлое. Поэтому capstone-DAG ДОЛЖЕН продемонстрировать пять элементов.
- Multi-task dependency graph. Не один task, а граф: extract -> land_to_minio -> wait_for_raw -> flaky_quality_gate -> load_to_duckdb, после чего веер на
build_daily_metricsиassert_quality, и обе сходятся вend. Airflow строит DAG из>>-зависимостей и параллелит независимые ветки. - Настроенные ретраи. В
default_argsзадаёшьretriesиretry_delay(в лабе —retries=1,retry_delay=30s). Тогда транзиентный сбой не валит весь прогон: задача сама пробует ещё раз. - Sensor на object store. Задача
wait_for_raw— этоS3KeySensor(airflow.providers.amazon.aws.sensors.s3) сaws_conn_id="minio_s3", который в режимеpokeопрашивает MinIO, пока в bucketlakeне появится ключraw/trips/dt=DATE/trips.parquet. Это и есть data-dependency: загрузка в DuckDB не стартует, пока сырьё реально не приземлилось. - Backfill-прогон. Один из обязательных артефактов — прогон за диапазон дат:
airflow dags backfill -s 2024-01-01 -e 2024-01-05 nyc_taxi_pipeline. Так ты показываешь, что pipeline идемпотентен и переигрывается на истории, а не только на «сегодня». - Умышленное падение и recovery.
flaky_quality_gateи poison-date (2024-01-03 с отрицательнымfare_amount) специально ломают прогон. Ты чинишь данные (например,DELETEстрок сfare_amountменьше нуля за эту дату), потомairflow tasks clear ... -t assert_quality ...и показываешь зелёный recovered-прогон в grid view.
# airflow/dags/nyc_taxi_dag.py (структура; полная сборка — в LAB-03)
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime, timedelta
with DAG(
dag_id="nyc_taxi_pipeline",
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
max_active_runs=1,
default_args={"retries": 1, "retry_delay": timedelta(seconds=30)},
tags=["lab", "capstone", "nyc-taxi"],
) as dag:
# start >> extract >> land_to_minio >> wait_for_raw (S3KeySensor)
# >> flaky_quality_gate >> load_to_duckdb
# load_to_duckdb >> [build_daily_metrics, assert_quality] >> end
wait_for_raw = S3KeySensor(
task_id="wait_for_raw",
aws_conn_id="minio_s3",
bucket_name="lake",
bucket_key="raw/trips/dt={{ ds }}/trips.parquet",
mode="poke",
poke_interval=10,
timeout=120,
)
Не сдавай оркестрацию как cron + один скрипт. Без графа, ретраев, sensor-а, backfill-а и продемонстрированного recovery капстон не закрывает оркестрацию — самый частый блок вопросов на junior-собеседовании по DE.
LAB-03 (nyc_taxi_pipeline) — это пошаговая управляемая сборка ровно этого DAG: уже настроенный коннект AIRFLOW_CONN_MINIO_S3, готовый MinIO с bucket lake, DuckDB на /opt/airflow/shared/warehouse/nyc.duckdb, poison-date и команды backfill/recovery. Сделай лабу — и оркестрационный артефакт капстона готов.
Что НЕ нужно для MVP
- Real-time streaming (Kafka / Spark Structured Streaming) — над junior.
- Custom orchestrator — используй готовый оркестратор (Airflow), не пиши свой. Сам Airflow-DAG при этом обязателен (см. выше).
- Глубокий CI/CD — добавишь во второй итерации.
- Multi-cloud / multi-region — over-engineering.
- Sophisticated ML features — это MLE.
Попробуй сам
- Скопируй ingester.py и запусти на январе 2024. Проверь, что DuckDB-файл создался и в нём есть строки.
- Поставь dbt-duckdb (
pip install dbt-duckdb), сделай init проекта, добавь staging-модель из этой главы. Запустиdbt run. - Запусти Streamlit (
streamlit run app.py). Проверь, что дашборд открывается на localhost:8501.