Learning Platform
Глоссарий Troubleshooting
Урок 12.01 · 14 мин
Средний
GraphXGraphFramesPageRankConnected ComponentsGraph Processing

Графовые вычисления: GraphX и GraphFrames

Зачем графы в Spark?

Многие задачи в data engineering и analytics естественно описываются графами:

  • Социальные сети: пользователи = вершины, дружба = рёбра, PageRank = влиятельность
  • Fraud detection: транзакции между аккаунтами, поиск циклических переводов
  • Knowledge graphs: сущности и связи (продукт, категория, поставщик)
  • Рекомендации: граф покупок user-item, collaborative filtering

Spark предоставляет два API для графовых вычислений: GraphX (legacy, Scala-only) и GraphFrames (modern, DataFrame-based).

GraphX: RDD-based Graph API

GraphX — часть ядра Spark, доступная с версии 1.0. Работает на уровне RDD и доступна только из Scala.

Core Abstractions

import org.apache.spark.graphx._

// Вершины: (vertexId: Long, attribute: Any)
val vertices: RDD[(Long, String)] = sc.parallelize(Seq(
  (1L, "Alice"),
  (2L, "Bob"),
  (3L, "Carol"),
  (4L, "Dave"),
  (5L, "Eve")
))

// Рёбра: (srcId, dstId, attribute)
val edges: RDD[Edge[Double]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1.0),  // Alice -> Bob
  Edge(2L, 3L, 1.0),  // Bob -> Carol
  Edge(3L, 1L, 1.0),  // Carol -> Alice
  Edge(3L, 4L, 1.0),  // Carol -> Dave
  Edge(4L, 5L, 1.0),  // Dave -> Eve
  Edge(5L, 3L, 1.0)   // Eve -> Carol
))

// Создание графа
val graph: Graph[String, Double] = Graph(vertices, edges)

// Triplets -- вершина-ребро-вершина тройки
graph.triplets.collect().foreach { triplet =>
  println(s"${triplet.srcAttr} --[${triplet.attr}]--> ${triplet.dstAttr}")
}
// Alice --[1.0]--> Bob
// Bob --[1.0]--> Carol
// Carol --[1.0]--> Alice
// ...

Основные структуры GraphX:

АбстракцияОписаниеТип
VertexRDDВершины с атрибутамиRDD[(VertexId, VD)]
EdgeRDDРёбра с атрибутамиRDD[Edge[ED]]
Graph[VD, ED]Граф = вершины + рёбраImmutable, distributed
EdgeTripletsrc + edge + dstДля traversal и join

Алгоритмы GraphX

PageRank

PageRank определяет влиятельность вершины по количеству и качеству входящих рёбер. Рассмотрим социальную сеть:

Социальная сеть (5 пользователей):

  Alice ──────► Bob
    ▲             │
    │             ▼
    └──── Carol ◄──┘


          Dave ──► Eve
            ▲       │
            └───────┘
// PageRank: итеративный алгоритм
val ranks = graph.pageRank(tol = 0.01).vertices

// Результат: (vertexId, pageRank)
ranks.join(vertices).sortBy(_._2._1, ascending = false)
  .collect().foreach { case (id, (rank, name)) =>
    println(f"$name: $rank%.4f")
  }
// Carol: 0.3856  -- больше всего входящих рёбер
// Alice: 0.2145
// Bob:   0.1532
// Dave:  0.1432
// Eve:   0.1035

Carol получает наивысший PageRank: на неё ссылаются Bob, Alice (через цикл) и Eve — три входящих ребра от разных вершин.

Connected Components

Connected Components находит связные компоненты — группы вершин, достижимых друг из друга:

val cc = graph.connectedComponents().vertices
// Все 5 вершин в одной компоненте (граф связный)

// Если убрать ребро Carol->Dave -- граф разбивается на 2 компоненты:
// Component 1: {Alice, Bob, Carol}
// Component 2: {Dave, Eve}

Другие встроенные алгоритмы

// Triangle Counting -- количество треугольников для каждой вершины
val triangles = graph.triangleCount().vertices
// Полезно для обнаружения плотных сообществ

// Shortest Paths -- кратчайшие пути от каждой вершины
// до заданного набора landmarks
val shortestPaths = graph.ops
  .shortestPaths(landmarks = Seq(1L, 3L))
  .vertices

GraphFrames: современная альтернатива

GraphFrames — отдельная библиотека (не часть ядра Spark), которая работает поверх DataFrame API. Главные преимущества:

  • Работает с PySpark, Scala, Java (не только Scala)
  • Использует Catalyst optimizer для оптимизации запросов
  • Поддерживает motif finding — декларативный поиск паттернов в графе
from graphframes import GraphFrame

# Вершины как DataFrame
vertices = spark.createDataFrame([
    ("alice", "Alice", 28),
    ("bob", "Bob", 32),
    ("carol", "Carol", 25),
    ("dave", "Dave", 41),
    ("eve", "Eve", 35)
], ["id", "name", "age"])

# Рёбра как DataFrame
edges = spark.createDataFrame([
    ("alice", "bob", "follows"),
    ("bob", "carol", "follows"),
    ("carol", "alice", "follows"),
    ("carol", "dave", "follows"),
    ("dave", "eve", "follows"),
    ("eve", "carol", "follows")
], ["src", "dst", "relationship"])

# Создание GraphFrame
g = GraphFrame(vertices, edges)

# PageRank
ranks = g.pageRank(resetProbability=0.15, maxIter=10)
ranks.vertices.select("name", "pagerank") \
    .orderBy("pagerank", ascending=False).show()

# Connected Components
cc = g.connectedComponents()
cc.select("name", "component").show()

Motif Finding

Motif finding — уникальная возможность GraphFrames для поиска структурных паттернов:

# Найти цепочки A -> B -> C (длина 2)
motifs = g.find("(a)-[e1]->(b); (b)-[e2]->(c)")
motifs.select("a.name", "b.name", "c.name").show()
# +-----+-----+-----+
# | name| name| name|
# +-----+-----+-----+
# |Alice|  Bob|Carol|
# |  Bob|Carol|Alice|
# |  Bob|Carol| Dave|
# |Carol| Dave|  Eve|
# | Dave|  Eve|Carol|
# |  Eve|Carol|Alice|
# |  Eve|Carol| Dave|
# +-----+-----+-----+

# Найти взаимные связи (A -> B и B -> A)
mutual = g.find("(a)-[]->(b); (b)-[]->(a)") \
    .filter("a.id < b.id")  # убрать дубликаты
mutual.select("a.name", "b.name").show()

Motif-выражения описывают паттерн в синтаксисе (vertex)-[edge]->(vertex), а Spark транслирует его в оптимизированный DataFrame join.

GraphX vs GraphFrames

КритерийGraphXGraphFrames
API уровеньRDDDataFrame
ЯзыкиScala onlyPython, Scala, Java
ОптимизацияРучнаяCatalyst optimizer
Motif findingНетДа
Часть Spark coreДаОтдельный пакет
СтатусMaintenance modeАктивная разработка
ПодключениеВстроен--packages graphframes:graphframes:0.8.3
WARNING

GraphX в maintenance mode. Новые алгоритмы и оптимизации не добавляются в GraphX. Для новых проектов используйте GraphFrames. Для масштабных графовых задач (миллиарды рёбер) рассмотрите специализированные системы: Neo4j, TigerGraph, Apache Giraph.

TIP

Когда графы в Spark — правильный выбор? Когда граф — часть data pipeline, а не основной продукт. Например: вычислить PageRank пользователей как feature для ML-модели, найти connected components для дедупликации, или выявить циклические зависимости в данных. Если граф — основная структура данных приложения, используйте graph database.

Проверка знанийKnowledge check
В чём ключевые отличия GraphFrames от GraphX? Когда следует выбрать GraphFrames?
ОтветAnswer
Ключевые отличия: (1) GraphFrames работает с DataFrame API (не RDD), что даёт Catalyst optimization; (2) GraphFrames поддерживает Python, Scala и Java (GraphX -- только Scala); (3) GraphFrames имеет motif finding -- декларативный поиск паттернов; (4) GraphX в maintenance mode, GraphFrames активно развивается. Выбирайте GraphFrames для всех новых проектов, особенно если работаете с PySpark.
Проверка знанийKnowledge check
Что такое PageRank и как он определяет влиятельность вершины в графе?
ОтветAnswer
PageRank -- итеративный алгоритм, который определяет влиятельность (важность) вершины по количеству и качеству входящих рёбер. Вершина получает высокий ранг, если на неё ссылаются другие вершины с высоким рангом. Алгоритм итеративно пересчитывает ранги, пока не сойдётся (разница между итерациями < tolerance). В нашем примере Carol получила наивысший PageRank, потому что на неё ссылались 3 вершины (Bob, Alice через цикл, Eve).

Что дальше?

В следующем уроке мы разберём MLlib Pipeline API — как строить machine learning pipelines в Spark с использованием Transformer, Estimator и Pipeline.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какие основные абстракции использует GraphX для представления графа?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3