BFS как калькулятор расстояний
В уроке 01 мы упомянули: BFS даёт кратчайший путь в unweighted-графе. Сейчас разберём это детальнее, добавим восстановление самого пути (не только длины), и обсудим, почему этот метод не работает на графах с весами.
Почему BFS даёт минимум
Доказательство в одном предложении: BFS перебирает вершины в порядке возрастания расстояния от старта, поэтому первое попадание в вершину — минимально возможное.
Чуть длиннее. Допустим, BFS открыл вершину v на шаге k. Это значит, что v попала в очередь после извлечения какой-то вершины u, причём u была на расстоянии k-1. Поэтому до v есть путь длины k. И не может быть короче: если бы был путь длины k-1 или меньше, мы бы открыли v раньше — на шаге, когда обрабатывали вершину расстояния k-2.
Это только для unweighted-графа, где каждое ребро вносит вес 1. В этом случае «число рёбер» = «вес пути» = «расстояние».
Распознавание расстояний
Запишем как dict: dist[v] = расстояние от старта до v. dist одновременно играет роль visited.
from collections import deque
def shortest_distances(adj, start):
dist = {start: 0}
q = deque([start])
while q:
u = q.popleft()
for v in adj.get(u, []):
if v not in dist:
dist[v] = dist[u] + 1
q.append(v)
return dist
adj = {
"A": ["B", "C"],
"B": ["D"],
"C": ["D", "E"],
"D": ["F"],
"E": ["F"],
"F": [],
}
print(shortest_distances(adj, "A"))
# {'A': 0, 'B': 1, 'C': 1, 'D': 2, 'E': 2, 'F': 3}
Никаких дополнительных структур, кроме одного dict.
Восстановление пути
Длина — это полдела. Часто нужно сам путь: «через какие модели идёт зависимость от X к Y?»
Решение: добавить parent pointer. Для каждой вершины v запомним, из какой вершины BFS пришёл в v.
def shortest_path(adj, start, target):
if start == target:
return [start]
parent = {start: None}
q = deque([start])
while q:
u = q.popleft()
for v in adj.get(u, []):
if v not in parent:
parent[v] = u
if v == target:
return _reconstruct(parent, target)
q.append(v)
return None # путь не существует
def _reconstruct(parent, target):
path = []
cur = target
while cur is not None:
path.append(cur)
cur = parent[cur]
return list(reversed(path))
print(shortest_path(adj, "A", "F"))
# ['A', 'B', 'D', 'F']
Идея reconstruction:
- Начинаем с target и идём по parent-указателям.
- Каждый раз: добавляем текущую вершину в path, переходим к parent.
- Когда дошли до start (его parent = None) — стоп.
- Развернуть path, потому что мы шли с конца.
Сложность reconstruction — O(длина пути). На графе из миллиона вершин с путём длины 20 — это 20 операций после O(V+E) самого BFS.
BFS из A до F: каждая вершина запоминает откуда пришла.
Альтернативное представление пути — массив dist + восстановление через «соседа, у которого dist = current_dist - 1»:
def reconstruct_path(adj, dist, start, target):
if target not in dist:
return None
path = [target]
cur = target
# реверс adj не нужен; ищем предка через dist
# ВНИМАНИЕ: на directed-графе нужен обратный adj (parents instead of children)
while cur != start:
for u in all_predecessors(adj, cur):
if dist.get(u, -1) == dist[cur] - 1:
path.append(u)
cur = u
break
return list(reversed(path))
Этот вариант требует знать предков (обратный граф). Parent pointer проще и не требует дополнительного хранения.
Реализация одной функцией: дистанции и пути
def bfs_full(adj, start):
dist = {start: 0}
parent = {start: None}
q = deque([start])
while q:
u = q.popleft()
for v in adj.get(u, []):
if v not in dist:
dist[v] = dist[u] + 1
parent[v] = u
q.append(v)
return dist, parent
def path_to(parent, target):
if target not in parent:
return None
path = []
cur = target
while cur is not None:
path.append(cur)
cur = parent[cur]
return list(reversed(path))
dist, parent = bfs_full(adj, "A")
print(dist) # {'A':0,'B':1,'C':1,'D':2,'E':2,'F':3}
print(path_to(parent, "F")) # ['A', 'B', 'D', 'F']
print(path_to(parent, "E")) # ['A', 'C', 'E']
Одна запуск BFS даёт расстояния и пути ко всем достижимым вершинам.
Когда BFS не работает: weighted graph
BFS считает каждое ребро = 1 шаг. Это норма для unweighted графов. Но что если рёбра имеют разные веса?
Пример. Граф городов с временем перелёта:
A --(10 часов)--> B --(1 час)--> C
A --(2 часа)----> D --(2 часа)--> C
BFS из A считает: B на расстоянии 1, D на расстоянии 1, C на расстоянии 2. Делает вывод «оба пути одинаковы». Это неправда:
- A -> B -> C занимает 10 + 1 = 11 часов,
- A -> D -> C занимает 2 + 2 = 4 часа.
Кратчайший по времени — A -> D -> C, но BFS этого не покажет, потому что не учитывает веса.
BFS считает рёбра, а не вес. Для weighted-графа нужен Dijkstra (если веса неотрицательные) или Bellman-Ford (если могут быть отрицательные).
Краткий взгляд на Dijkstra (без полного разбора)
Идея Dijkstra: вместо обычной queue использовать priority queue (min-heap), упорядоченный по «текущему лучшему расстоянию». Из очереди извлекается вершина с минимальным dist. Алгоритм:
import heapq
def dijkstra(adj_weighted, start):
"""adj_weighted: dict[node, list[tuple[neighbor, weight]]]"""
dist = {start: 0}
pq = [(0, start)] # (distance, vertex)
while pq:
d, u = heapq.heappop(pq)
if d > dist.get(u, float("inf")):
continue # устаревший элемент
for v, w in adj_weighted.get(u, []):
new_dist = d + w
if new_dist < dist.get(v, float("inf")):
dist[v] = new_dist
heapq.heappush(pq, (new_dist, v))
return dist
graph = {
"A": [("B", 10), ("D", 2)],
"B": [("C", 1)],
"D": [("C", 2)],
"C": [],
}
print(dijkstra(graph, "A"))
# {'A': 0, 'B': 10, 'D': 2, 'C': 4}
C: 4 — корректный ответ. BFS на том же графе считал бы 2.
Сложность Dijkstra с binary heap — O((V + E) log V). Хуже чем O(V+E) у BFS, но единственный корректный алгоритм для positive-weight графов.
Подробно Dijkstra мы здесь не разбираем — это тема отдельной серии в продвинутом курсе. Запомните:
- Если все веса 1 — BFS,
- Если веса разные, но положительные — Dijkstra,
- Если есть отрицательные веса — Bellman-Ford (или ничего, если есть отрицательный цикл).
DE-кейс: расстояние между моделями в lineage
В dbt-проекте у вас есть две модели: stg_orders и mart_user_revenue. Через сколько «уровней» зависимостей одна добирается до другой? Это unweighted graph (ребро = «зависит напрямую»), BFS подходит:
def lineage_distance(children, source, target):
"""Минимальное число hops от source до target по children-направлению."""
if source == target:
return 0
dist = {source: 0}
q = deque([source])
while q:
u = q.popleft()
for v in children.get(u, []):
if v not in dist:
dist[v] = dist[u] + 1
if v == target:
return dist[v]
q.append(v)
return -1 # не достижим
children = {
"stg_orders": ["int_user_orders", "int_order_items"],
"int_user_orders": ["mart_user_revenue"],
"int_order_items": ["mart_user_revenue"],
"mart_user_revenue": [],
}
print(lineage_distance(children, "stg_orders", "mart_user_revenue"))
# 2
Расстояние 2: stg_orders -> int_user_orders -> mart_user_revenue (или через int_order_items).
В реальной задаче эту метрику можно использовать для оценки «сложности изменений»: «если меняю stg_orders, на расстоянии 1 — 2 модели, на расстоянии 2 — 1 модель, итого 3 модели в зоне риска».
DE-кейс: BFS по сети событий
Допустим, у вас граф сессий: каждая сессия — вершина, ребро от sessionA к sessionB означает «sessionA породила sessionB как continuation». Граф unweighted. Вопрос: «насколько глубока цепочка сессий пользователя u?»
BFS из первой сессии пользователя по children, найти максимум dist:
def session_chain_length(children, first_session):
dist = {first_session: 0}
q = deque([first_session])
max_dist = 0
while q:
u = q.popleft()
for v in children.get(u, []):
if v not in dist:
dist[v] = dist[u] + 1
max_dist = max(max_dist, dist[v])
q.append(v)
return max_dist
Простая, но рабочая метрика поведенческой аналитики.
Попробуй сам
Имеем lineage:
adj = {
"stg_a": ["int_x", "int_y"],
"stg_b": ["int_y"],
"int_x": ["mart_p"],
"int_y": ["mart_p", "mart_q"],
"mart_p": [],
"mart_q": [],
}
Задачи:
- Реализуйте
all_shortest_paths(adj, source) -> dict[str, list[str]], возвращающий для каждой достижимой вершины кратчайший путь от source. - Сравните пути от
stg_aдоmart_p: черезint_x(1 hop) и черезint_y(тоже 1 hop). Оба одинаковой длины. Какой выберет ваш BFS? (Подсказка: тот, через который дошёл первым; зависит от порядка соседей в adj.) - Реализуйте
shortest_path_count(adj, source, target)— сколько разных кратчайших путей от source до target. Например,(stg_b, mart_p)имеет 1 путь,(stg_a, mart_p)— 2 пути.
Подсказка к (3): нужна модификация BFS — вместо записи parent сохранять list of parents для каждой вершины (всех, через которых пришли с минимальным dist).
В следующем уроке — финальная техника модуля: cycle detection. Без неё все наши topological sort и DAG-проверки не работают.