Learning Platform
Глоссарий Troubleshooting
Урок 13.05 · 28 мин
Начальный
graph buildingCSVJSONnetworkxforeign keylineage

Где живут рёбра в реальной DE-работе

До сих пор мы строили графы из готовых литералов в коде. В реальной работе вершины и рёбра приходят из источников:

  • CSV/Parquet — список пар (source, target) экспортируется аналитиками или внешней системой.
  • JSON-manifestdbt manifest.json содержит все ref-зависимости моделей.
  • Information schema Postgres — referential_constraints описывает foreign keys.
  • YAML конфиг Airflow или GitLab CI — каждый job определяет needs: [...].
  • API — например, Looker возвращает зависимости view’ов через REST.

Этот урок — про практический процесс «из сырья — в dict[node] -> list[neighbors] или CSR». Никаких новых алгоритмов: всё про парсинг, нормализацию и подходящие коллекции.

Шаблон 1: CSV-edge-list

Самый частый формат — CSV с двумя колонками: from,to. Иногда добавляют вес или метаданные.

parent,child,weight
stg_users,int_user_orders,1.0
stg_orders,int_user_orders,1.0
stg_orders,int_order_items,1.0
stg_products,int_order_items,1.0
int_user_orders,mart_user_revenue,1.0
int_order_items,mart_user_revenue,1.0
int_order_items,mart_top_products,1.0

Базовая загрузка:

import csv
from collections import defaultdict

adj: dict[str, list[str]] = defaultdict(list)

with open("edges.csv", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        parent = row["parent"]
        child = row["child"]
        adj[parent].append(child)

print(adj["stg_orders"])
# ['int_user_orders', 'int_order_items']

Несколько практических замечаний:

  1. Используйте csv.DictReader, а не split по запятой. , внутри quoted-полей сломает простой split, DictReader справится.
  2. Кодировка utf-8. Имена таблиц могут содержать unicode (особенно в lineage импорта).
  3. defaultdict(list) избавляет от ручной проверки if u not in adj.
  4. Дубликаты в CSV — частый артефакт. Если одна строка появилась дважды, в adj[u] будет дубль соседа. Решение — либо использовать set вместо list, либо явно дедуплицировать после загрузки:
for u in list(adj.keys()):
    adj[u] = list(set(adj[u]))   # уберёт дубли, но не сохранит порядок
  1. Чистка пустых полей. Реальные CSV содержат пустые строки или None-значения от плохого экспорта:
for row in reader:
    parent = (row.get("parent") or "").strip()
    child = (row.get("child") or "").strip()
    if not parent or not child:
        continue
    adj[parent].append(child)

Шаблон 2: JSON-манифест dbt

dbt manifest.json — это огромный JSON, в котором каждая модель имеет блок depends_on.nodes. Упрощённо:

{
  "nodes": {
    "model.proj.mart_user_revenue": {
      "name": "mart_user_revenue",
      "depends_on": { "nodes": ["model.proj.int_user_orders", "model.proj.int_order_items"] }
    },
    "model.proj.int_user_orders": {
      "name": "int_user_orders",
      "depends_on": { "nodes": ["model.proj.stg_users", "model.proj.stg_orders"] }
    }
  }
}

Парсинг:

import json
from collections import defaultdict

with open("target/manifest.json", encoding="utf-8") as f:
    manifest = json.load(f)

# parents[child] = [parent1, parent2, ...]
parents: dict[str, list[str]] = defaultdict(list)
# children[parent] = [child1, child2, ...]
children: dict[str, list[str]] = defaultdict(list)

for node_id, node_info in manifest["nodes"].items():
    if not node_id.startswith("model."):
        continue   # игнорируем sources, seeds, tests
    name = node_info["name"]
    deps = node_info.get("depends_on", {}).get("nodes", [])
    for dep_id in deps:
        if not dep_id.startswith("model."):
            continue
        dep_name = manifest["nodes"][dep_id]["name"]
        parents[name].append(dep_name)
        children[dep_name].append(name)

print(parents["mart_user_revenue"])
# ['int_user_orders', 'int_order_items']
print(children["stg_orders"])
# ['int_user_orders', 'int_order_items']

Заметьте: мы держим оба словаря сразу — parents (in-neighbors) и children (out-neighbors). Это типичный паттерн lineage-сервисов: вопросы «откуда модель X» и «кто зависит от X» равно частые, и платить O(V+E) сканированием на каждый запрос — нерационально.

Шаблон 3: foreign keys из information_schema

В Postgres схему FK можно получить запросом:

SELECT
    tc.table_name        AS child,
    ccu.table_name       AS parent
FROM information_schema.table_constraints tc
JOIN information_schema.constraint_column_usage ccu
  ON ccu.constraint_name = tc.constraint_name
WHERE tc.constraint_type = 'FOREIGN KEY'
  AND tc.table_schema = 'public';

Питон-код:

import psycopg
from collections import defaultdict

fk: dict[str, list[str]] = defaultdict(list)

with psycopg.connect("dbname=app") as conn:
    rows = conn.execute("""
        SELECT tc.table_name AS child, ccu.table_name AS parent
        FROM information_schema.table_constraints tc
        JOIN information_schema.constraint_column_usage ccu
          ON ccu.constraint_name = tc.constraint_name
        WHERE tc.constraint_type = 'FOREIGN KEY'
          AND tc.table_schema = 'public'
    """).fetchall()

for child, parent in rows:
    fk[parent].append(child)

# что-то полезное: какие таблицы зависят от users
print(fk.get("users", []))

Тонкие моменты:

  • одна пара (parent, child) может появиться несколько раз — это разные FK с разными колонками. Уберите дубликаты через set, если важно;
  • self-referencing FK даёт ребро parent == child (петля), это валидный directed loop, но цикл длины 1 — обработайте отдельно, если в задаче не должно быть;
  • FK иногда указывает не на public-схему, расширяйте запрос если нужно.

Граф в networkx из любого источника

Если вы решили использовать networkx, билдинг сводится к серии add_edge:

import networkx as nx
import csv

G = nx.DiGraph()
with open("edges.csv", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        G.add_edge(row["parent"], row["child"], weight=float(row.get("weight", 1.0)))

print(G.number_of_nodes(), G.number_of_edges())
print(list(G.successors("stg_orders")))
# 7 7
# ['int_user_orders', 'int_order_items']

add_edge автоматически создаст обе вершины, если их ещё не было.

Из pandas DataFrame:

import pandas as pd
import networkx as nx

df = pd.read_csv("edges.csv")
G = nx.from_pandas_edgelist(
    df,
    source="parent",
    target="child",
    edge_attr="weight",
    create_using=nx.DiGraph(),
)

Это самый короткий способ для прототипов и аналитики.

Граф в CSR через scipy

Когда граф большой, выгоднее сразу строить CSR:

import numpy as np
from scipy.sparse import coo_matrix
import csv

# 1) собрать рёбра и составить отображение имя -> int
name_to_id: dict[str, int] = {}
src_list: list[int] = []
dst_list: list[int] = []

def get_id(name: str) -> int:
    if name not in name_to_id:
        name_to_id[name] = len(name_to_id)
    return name_to_id[name]

with open("edges.csv", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        src_list.append(get_id(row["parent"]))
        dst_list.append(get_id(row["child"]))

N = len(name_to_id)
data = np.ones(len(src_list), dtype=np.int8)

# COO -> CSR
G = coo_matrix(
    (data, (np.array(src_list), np.array(dst_list))),
    shape=(N, N)
).tocsr()

print(f"V={N}, E={G.nnz}, memory={G.data.nbytes + G.indices.nbytes + G.indptr.nbytes} bytes")

coo_matrix принимает три параллельных массива (data, row, col) и легко конвертируется в csr через .tocsr(). Это стандартный pipeline в scipy.sparse.

Pipeline построения графа

От сырых данных к рабочему графу: 4 шага.

ИсточникCSV / JSON / SQL — источник пар (u, v)
ПарсингПарсинг построчно: csv.DictReader, json.load, conn.execute
ЧисткаНормализация имён, фильтр пустых, дедупликация
Структураdefaultdict / networkx / scipy CSR — выбор зависит от размера и нужных операций

Распространённые баги в коде building graph

Баг 1: пропущенные изолированные вершины

Если строим adj через adj[u].append(v), вершина v с нулевым out-degree не появится как ключ — обращение adj.get(v) вернёт None или KeyError. Решение:

adj.setdefault(v, [])   # или: defaultdict(list) с явным touch

Или в конце цикла:

for u in list(adj.keys()):
    for v in adj[u]:
        adj.setdefault(v, [])

Баг 2: имена не уникальны

«users» в схеме public и «users» в схеме audit — разные таблицы. Если строите граф без префикса схемы, поломаете lineage. Норма — ключ вида "schema.table" или (schema, table)-кортеж.

Баг 3: одно ребро — несколько раз

Дубликаты в CSV или несколько FK от одной таблицы к другой. Если для алгоритма важна set-семантика рёбер (BFS, DFS — без разницы; PageRank — разница), дедуплицируйте сразу.

Баг 4: цикл, который не должен быть DAG

Если данные обещают DAG, но вы получили цикл — это сразу баг в источнике. Проверка в код:

import networkx as nx
G = nx.DiGraph(...)
if not nx.is_directed_acyclic_graph(G):
    print("ВНИМАНИЕ: граф не DAG, есть цикл!")
    print(nx.find_cycle(G))

find_cycle найдёт первый цикл и покажет его. Полезно для отладки.

DE-кейс: лезкий граф изменений в Production DWH

Сценарий: компания хочет регулярно генерировать lineage из dbt manifest и сравнивать с предыдущей версией. Цель — заметить «новая модель появилась», «модель X теперь зависит от Y, чего не было».

Pipeline:

import json
from collections import defaultdict

def build_lineage(manifest_path: str) -> dict[str, list[str]]:
    with open(manifest_path, encoding="utf-8") as f:
        manifest = json.load(f)
    children: dict[str, list[str]] = defaultdict(list)
    for node_id, node_info in manifest["nodes"].items():
        if not node_id.startswith("model."):
            continue
        name = node_info["name"]
        for dep_id in node_info.get("depends_on", {}).get("nodes", []):
            if not dep_id.startswith("model."):
                continue
            dep_name = manifest["nodes"][dep_id]["name"]
            children[dep_name].append(name)
    return dict(children)

def diff_graphs(g_old: dict, g_new: dict) -> dict[str, dict]:
    diff = {"added_models": [], "removed_models": [], "added_edges": [], "removed_edges": []}
    diff["added_models"] = sorted(set(g_new) - set(g_old))
    diff["removed_models"] = sorted(set(g_old) - set(g_new))
    for v in set(g_old) & set(g_new):
        old_set = set(g_old[v])
        new_set = set(g_new[v])
        for u in sorted(new_set - old_set):
            diff["added_edges"].append((v, u))
        for u in sorted(old_set - new_set):
            diff["removed_edges"].append((v, u))
    return diff

g1 = build_lineage("manifest_yesterday.json")
g2 = build_lineage("manifest_today.json")
print(diff_graphs(g1, g2))

Это рабочая утилита, которую можно поставить в CI и блокировать PR с подозрительными изменениями (например, новая зависимость от deprecated-модели).

Попробуй сам

Имейте CSV-файл fk.csv:

child,parent
orders,users
order_items,orders
order_items,products
payments,orders
sessions,users
events,users
events,sessions

Задачи:

  1. Постройте оба словаря: parents[child] и children[parent].
  2. Найдите вершины с in-degree = 0 (raw-сорсы) и с out-degree = 0 (терминальные).
  3. Найдите вершину с максимальным out-degree (центральная таблица), вершину с максимальным in-degree.
  4. Постройте граф в networkx и проверьте, что он — DAG.

Ожидаемые ответы:

  • Вершины с in-degree=0: {users, products}.
  • Вершины с out-degree=0: {order_items, payments, events}.
  • Максимальный out-degree: users (3: orders, sessions, events).
  • Максимальный in-degree: orders (1 от users) — на самом деле всего 1, а в данном графе. Проверьте по факту.
  • DAG: да.

В следующих модулях (12-й) мы научимся обходить эти графы — BFS, DFS, поиск циклов, topological sort. Без правильной структуры все эти алгоритмы будут медленнее, чем нужно.

Проверка знанийKnowledge check
У вас CSV с 1 миллионом edge-записей: parent_table, child_table. Цель — построить readonly-граф для частых BFS-запросов. Какие два шага вы сделаете и почему?
ОтветAnswer
Шаг 1: нормализация имён и присваивание целочисленных id. Имена-строки в Python тяжёлые (~50 байт каждая, плюс PyObject overhead). Заводим dict name_to_id и заменяем каждую строку на int. Это ускоряет всё дальнейшее (int-хеш быстрее string-хеш, int-сравнение быстрее) и сокращает память в 5-10 раз. Шаг 2: построение CSR через scipy.sparse.coo_matrix(...).tocsr(). При V≈100k и E=1M это даст массив indices размером 4 МБ и indptr ~400 КБ. Затем BFS — миллисекунды на запрос. Альтернатива через adj_list dict[int, list[int]] заняла бы ~200 МБ и BFS в 5-10 раз медленнее.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 6. При построении графа из CSV с парами parent,child через defaultdict(list) часто пропадают вершины с out-degree=0. Почему и как исправить?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5