Почему обычный visited не работает
В уроке 02 мы упомянули: для поиска циклов в directed-графе обычный binary visited (set) — недостаточен. Этот урок — про правильную технику и про то, почему она нужна.
Сначала посмотрим, что идёт не так с обычным visited:
# граф без цикла: A->B, A->C, B->C
adj = {"A": ["B", "C"], "B": ["C"], "C": []}
def dfs_naive(start, visited):
visited.add(start)
for v in adj[start]:
if v in visited:
return True # ОЛОЖНОЕ — мы думаем, что цикл
if dfs_naive(v, visited):
return True
return False
print(dfs_naive("A", set()))
Шаги:
- Заходим в A,
visited = {A}. - Из A идём в B,
visited = {A, B}. - Из B идём в C,
visited = {A, B, C}. Возврат. - Возврат в A. Идём в C — но C уже в visited.
- Алгоритм возвращает True — «цикл». Неправильно: цикла нет, это просто параллельный путь.
Корень проблемы: binary visited не различает «в текущей ветке» и «уже завершено в другой ветке». Нужно три состояния.
Three-color DFS
Трёхцветная раскраска:
- WHITE — вершина ещё не открыта,
- GRAY — вершина открыта, мы сейчас находимся в её поддереве DFS (на пути от старта до неё ещё нет возврата),
- BLACK — вершина полностью обработана (зашли и вышли, все её соседи проверены).
Правило обнаружения цикла: встреча с GRAY-вершиной = back edge = цикл. Встреча с BLACK — это просто cross edge или forward edge, не цикл.
WHITE, GRAY, BLACK = 0, 1, 2
def has_cycle(adj, vertices) -> bool:
color = {v: WHITE for v in vertices}
def dfs(u):
color[u] = GRAY
for v in adj.get(u, []):
if color[v] == GRAY:
return True # back edge — цикл!
if color[v] == WHITE and dfs(v):
return True
color[u] = BLACK
return False
for v in vertices:
if color[v] == WHITE:
if dfs(v):
return True
return False
Тестируем на нашем графе без цикла:
adj = {"A": ["B", "C"], "B": ["C"], "C": []}
print(has_cycle(adj, ["A", "B", "C"])) # False — правильно
И на графе с циклом:
adj = {"A": ["B"], "B": ["C"], "C": ["A"]}
print(has_cycle(adj, ["A", "B", "C"])) # True
Шаги:
- dfs(A). color[A] = GRAY.
- v = B. color[B] = WHITE — спускаемся. color[B] = GRAY.
- v = C. color[C] = WHITE — спускаемся. color[C] = GRAY.
- v = A. color[A] = GRAY — back edge, цикл!.
Зачем итерация по всем вершинам
Внешний for v in vertices: if color[v] == WHITE: dfs(v) — на случай, если граф несвязный. Если стартовать DFS только из одной вершины, недостижимые компоненты не обойдутся, и цикл там пропустим.
В DAG dbt это критично: модели нескольких независимых ETL могут жить рядом, но не связаны.
Итеративный вариант
Рекурсивный DFS опасен на длинных цепочках (RecursionError). Итеративный — безопасен:
WHITE, GRAY, BLACK = 0, 1, 2
def has_cycle_iter(adj, vertices) -> bool:
color = {v: WHITE for v in vertices}
for start in vertices:
if color[start] != WHITE:
continue
# имитация стека: пара (вершина, итератор по соседям)
stack = [(start, iter(adj.get(start, [])))]
color[start] = GRAY
while stack:
u, it = stack[-1]
try:
v = next(it)
except StopIteration:
color[u] = BLACK
stack.pop()
continue
if color[v] == GRAY:
return True
if color[v] == WHITE:
color[v] = GRAY
stack.append((v, iter(adj.get(v, []))))
# после while u вышли из stack — закрыли всё
return False
Главная техника — хранить в стеке не только вершину, но и итератор по её соседям, чтобы при возврате к ней не начинать обход с нуля. Это эквивалент рекурсивного варианта, но без риска переполнения.
Поиск самого цикла
Если нужно не «есть ли цикл», а сам цикл — модифицируем DFS, чтобы возвращать вершины пути при back edge:
def find_cycle(adj, vertices):
color = {v: WHITE for v in vertices}
parent = {}
def dfs(u):
color[u] = GRAY
for v in adj.get(u, []):
if color[v] == GRAY:
# восстанавливаем цикл от v к u
cycle = [v]
cur = u
while cur != v:
cycle.append(cur)
cur = parent[cur]
cycle.append(v)
return list(reversed(cycle))
if color[v] == WHITE:
parent[v] = u
result = dfs(v)
if result:
return result
color[u] = BLACK
return None
for v in vertices:
if color[v] == WHITE:
result = dfs(v)
if result:
return result
return None
adj = {"A": ["B"], "B": ["C"], "C": ["A"]}
print(find_cycle(adj, ["A", "B", "C"]))
# ['A', 'B', 'C', 'A']
Полезно для отладки: показать пользователю конкретный «вот этот цикл» вместо «где-то в графе цикл».
Cycle detection в undirected графе
Для undirected графа цикл обнаруживается проще: достаточно DFS с visited и проверкой, что сосед, в которого собираемся пойти, не является родителем текущей вершины.
def has_cycle_undirected(adj, vertices):
visited = set()
def dfs(u, parent):
visited.add(u)
for v in adj.get(u, []):
if v == parent:
continue
if v in visited or dfs(v, u):
return True
return False
for v in vertices:
if v not in visited:
if dfs(v, None):
return True
return False
Здесь идея: в неориентированном графе ребро (u, v) даёт две стрелки u->v и v->u в adj. Когда мы заходим в v из u, мы не должны считать «v->u» циклом — это просто обратный ход того же ребра. Проверка v == parent спасает.
В directed-графе такой трюк не работает: «петля» через несколько вершин не имеет «parent» в одном шаге.
Альтернатива: Kahn’s algorithm
Есть другой способ — через постепенное удаление вершин с in-degree=0:
from collections import defaultdict, deque
def kahn_topological_sort(adj, vertices):
in_degree = {v: 0 for v in vertices}
for u in adj:
for v in adj[u]:
in_degree[v] = in_degree.get(v, 0) + 1
# старт — все с in_degree=0
q = deque([v for v in vertices if in_degree[v] == 0])
order = []
while q:
u = q.popleft()
order.append(u)
for v in adj.get(u, []):
in_degree[v] -= 1
if in_degree[v] == 0:
q.append(v)
if len(order) != len(vertices):
return None # цикл: некоторые вершины не получили in_degree=0
return order
print(kahn_topological_sort({"A":["B"], "B":["C"], "C":[]}, ["A","B","C"]))
# ['A', 'B', 'C']
print(kahn_topological_sort({"A":["B"], "B":["C"], "C":["A"]}, ["A","B","C"]))
# None — цикл
Алгоритм Кана даёт сразу две вещи: и обнаружение цикла, и topological sort (если цикла нет). Сложность O(V+E). Используется во многих практических планировщиках.
Различие с DFS-методом: Kahn — BFS-like, постепенное удаление. DFS — глубинный, через раскраску. Оба корректны, выбирайте по контексту:
- Kahn проще объясняется, ясно показывает источник цикла (вершины с in-
degree > 0после прохода — внутри цикла), - DFS позволяет легко найти конкретный цикл (через parent pointer).
DE-кейс 1: проверка dbt manifest
Когда dbt компилирует проект и обнаруживает циклическую зависимость, он выводит сообщение и выходит. Псевдокод реализации внутри dbt — это либо DFS с трёхцветной, либо Kahn:
def check_dbt_dag(manifest_path):
with open(manifest_path) as f:
manifest = json.load(f)
adj = defaultdict(list)
vertices = set()
for nid, info in manifest["nodes"].items():
if not nid.startswith("model."):
continue
name = info["name"]
vertices.add(name)
for dep_id in info.get("depends_on", {}).get("nodes", []):
if dep_id.startswith("model."):
dep_name = manifest["nodes"][dep_id]["name"]
adj[dep_name].append(name)
vertices.add(dep_name)
cycle = find_cycle(adj, list(vertices))
if cycle:
print(f"ERROR: cycle in dbt project: {' -> '.join(cycle)}")
sys.exit(1)
print(f"OK: {len(vertices)} models, no cycles")
DE-кейс 2: dependency resolution в task scheduler
Допустим, вы пишете свой mini-Airflow:
def schedule(tasks: dict[str, list[str]]):
"""tasks: {task_name: [dependencies]}. Возвращает порядок исполнения или ошибку."""
adj = defaultdict(list)
vertices = set(tasks.keys())
for task, deps in tasks.items():
for dep in deps:
adj[dep].append(task)
vertices.add(dep)
order = kahn_topological_sort(adj, list(vertices))
if order is None:
raise ValueError("Cyclic dependency in tasks")
return order
tasks = {
"extract_raw": [],
"stage": ["extract_raw"],
"transform": ["stage"],
"load_dwh": ["transform"],
"build_marts": ["load_dwh"],
"send_report": ["build_marts"],
}
print(schedule(tasks))
# ['extract_raw', 'stage', 'transform', 'load_dwh', 'build_marts', 'send_report']
Это работающее ядро scheduler-а. Все остальные слои (retry, parallel, time-based) — обвязка вокруг topological sort.
Self-referencing FK (петли)
В Postgres допустимы FK типа parent_id REFERENCES this_table(id) — самоссылка. Например, дерево комментариев: comments.parent_id -> comments.id.
С точки зрения графа это ребро comments -> comments — петля (loop). Это цикл длины 1. Если ваш алгоритм проверяет «is_dag», такой граф будет считаться cyclic, хотя для приложения он валидный.
Решение — отдельно обрабатывать self-loop или фильтровать на этапе сбора рёбер. Это рутинная защита в lineage-сервисах.
Сложность
DFS-cycle-detection: O(V + E). Каждую вершину красим раз, каждое ребро рассматриваем раз.
Kahn: O(V + E). Считаем in_degree (E операций), потом V извлечений из queue, итого V + E.
Оба линейные. На лесу из миллиона моделей это миллисекунды.
Попробуй сам
Граф моделей с подозрением на цикл:
adj = {
"a": ["b", "c"],
"b": ["d"],
"c": ["d", "e"],
"d": ["f"],
"e": ["f"],
"f": ["g"],
"g": ["b"], # вот цикл: g -> b -> d -> f -> g
}
Задачи:
- Реализуйте
has_cycle(adj, vertices)через DFS с three-color. - Реализуйте
find_cycle(adj, vertices), возвращающую сам цикл. - Удалите ребро
g -> bи проверьте, чтоhas_cycleтеперь возвращает False. - Реализуйте
kahn_topological_sort(adj, vertices)на исправленном графе.
Ожидаемые ответы:
has_cycleс ребромg -> b= True,find_cycleвозвращает[b, d, f, g, b]или эквивалент,- После удаления
g -> bцикла нет, - topological order:
[a, c, e, b, d, f, g](или другой валидный).
В следующем модуле мы переходим от графов к сортировкам — фундаментальной операции, без которой не строятся индексы, не работает merge join и не реализуется большинство SQL-запросов.
Declarative partitioning в PostgreSQL