Learning Platform
Глоссарий Troubleshooting
Урок 14.04 · 25 мин
Начальный
BFSshortest pathparent pointerDijkstraweighted graph

BFS как калькулятор расстояний

В уроке 01 мы упомянули: BFS даёт кратчайший путь в unweighted-графе. Сейчас разберём это детальнее, добавим восстановление самого пути (не только длины), и обсудим, почему этот метод не работает на графах с весами.

Почему BFS даёт минимум

Доказательство в одном предложении: BFS перебирает вершины в порядке возрастания расстояния от старта, поэтому первое попадание в вершину — минимально возможное.

Чуть длиннее. Допустим, BFS открыл вершину v на шаге k. Это значит, что v попала в очередь после извлечения какой-то вершины u, причём u была на расстоянии k-1. Поэтому до v есть путь длины k. И не может быть короче: если бы был путь длины k-1 или меньше, мы бы открыли v раньше — на шаге, когда обрабатывали вершину расстояния k-2.

Это только для unweighted-графа, где каждое ребро вносит вес 1. В этом случае «число рёбер» = «вес пути» = «расстояние».

Распознавание расстояний

Запишем как dict: dist[v] = расстояние от старта до v. dist одновременно играет роль visited.

from collections import deque

def shortest_distances(adj, start):
    dist = {start: 0}
    q = deque([start])
    while q:
        u = q.popleft()
        for v in adj.get(u, []):
            if v not in dist:
                dist[v] = dist[u] + 1
                q.append(v)
    return dist

adj = {
    "A": ["B", "C"],
    "B": ["D"],
    "C": ["D", "E"],
    "D": ["F"],
    "E": ["F"],
    "F": [],
}

print(shortest_distances(adj, "A"))
# {'A': 0, 'B': 1, 'C': 1, 'D': 2, 'E': 2, 'F': 3}

Никаких дополнительных структур, кроме одного dict.

Восстановление пути

Длина — это полдела. Часто нужно сам путь: «через какие модели идёт зависимость от X к Y?»

Решение: добавить parent pointer. Для каждой вершины v запомним, из какой вершины BFS пришёл в v.

def shortest_path(adj, start, target):
    if start == target:
        return [start]
    parent = {start: None}
    q = deque([start])
    while q:
        u = q.popleft()
        for v in adj.get(u, []):
            if v not in parent:
                parent[v] = u
                if v == target:
                    return _reconstruct(parent, target)
                q.append(v)
    return None  # путь не существует

def _reconstruct(parent, target):
    path = []
    cur = target
    while cur is not None:
        path.append(cur)
        cur = parent[cur]
    return list(reversed(path))

print(shortest_path(adj, "A", "F"))
# ['A', 'B', 'D', 'F']

Идея reconstruction:

  1. Начинаем с target и идём по parent-указателям.
  2. Каждый раз: добавляем текущую вершину в path, переходим к parent.
  3. Когда дошли до start (его parent = None) — стоп.
  4. Развернуть path, потому что мы шли с конца.

Сложность reconstruction — O(длина пути). На графе из миллиона вершин с путём длины 20 — это 20 операций после O(V+E) самого BFS.

Parent pointer в действии

BFS из A до F: каждая вершина запоминает откуда пришла.

parent[A]NoneСтартовая вершина не имеет родителя
parent[B]AВ B пришли из A на шаге 1
parent[C]AВ C пришли из A на шаге 1
parent[D]BВ D пришли из B на шаге 2
parent[E]CВ E пришли из C на шаге 2
parent[F]DВ F пришли из D на шаге 3, обнаружили target — стоп

Альтернативное представление пути — массив dist + восстановление через «соседа, у которого dist = current_dist - 1»:

def reconstruct_path(adj, dist, start, target):
    if target not in dist:
        return None
    path = [target]
    cur = target
    # реверс adj не нужен; ищем предка через dist
    # ВНИМАНИЕ: на directed-графе нужен обратный adj (parents instead of children)
    while cur != start:
        for u in all_predecessors(adj, cur):
            if dist.get(u, -1) == dist[cur] - 1:
                path.append(u)
                cur = u
                break
    return list(reversed(path))

Этот вариант требует знать предков (обратный граф). Parent pointer проще и не требует дополнительного хранения.

Реализация одной функцией: дистанции и пути

def bfs_full(adj, start):
    dist = {start: 0}
    parent = {start: None}
    q = deque([start])
    while q:
        u = q.popleft()
        for v in adj.get(u, []):
            if v not in dist:
                dist[v] = dist[u] + 1
                parent[v] = u
                q.append(v)
    return dist, parent

def path_to(parent, target):
    if target not in parent:
        return None
    path = []
    cur = target
    while cur is not None:
        path.append(cur)
        cur = parent[cur]
    return list(reversed(path))

dist, parent = bfs_full(adj, "A")
print(dist)               # {'A':0,'B':1,'C':1,'D':2,'E':2,'F':3}
print(path_to(parent, "F"))  # ['A', 'B', 'D', 'F']
print(path_to(parent, "E"))  # ['A', 'C', 'E']

Одна запуск BFS даёт расстояния и пути ко всем достижимым вершинам.

Когда BFS не работает: weighted graph

BFS считает каждое ребро = 1 шаг. Это норма для unweighted графов. Но что если рёбра имеют разные веса?

Пример. Граф городов с временем перелёта:

A --(10 часов)--> B --(1 час)--> C
A --(2 часа)----> D --(2 часа)--> C

BFS из A считает: B на расстоянии 1, D на расстоянии 1, C на расстоянии 2. Делает вывод «оба пути одинаковы». Это неправда:

  • A -> B -> C занимает 10 + 1 = 11 часов,
  • A -> D -> C занимает 2 + 2 = 4 часа.

Кратчайший по времени — A -> D -> C, но BFS этого не покажет, потому что не учитывает веса.

WARNING

BFS считает рёбра, а не вес. Для weighted-графа нужен Dijkstra (если веса неотрицательные) или Bellman-Ford (если могут быть отрицательные).

Краткий взгляд на Dijkstra (без полного разбора)

Идея Dijkstra: вместо обычной queue использовать priority queue (min-heap), упорядоченный по «текущему лучшему расстоянию». Из очереди извлекается вершина с минимальным dist. Алгоритм:

import heapq

def dijkstra(adj_weighted, start):
    """adj_weighted: dict[node, list[tuple[neighbor, weight]]]"""
    dist = {start: 0}
    pq = [(0, start)]            # (distance, vertex)
    while pq:
        d, u = heapq.heappop(pq)
        if d > dist.get(u, float("inf")):
            continue              # устаревший элемент
        for v, w in adj_weighted.get(u, []):
            new_dist = d + w
            if new_dist < dist.get(v, float("inf")):
                dist[v] = new_dist
                heapq.heappush(pq, (new_dist, v))
    return dist

graph = {
    "A": [("B", 10), ("D", 2)],
    "B": [("C", 1)],
    "D": [("C", 2)],
    "C": [],
}

print(dijkstra(graph, "A"))
# {'A': 0, 'B': 10, 'D': 2, 'C': 4}

C: 4 — корректный ответ. BFS на том же графе считал бы 2.

Сложность Dijkstra с binary heap — O((V + E) log V). Хуже чем O(V+E) у BFS, но единственный корректный алгоритм для positive-weight графов.

Подробно Dijkstra мы здесь не разбираем — это тема отдельной серии в продвинутом курсе. Запомните:

  • Если все веса 1 — BFS,
  • Если веса разные, но положительные — Dijkstra,
  • Если есть отрицательные веса — Bellman-Ford (или ничего, если есть отрицательный цикл).

DE-кейс: расстояние между моделями в lineage

В dbt-проекте у вас есть две модели: stg_orders и mart_user_revenue. Через сколько «уровней» зависимостей одна добирается до другой? Это unweighted graph (ребро = «зависит напрямую»), BFS подходит:

def lineage_distance(children, source, target):
    """Минимальное число hops от source до target по children-направлению."""
    if source == target:
        return 0
    dist = {source: 0}
    q = deque([source])
    while q:
        u = q.popleft()
        for v in children.get(u, []):
            if v not in dist:
                dist[v] = dist[u] + 1
                if v == target:
                    return dist[v]
                q.append(v)
    return -1  # не достижим

children = {
    "stg_orders": ["int_user_orders", "int_order_items"],
    "int_user_orders": ["mart_user_revenue"],
    "int_order_items": ["mart_user_revenue"],
    "mart_user_revenue": [],
}

print(lineage_distance(children, "stg_orders", "mart_user_revenue"))
# 2

Расстояние 2: stg_orders -> int_user_orders -> mart_user_revenue (или через int_order_items).

В реальной задаче эту метрику можно использовать для оценки «сложности изменений»: «если меняю stg_orders, на расстоянии 1 — 2 модели, на расстоянии 2 — 1 модель, итого 3 модели в зоне риска».

DE-кейс: BFS по сети событий

Допустим, у вас граф сессий: каждая сессия — вершина, ребро от sessionA к sessionB означает «sessionA породила sessionB как continuation». Граф unweighted. Вопрос: «насколько глубока цепочка сессий пользователя u?»

BFS из первой сессии пользователя по children, найти максимум dist:

def session_chain_length(children, first_session):
    dist = {first_session: 0}
    q = deque([first_session])
    max_dist = 0
    while q:
        u = q.popleft()
        for v in children.get(u, []):
            if v not in dist:
                dist[v] = dist[u] + 1
                max_dist = max(max_dist, dist[v])
                q.append(v)
    return max_dist

Простая, но рабочая метрика поведенческой аналитики.

Попробуй сам

Имеем lineage:

adj = {
    "stg_a": ["int_x", "int_y"],
    "stg_b": ["int_y"],
    "int_x": ["mart_p"],
    "int_y": ["mart_p", "mart_q"],
    "mart_p": [],
    "mart_q": [],
}

Задачи:

  1. Реализуйте all_shortest_paths(adj, source) -> dict[str, list[str]], возвращающий для каждой достижимой вершины кратчайший путь от source.
  2. Сравните пути от stg_a до mart_p: через int_x (1 hop) и через int_y (тоже 1 hop). Оба одинаковой длины. Какой выберет ваш BFS? (Подсказка: тот, через который дошёл первым; зависит от порядка соседей в adj.)
  3. Реализуйте shortest_path_count(adj, source, target) — сколько разных кратчайших путей от source до target. Например, (stg_b, mart_p) имеет 1 путь, (stg_a, mart_p) — 2 пути.

Подсказка к (3): нужна модификация BFS — вместо записи parent сохранять list of parents для каждой вершины (всех, через которых пришли с минимальным dist).

В следующем уроке — финальная техника модуля: cycle detection. Без неё все наши topological sort и DAG-проверки не работают.

Проверка знанийKnowledge check
У вас граф городов с временем переезда между ними (рёбра разной длины). Нужно найти кратчайший путь из A в B. BFS даёт ответ X. Стоит ли ему доверять?
ОтветAnswer
Нет, BFS даёт «минимальное число рёбер», а не «минимальное время». Если рёбра имеют разные веса, BFS может пропустить путь с большим числом коротких рёбер, который суммарно быстрее. Пример: путь A->C->B длиной 2+2=4 часа vs путь A->B длиной 10 часов. BFS скажет «прямой путь — 1 шаг, минимум», что по времени неверно. Для weighted-графа с positive весами нужен Dijkstra (priority queue, O((V+E)log V)). BFS корректен только если все веса одинаковы (что эквивалентно unweighted случаю).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Почему BFS даёт кратчайший путь в unweighted графе?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5