Learning Platform
Глоссарий Troubleshooting
Урок 20.03 · 30 мин
Начальный
implementationpythonduckdbdbtcodecapstone

Структура проекта

Прежде всего — структура репозитория. Это первое, что видит собеседующий, и от неё зависит, разберётся ли он в проекте.

nyc-taxi-pipeline/
├── README.md
├── architecture.png
├── docker-compose.yml
├── .gitignore
├── .env.example
├── ingest/
│   ├── __init__.py
│   ├── ingester.py
│   └── requirements.txt
├── dbt/
│   ├── dbt_project.yml
│   ├── profiles.yml
│   ├── models/
│   │   ├── staging/
│   │   │   ├── _stg.yml
│   │   │   └── stg_trips.sql
│   │   ├── intermediate/
│   │   │   └── int_trips_clean.sql
│   │   └── marts/
│   │       ├── _marts.yml
│   │       ├── dim_date.sql
│   │       ├── dim_location.sql
│   │       ├── dim_vendor.sql
│   │       ├── dim_payment_type.sql
│   │       └── fct_trips.sql
│   ├── tests/
│   │   └── test_no_future_trips.sql
│   └── seeds/
│       └── taxi_zones.csv
├── streamlit/
│   ├── app.py
│   └── requirements.txt
├── airflow/                 (обязательно)
│   └── dags/
│       └── nyc_taxi_dag.py
└── tests/
    └── test_ingester.py

Чёткое разделение по компонентам помогает читателю быстро найти нужное.

Слой 1: Python ingester

Задача — скачивать NYC TLC Parquet файлы и грузить в DWH.

# ingest/ingester.py
import os
import logging
import requests
import duckdb
from pathlib import Path

log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

NYC_BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"
DATA_DIR = Path("/tmp/nyc_taxi")
DATA_DIR.mkdir(parents=True, exist_ok=True)


def download_month(year_month: str) -> Path:
    """Скачать parquet за year_month (например '2024-01')."""
    url = f"{NYC_BASE_URL}/yellow_tripdata_{year_month}.parquet"
    local_path = DATA_DIR / f"yellow_tripdata_{year_month}.parquet"

    if local_path.exists():
        log.info("file_already_exists", extra={"path": str(local_path)})
        return local_path

    log.info("downloading", extra={"url": url, "target": str(local_path)})
    response = requests.get(url, stream=True, timeout=30)
    response.raise_for_status()

    with open(local_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)

    log.info("download_complete", extra={
        "path": str(local_path),
        "size_mb": local_path.stat().st_size / 1024 / 1024
    })
    return local_path


def load_to_duckdb(parquet_path: Path, year_month: str, db_path: str = "data/warehouse.duckdb"):
    """Загрузить parquet в staging-таблицу. Idempotent по year_month."""
    conn = duckdb.connect(db_path)

    # 1. Создать staging-таблицу если нет
    conn.execute("""
        CREATE TABLE IF NOT EXISTS stg_trips_raw (
            VendorID INTEGER,
            tpep_pickup_datetime TIMESTAMP,
            tpep_dropoff_datetime TIMESTAMP,
            passenger_count DOUBLE,
            trip_distance DOUBLE,
            RatecodeID DOUBLE,
            store_and_fwd_flag VARCHAR,
            PULocationID INTEGER,
            DOLocationID INTEGER,
            payment_type INTEGER,
            fare_amount DOUBLE,
            extra DOUBLE,
            mta_tax DOUBLE,
            tip_amount DOUBLE,
            tolls_amount DOUBLE,
            improvement_surcharge DOUBLE,
            total_amount DOUBLE,
            congestion_surcharge DOUBLE,
            airport_fee DOUBLE,
            year_month VARCHAR
        )
    """)

    # 2. Идемпотентно: DELETE для year_month, потом INSERT
    conn.execute("DELETE FROM stg_trips_raw WHERE year_month = ?", [year_month])

    # 3. Load parquet в staging
    conn.execute(f"""
        INSERT INTO stg_trips_raw
        SELECT *, ? AS year_month FROM read_parquet('{parquet_path}')
    """, [year_month])

    row_count = conn.execute(
        "SELECT COUNT(*) FROM stg_trips_raw WHERE year_month = ?",
        [year_month]
    ).fetchone()[0]

    log.info("loaded", extra={"year_month": year_month, "row_count": row_count})
    conn.close()


def ingest(year_month: str):
    """Главная функция: download + load."""
    log.info("ingest_start", extra={"year_month": year_month})
    parquet_path = download_month(year_month)
    load_to_duckdb(parquet_path, year_month)
    log.info("ingest_complete", extra={"year_month": year_month})


if __name__ == "__main__":
    import sys
    year_month = sys.argv[1] if len(sys.argv) > 1 else "2024-01"
    ingest(year_month)

Ключевые свойства:

  • Идемпотентный: при повторном запуске за тот же month — DELETE+INSERT перезаписывает.
  • Логирует с контекстом — JSON-style logs с year_month, row_count.
  • Простой: одна функция ingest(year_month). Можно вызвать из CLI или из Airflow.

Слой 2: dbt staging

dbt layers: staging, intermediate, marts — та же структура, что в реализации капстона

Staging — типизация и cleanup имён колонок.

-- dbt/models/staging/stg_trips.sql
{{ config(materialized='view') }}

SELECT
    VendorID                  AS vendor_id,
    tpep_pickup_datetime      AS pickup_datetime,
    tpep_dropoff_datetime     AS dropoff_datetime,
    passenger_count           AS passenger_count,
    trip_distance             AS trip_distance,
    PULocationID              AS pickup_location_id,
    DOLocationID              AS dropoff_location_id,
    payment_type              AS payment_type_id,
    fare_amount               AS fare_amount,
    tip_amount                AS tip_amount,
    tolls_amount              AS tolls_amount,
    total_amount              AS total_amount,
    year_month                AS year_month,
    -- Generated trip_id (no unique ID in source)
    CAST(ROW_NUMBER() OVER (ORDER BY tpep_pickup_datetime, VendorID, PULocationID)
         AS BIGINT) AS trip_id
FROM stg_trips_raw
# dbt/models/staging/_stg.yml
version: 2

models:
  - name: stg_trips
    description: "Staging для NYC Yellow Taxi trips — типизированные колонки, snake_case"
    columns:
      - name: trip_id
        description: "Generated unique trip ID"
        tests:
          - unique
          - not_null
      - name: pickup_datetime
        tests:
          - not_null
      - name: year_month
        tests:
          - not_null

Слой 3: dbt intermediate

Intermediate — фильтрация невалидных записей.

-- dbt/models/intermediate/int_trips_clean.sql
{{ config(materialized='ephemeral') }}

SELECT *
FROM {{ ref('stg_trips') }}
WHERE total_amount >= 0
  AND fare_amount >= 0
  AND trip_distance >= 0
  AND pickup_datetime <= CURRENT_TIMESTAMP
  AND dropoff_datetime >= pickup_datetime
  AND passenger_count > 0

ephemeral — модель не материализуется в таблицу, а инлайнится в downstream через CTE. Подходит для intermediate, который используется только в одной mart-модели.

Слой 4: dbt marts (dimensions)

-- dbt/models/marts/dim_payment_type.sql
{{ config(materialized='table') }}

SELECT * FROM (
    VALUES
        (1, 'Credit card'),
        (2, 'Cash'),
        (3, 'No charge'),
        (4, 'Dispute'),
        (5, 'Unknown'),
        (6, 'Voided trip')
) AS t(payment_type_id, payment_type_name)
-- dbt/models/marts/dim_location.sql
{{ config(materialized='table') }}

SELECT
    LocationID  AS location_id,
    Borough     AS borough,
    Zone        AS zone,
    service_zone
FROM {{ ref('taxi_zones') }}  -- seeds/taxi_zones.csv
-- dbt/models/marts/dim_date.sql
{{ config(materialized='table') }}

WITH date_spine AS (
    SELECT DATE '2009-01-01' + (n || ' days')::INTERVAL AS full_date
    FROM (SELECT generate_series(0, 365 * 20) AS n) t
)
SELECT
    CAST(strftime(full_date, '%Y%m%d') AS INTEGER) AS date_key,
    full_date,
    EXTRACT(year FROM full_date)     AS year,
    EXTRACT(month FROM full_date)    AS month,
    EXTRACT(day FROM full_date)      AS day,
    EXTRACT(dow FROM full_date)      AS day_of_week,
    strftime(full_date, '%Y-%m')     AS year_month
FROM date_spine

Слой 5: dbt marts (fact)

Главная таблица — fct_trips. Incremental по year_month.

-- dbt/models/marts/fct_trips.sql
{{ config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    unique_key='year_month'
) }}

SELECT
    trip_id,
    pickup_datetime,
    dropoff_datetime,
    -- FK to dimensions
    CAST(strftime(pickup_datetime::DATE, '%Y%m%d') AS INTEGER) AS pickup_date_key,
    pickup_location_id,
    dropoff_location_id,
    vendor_id,
    payment_type_id,
    -- Measures
    passenger_count,
    trip_distance,
    fare_amount,
    tip_amount,
    tolls_amount,
    total_amount,
    -- Derived
    DATEDIFF('second', pickup_datetime, dropoff_datetime) AS trip_duration_seconds,
    year_month
FROM {{ ref('int_trips_clean') }}

{% if is_incremental() %}
WHERE year_month IN ({{ var('months_to_load', '\'2024-01\'') }})
{% endif %}
# dbt/models/marts/_marts.yml
version: 2

models:
  - name: fct_trips
    description: "Fact table: one row = one trip"
    columns:
      - name: trip_id
        tests:
          - unique
          - not_null
      - name: pickup_datetime
        tests:
          - not_null
      - name: total_amount
        tests:
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: pickup_location_id
        tests:
          - relationships:
              to: ref('dim_location')
              field: location_id
      - name: payment_type_id
        tests:
          - relationships:
              to: ref('dim_payment_type')
              field: payment_type_id

Слой 6: Custom tests

-- dbt/tests/test_no_future_trips.sql
SELECT * FROM {{ ref('fct_trips') }}
WHERE pickup_datetime > CURRENT_TIMESTAMP
-- dbt/tests/test_dropoff_after_pickup.sql
SELECT * FROM {{ ref('fct_trips') }}
WHERE dropoff_datetime < pickup_datetime

Каждый — SQL-запрос. Если возвращает строки, тест fail.

Слой 7: Streamlit dashboard

# streamlit/app.py
import streamlit as st
import duckdb
import pandas as pd
import plotly.express as px

st.set_page_config(page_title="NYC Taxi Dashboard", layout="wide")
st.title("NYC Yellow Taxi Analytics")

conn = duckdb.connect("data/warehouse.duckdb", read_only=True)

# Sidebar filter
year_months = conn.execute(
    "SELECT DISTINCT year_month FROM fct_trips ORDER BY year_month"
).fetchdf()["year_month"].tolist()
selected_month = st.sidebar.selectbox("Month", year_months, index=len(year_months)-1)

# Metric 1: KPI cards
kpis = conn.execute(f"""
    SELECT
        COUNT(*) AS total_trips,
        SUM(total_amount) AS total_revenue,
        AVG(trip_distance) AS avg_distance,
        AVG(total_amount) AS avg_fare
    FROM fct_trips
    WHERE year_month = '{selected_month}'
""").fetchone()

col1, col2, col3, col4 = st.columns(4)
col1.metric("Total trips", f"{kpis[0]:,}")
col2.metric("Revenue", f"${kpis[1]:,.0f}")
col3.metric("Avg distance", f"{kpis[2]:.2f} mi")
col4.metric("Avg fare", f"${kpis[3]:.2f}")

# Chart 1: trips by day
daily = conn.execute(f"""
    SELECT pickup_datetime::DATE AS date, COUNT(*) AS trips
    FROM fct_trips
    WHERE year_month = '{selected_month}'
    GROUP BY 1 ORDER BY 1
""").fetchdf()
st.plotly_chart(px.bar(daily, x="date", y="trips", title="Trips by day"))

# Chart 2: top pickup zones
top_zones = conn.execute(f"""
    SELECT z.zone, COUNT(*) AS trips
    FROM fct_trips f JOIN dim_location z
      ON f.pickup_location_id = z.location_id
    WHERE year_month = '{selected_month}'
    GROUP BY z.zone ORDER BY trips DESC LIMIT 10
""").fetchdf()
st.plotly_chart(px.bar(top_zones, x="trips", y="zone",
                       orientation="h", title="Top 10 pickup zones"))

Один файл, всё локально, работает за минуту. На pet-project — идеально.

Слой 8: Docker Compose

Связать всё локально:

# docker-compose.yml
version: "3.8"

services:
  ingester:
    build: ./ingest
    volumes:
      - ./data:/app/data
    environment:
      - YEAR_MONTH=${YEAR_MONTH:-2024-01}
    command: python ingester.py ${YEAR_MONTH:-2024-01}

  dbt:
    image: python:3.12-slim
    volumes:
      - ./data:/app/data
      - ./dbt:/app/dbt
    working_dir: /app/dbt
    command: bash -c "pip install dbt-duckdb && dbt run && dbt test"

  streamlit:
    build: ./streamlit
    ports:
      - "8501:8501"
    volumes:
      - ./data:/app/data
    command: streamlit run app.py --server.address 0.0.0.0

Запуск:

docker-compose up ingester       # загрузка данных
docker-compose up dbt            # трансформации
docker-compose up streamlit      # dashboard на localhost:8501
TIP

Junior tip: docker-compose даёт reproducibility — собеседующий запустит твой проект одной командой. Это сильно повышает шанс, что он реально посмотрит на код, а не закроет вкладку.

Оркестрация (обязательно): Airflow DAG

Оркестрация — обязательный слой капстона, а не бонус. Один Python-скрипт, дёргающий шаги по очереди, не показывает того, что от джуна ждут на собеседовании: что ты умеешь описать pipeline как граф задач, который сам перезапускает упавшее, ждёт готовности данных и переигрывает прошлое. Поэтому capstone-DAG ДОЛЖЕН продемонстрировать пять элементов.

  • Multi-task dependency graph. Не один task, а граф: extract -> land_to_minio -> wait_for_raw -> flaky_quality_gate -> load_to_duckdb, после чего веер на build_daily_metrics и assert_quality, и обе сходятся в end. Airflow строит DAG из >>-зависимостей и параллелит независимые ветки.
  • Настроенные ретраи. В default_args задаёшь retries и retry_delay (в лабе — retries=1, retry_delay=30s). Тогда транзиентный сбой не валит весь прогон: задача сама пробует ещё раз.
  • Sensor на object store. Задача wait_for_raw — это S3KeySensor (airflow.providers.amazon.aws.sensors.s3) с aws_conn_id="minio_s3", который в режиме poke опрашивает MinIO, пока в bucket lake не появится ключ raw/trips/dt=DATE/trips.parquet. Это и есть data-dependency: загрузка в DuckDB не стартует, пока сырьё реально не приземлилось.
  • Backfill-прогон. Один из обязательных артефактов — прогон за диапазон дат: airflow dags backfill -s 2024-01-01 -e 2024-01-05 nyc_taxi_pipeline. Так ты показываешь, что pipeline идемпотентен и переигрывается на истории, а не только на «сегодня».
  • Умышленное падение и recovery. flaky_quality_gate и poison-date (2024-01-03 с отрицательным fare_amount) специально ломают прогон. Ты чинишь данные (например, DELETE строк с fare_amount меньше нуля за эту дату), потом airflow tasks clear ... -t assert_quality ... и показываешь зелёный recovered-прогон в grid view.
# airflow/dags/nyc_taxi_dag.py (структура; полная сборка — в LAB-03)
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime, timedelta

with DAG(
    dag_id="nyc_taxi_pipeline",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    default_args={"retries": 1, "retry_delay": timedelta(seconds=30)},
    tags=["lab", "capstone", "nyc-taxi"],
) as dag:
    # start >> extract >> land_to_minio >> wait_for_raw (S3KeySensor)
    #   >> flaky_quality_gate >> load_to_duckdb
    # load_to_duckdb >> [build_daily_metrics, assert_quality] >> end
    wait_for_raw = S3KeySensor(
        task_id="wait_for_raw",
        aws_conn_id="minio_s3",
        bucket_name="lake",
        bucket_key="raw/trips/dt={{ ds }}/trips.parquet",
        mode="poke",
        poke_interval=10,
        timeout=120,
    )
WARNING

Не сдавай оркестрацию как cron + один скрипт. Без графа, ретраев, sensor-а, backfill-а и продемонстрированного recovery капстон не закрывает оркестрацию — самый частый блок вопросов на junior-собеседовании по DE.

LAB-03 (nyc_taxi_pipeline) — это пошаговая управляемая сборка ровно этого DAG: уже настроенный коннект AIRFLOW_CONN_MINIO_S3, готовый MinIO с bucket lake, DuckDB на /opt/airflow/shared/warehouse/nyc.duckdb, poison-date и команды backfill/recovery. Сделай лабу — и оркестрационный артефакт капстона готов.

Что НЕ нужно для MVP

  • Real-time streaming (Kafka / Spark Structured Streaming) — над junior.
  • Custom orchestrator — используй готовый оркестратор (Airflow), не пиши свой. Сам Airflow-DAG при этом обязателен (см. выше).
  • Глубокий CI/CD — добавишь во второй итерации.
  • Multi-cloud / multi-region — over-engineering.
  • Sophisticated ML features — это MLE.

Попробуй сам

  1. Скопируй ingester.py и запусти на январе 2024. Проверь, что DuckDB-файл создался и в нём есть строки.
  2. Поставь dbt-duckdb (pip install dbt-duckdb), сделай init проекта, добавь staging-модель из этой главы. Запусти dbt run.
  3. Запусти Streamlit (streamlit run app.py). Проверь, что дашборд открывается на localhost:8501.
Проверка знанийKnowledge check
В fct_trips.sql ты используешь incremental_strategy='delete+insert' с unique_key='year_month'. Что произойдёт при: 1) первом запуске dbt run; 2) повторном запуске с тем же year_month; 3) запуске с новым year_month через vars?
ОтветAnswer
1) Первый запуск — таблицы ещё нет, is_incremental() возвращает false, выполняется full load всего что в int_trips_clean (обычно — все year_month, которые есть в staging). 2) Повторный с тем же year_month — tablitsa существует, is_incremental() = true, фильтр WHERE year_month IN (vars) применяется, DELETE FROM fct_trips WHERE year_month = X, потом INSERT новых строк за этот month. Идемпотентно — то же финальное state. 3) С новым year_month — таблица существует, фильтр применяется, DELETE для нового year_month (его пока нет, ничего не удалит), INSERT данных за новый month. Аккумулирует историю. Это и есть incremental + idempotent pattern по партиции.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 6. В Python ingester для NYC Taxi какой паттерн обеспечивает идемпотентность загрузки за конкретный month?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4