Где живут рёбра в реальной DE-работе
До сих пор мы строили графы из готовых литералов в коде. В реальной работе вершины и рёбра приходят из источников:
- CSV/Parquet — список пар
(source, target)экспортируется аналитиками или внешней системой. - JSON-manifest —
dbt manifest.jsonсодержит все ref-зависимости моделей. - Information schema Postgres —
referential_constraintsописывает foreign keys. - YAML конфиг Airflow или GitLab CI — каждый job определяет
needs: [...]. - API — например, Looker возвращает зависимости view’ов через REST.
Этот урок — про практический процесс «из сырья — в dict[node] -> list[neighbors] или CSR». Никаких новых алгоритмов: всё про парсинг, нормализацию и подходящие коллекции.
Шаблон 1: CSV-edge-list
Самый частый формат — CSV с двумя колонками: from,to. Иногда добавляют вес или метаданные.
parent,child,weight
stg_users,int_user_orders,1.0
stg_orders,int_user_orders,1.0
stg_orders,int_order_items,1.0
stg_products,int_order_items,1.0
int_user_orders,mart_user_revenue,1.0
int_order_items,mart_user_revenue,1.0
int_order_items,mart_top_products,1.0
Базовая загрузка:
import csv
from collections import defaultdict
adj: dict[str, list[str]] = defaultdict(list)
with open("edges.csv", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
parent = row["parent"]
child = row["child"]
adj[parent].append(child)
print(adj["stg_orders"])
# ['int_user_orders', 'int_order_items']
Несколько практических замечаний:
- Используйте
csv.DictReader, а не split по запятой.,внутри quoted-полей сломает простой split, DictReader справится. - Кодировка
utf-8. Имена таблиц могут содержать unicode (особенно в lineage импорта). - defaultdict(list) избавляет от ручной проверки
if u not in adj. - Дубликаты в CSV — частый артефакт. Если одна строка появилась дважды, в
adj[u]будет дубль соседа. Решение — либо использоватьsetвместо list, либо явно дедуплицировать после загрузки:
for u in list(adj.keys()):
adj[u] = list(set(adj[u])) # уберёт дубли, но не сохранит порядок
- Чистка пустых полей. Реальные CSV содержат пустые строки или
None-значения от плохого экспорта:
for row in reader:
parent = (row.get("parent") or "").strip()
child = (row.get("child") or "").strip()
if not parent or not child:
continue
adj[parent].append(child)
Шаблон 2: JSON-манифест dbt
dbt manifest.json — это огромный JSON, в котором каждая модель имеет блок depends_on.nodes. Упрощённо:
{
"nodes": {
"model.proj.mart_user_revenue": {
"name": "mart_user_revenue",
"depends_on": { "nodes": ["model.proj.int_user_orders", "model.proj.int_order_items"] }
},
"model.proj.int_user_orders": {
"name": "int_user_orders",
"depends_on": { "nodes": ["model.proj.stg_users", "model.proj.stg_orders"] }
}
}
}
Парсинг:
import json
from collections import defaultdict
with open("target/manifest.json", encoding="utf-8") as f:
manifest = json.load(f)
# parents[child] = [parent1, parent2, ...]
parents: dict[str, list[str]] = defaultdict(list)
# children[parent] = [child1, child2, ...]
children: dict[str, list[str]] = defaultdict(list)
for node_id, node_info in manifest["nodes"].items():
if not node_id.startswith("model."):
continue # игнорируем sources, seeds, tests
name = node_info["name"]
deps = node_info.get("depends_on", {}).get("nodes", [])
for dep_id in deps:
if not dep_id.startswith("model."):
continue
dep_name = manifest["nodes"][dep_id]["name"]
parents[name].append(dep_name)
children[dep_name].append(name)
print(parents["mart_user_revenue"])
# ['int_user_orders', 'int_order_items']
print(children["stg_orders"])
# ['int_user_orders', 'int_order_items']
Заметьте: мы держим оба словаря сразу — parents (in-neighbors) и children (out-neighbors). Это типичный паттерн lineage-сервисов: вопросы «откуда модель X» и «кто зависит от X» равно частые, и платить O(V+E) сканированием на каждый запрос — нерационально.
Шаблон 3: foreign keys из information_schema
В Postgres схему FK можно получить запросом:
SELECT
tc.table_name AS child,
ccu.table_name AS parent
FROM information_schema.table_constraints tc
JOIN information_schema.constraint_column_usage ccu
ON ccu.constraint_name = tc.constraint_name
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.table_schema = 'public';
Питон-код:
import psycopg
from collections import defaultdict
fk: dict[str, list[str]] = defaultdict(list)
with psycopg.connect("dbname=app") as conn:
rows = conn.execute("""
SELECT tc.table_name AS child, ccu.table_name AS parent
FROM information_schema.table_constraints tc
JOIN information_schema.constraint_column_usage ccu
ON ccu.constraint_name = tc.constraint_name
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.table_schema = 'public'
""").fetchall()
for child, parent in rows:
fk[parent].append(child)
# что-то полезное: какие таблицы зависят от users
print(fk.get("users", []))
Тонкие моменты:
- одна пара (parent, child) может появиться несколько раз — это разные FK с разными колонками. Уберите дубликаты через set, если важно;
- self-referencing FK даёт ребро
parent == child(петля), это валидный directed loop, но цикл длины 1 — обработайте отдельно, если в задаче не должно быть; - FK иногда указывает не на public-схему, расширяйте запрос если нужно.
Граф в networkx из любого источника
Если вы решили использовать networkx, билдинг сводится к серии add_edge:
import networkx as nx
import csv
G = nx.DiGraph()
with open("edges.csv", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
G.add_edge(row["parent"], row["child"], weight=float(row.get("weight", 1.0)))
print(G.number_of_nodes(), G.number_of_edges())
print(list(G.successors("stg_orders")))
# 7 7
# ['int_user_orders', 'int_order_items']
add_edge автоматически создаст обе вершины, если их ещё не было.
Из pandas DataFrame:
import pandas as pd
import networkx as nx
df = pd.read_csv("edges.csv")
G = nx.from_pandas_edgelist(
df,
source="parent",
target="child",
edge_attr="weight",
create_using=nx.DiGraph(),
)
Это самый короткий способ для прототипов и аналитики.
Граф в CSR через scipy
Когда граф большой, выгоднее сразу строить CSR:
import numpy as np
from scipy.sparse import coo_matrix
import csv
# 1) собрать рёбра и составить отображение имя -> int
name_to_id: dict[str, int] = {}
src_list: list[int] = []
dst_list: list[int] = []
def get_id(name: str) -> int:
if name not in name_to_id:
name_to_id[name] = len(name_to_id)
return name_to_id[name]
with open("edges.csv", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
src_list.append(get_id(row["parent"]))
dst_list.append(get_id(row["child"]))
N = len(name_to_id)
data = np.ones(len(src_list), dtype=np.int8)
# COO -> CSR
G = coo_matrix(
(data, (np.array(src_list), np.array(dst_list))),
shape=(N, N)
).tocsr()
print(f"V={N}, E={G.nnz}, memory={G.data.nbytes + G.indices.nbytes + G.indptr.nbytes} bytes")
coo_matrix принимает три параллельных массива (data, row, col) и легко конвертируется в csr через .tocsr(). Это стандартный pipeline в scipy.sparse.
От сырых данных к рабочему графу: 4 шага.
Распространённые баги в коде building graph
Баг 1: пропущенные изолированные вершины
Если строим adj через adj[u].append(v), вершина v с нулевым out-degree не появится как ключ — обращение adj.get(v) вернёт None или KeyError. Решение:
adj.setdefault(v, []) # или: defaultdict(list) с явным touch
Или в конце цикла:
for u in list(adj.keys()):
for v in adj[u]:
adj.setdefault(v, [])
Баг 2: имена не уникальны
«users» в схеме public и «users» в схеме audit — разные таблицы. Если строите граф без префикса схемы, поломаете lineage. Норма — ключ вида "schema.table" или (schema, table)-кортеж.
Баг 3: одно ребро — несколько раз
Дубликаты в CSV или несколько FK от одной таблицы к другой. Если для алгоритма важна set-семантика рёбер (BFS, DFS — без разницы; PageRank — разница), дедуплицируйте сразу.
Баг 4: цикл, который не должен быть DAG
Если данные обещают DAG, но вы получили цикл — это сразу баг в источнике. Проверка в код:
import networkx as nx
G = nx.DiGraph(...)
if not nx.is_directed_acyclic_graph(G):
print("ВНИМАНИЕ: граф не DAG, есть цикл!")
print(nx.find_cycle(G))
find_cycle найдёт первый цикл и покажет его. Полезно для отладки.
DE-кейс: лезкий граф изменений в Production DWH
Сценарий: компания хочет регулярно генерировать lineage из dbt manifest и сравнивать с предыдущей версией. Цель — заметить «новая модель появилась», «модель X теперь зависит от Y, чего не было».
Pipeline:
import json
from collections import defaultdict
def build_lineage(manifest_path: str) -> dict[str, list[str]]:
with open(manifest_path, encoding="utf-8") as f:
manifest = json.load(f)
children: dict[str, list[str]] = defaultdict(list)
for node_id, node_info in manifest["nodes"].items():
if not node_id.startswith("model."):
continue
name = node_info["name"]
for dep_id in node_info.get("depends_on", {}).get("nodes", []):
if not dep_id.startswith("model."):
continue
dep_name = manifest["nodes"][dep_id]["name"]
children[dep_name].append(name)
return dict(children)
def diff_graphs(g_old: dict, g_new: dict) -> dict[str, dict]:
diff = {"added_models": [], "removed_models": [], "added_edges": [], "removed_edges": []}
diff["added_models"] = sorted(set(g_new) - set(g_old))
diff["removed_models"] = sorted(set(g_old) - set(g_new))
for v in set(g_old) & set(g_new):
old_set = set(g_old[v])
new_set = set(g_new[v])
for u in sorted(new_set - old_set):
diff["added_edges"].append((v, u))
for u in sorted(old_set - new_set):
diff["removed_edges"].append((v, u))
return diff
g1 = build_lineage("manifest_yesterday.json")
g2 = build_lineage("manifest_today.json")
print(diff_graphs(g1, g2))
Это рабочая утилита, которую можно поставить в CI и блокировать PR с подозрительными изменениями (например, новая зависимость от deprecated-модели).
Попробуй сам
Имейте CSV-файл fk.csv:
child,parent
orders,users
order_items,orders
order_items,products
payments,orders
sessions,users
events,users
events,sessions
Задачи:
- Постройте оба словаря:
parents[child]иchildren[parent]. - Найдите вершины с in-degree = 0 (raw-сорсы) и с out-degree = 0 (терминальные).
- Найдите вершину с максимальным out-degree (центральная таблица), вершину с максимальным in-degree.
- Постройте граф в
networkxи проверьте, что он — DAG.
Ожидаемые ответы:
- Вершины с in-degree=0:
{users, products}. - Вершины с out-degree=0:
{order_items, payments, events}. - Максимальный out-degree:
users(3: orders, sessions, events). - Максимальный in-degree:
orders(1 от users) — на самом деле всего 1, а в данном графе. Проверьте по факту. - DAG: да.
В следующих модулях (12-й) мы научимся обходить эти графы — BFS, DFS, поиск циклов, topological sort. Без правильной структуры все эти алгоритмы будут медленнее, чем нужно.