Графовые вычисления: GraphX и GraphFrames
Зачем графы в Spark?
Многие задачи в data engineering и analytics естественно описываются графами:
- Социальные сети: пользователи = вершины, дружба = рёбра, PageRank = влиятельность
- Fraud detection: транзакции между аккаунтами, поиск циклических переводов
- Knowledge graphs: сущности и связи (продукт, категория, поставщик)
- Рекомендации: граф покупок user-item, collaborative filtering
Spark предоставляет два API для графовых вычислений: GraphX (legacy, Scala-only) и GraphFrames (modern, DataFrame-based).
GraphX: RDD-based Graph API
GraphX — часть ядра Spark, доступная с версии 1.0. Работает на уровне RDD и доступна только из Scala.
Core Abstractions
import org.apache.spark.graphx._
// Вершины: (vertexId: Long, attribute: Any)
val vertices: RDD[(Long, String)] = sc.parallelize(Seq(
(1L, "Alice"),
(2L, "Bob"),
(3L, "Carol"),
(4L, "Dave"),
(5L, "Eve")
))
// Рёбра: (srcId, dstId, attribute)
val edges: RDD[Edge[Double]] = sc.parallelize(Seq(
Edge(1L, 2L, 1.0), // Alice -> Bob
Edge(2L, 3L, 1.0), // Bob -> Carol
Edge(3L, 1L, 1.0), // Carol -> Alice
Edge(3L, 4L, 1.0), // Carol -> Dave
Edge(4L, 5L, 1.0), // Dave -> Eve
Edge(5L, 3L, 1.0) // Eve -> Carol
))
// Создание графа
val graph: Graph[String, Double] = Graph(vertices, edges)
// Triplets -- вершина-ребро-вершина тройки
graph.triplets.collect().foreach { triplet =>
println(s"${triplet.srcAttr} --[${triplet.attr}]--> ${triplet.dstAttr}")
}
// Alice --[1.0]--> Bob
// Bob --[1.0]--> Carol
// Carol --[1.0]--> Alice
// ...
Основные структуры GraphX:
| Абстракция | Описание | Тип |
|---|---|---|
VertexRDD | Вершины с атрибутами | RDD[(VertexId, VD)] |
EdgeRDD | Рёбра с атрибутами | RDD[Edge[ED]] |
Graph[VD, ED] | Граф = вершины + рёбра | Immutable, distributed |
EdgeTriplet | src + edge + dst | Для traversal и join |
Алгоритмы GraphX
PageRank
PageRank определяет влиятельность вершины по количеству и качеству входящих рёбер. Рассмотрим социальную сеть:
Социальная сеть (5 пользователей):
Alice ──────► Bob
▲ │
│ ▼
└──── Carol ◄──┘
│
▼
Dave ──► Eve
▲ │
└───────┘
// PageRank: итеративный алгоритм
val ranks = graph.pageRank(tol = 0.01).vertices
// Результат: (vertexId, pageRank)
ranks.join(vertices).sortBy(_._2._1, ascending = false)
.collect().foreach { case (id, (rank, name)) =>
println(f"$name: $rank%.4f")
}
// Carol: 0.3856 -- больше всего входящих рёбер
// Alice: 0.2145
// Bob: 0.1532
// Dave: 0.1432
// Eve: 0.1035
Carol получает наивысший PageRank: на неё ссылаются Bob, Alice (через цикл) и Eve — три входящих ребра от разных вершин.
Connected Components
Connected Components находит связные компоненты — группы вершин, достижимых друг из друга:
val cc = graph.connectedComponents().vertices
// Все 5 вершин в одной компоненте (граф связный)
// Если убрать ребро Carol->Dave -- граф разбивается на 2 компоненты:
// Component 1: {Alice, Bob, Carol}
// Component 2: {Dave, Eve}
Другие встроенные алгоритмы
// Triangle Counting -- количество треугольников для каждой вершины
val triangles = graph.triangleCount().vertices
// Полезно для обнаружения плотных сообществ
// Shortest Paths -- кратчайшие пути от каждой вершины
// до заданного набора landmarks
val shortestPaths = graph.ops
.shortestPaths(landmarks = Seq(1L, 3L))
.vertices
GraphFrames: современная альтернатива
GraphFrames — отдельная библиотека (не часть ядра Spark), которая работает поверх DataFrame API. Главные преимущества:
- Работает с PySpark, Scala, Java (не только Scala)
- Использует Catalyst optimizer для оптимизации запросов
- Поддерживает motif finding — декларативный поиск паттернов в графе
from graphframes import GraphFrame
# Вершины как DataFrame
vertices = spark.createDataFrame([
("alice", "Alice", 28),
("bob", "Bob", 32),
("carol", "Carol", 25),
("dave", "Dave", 41),
("eve", "Eve", 35)
], ["id", "name", "age"])
# Рёбра как DataFrame
edges = spark.createDataFrame([
("alice", "bob", "follows"),
("bob", "carol", "follows"),
("carol", "alice", "follows"),
("carol", "dave", "follows"),
("dave", "eve", "follows"),
("eve", "carol", "follows")
], ["src", "dst", "relationship"])
# Создание GraphFrame
g = GraphFrame(vertices, edges)
# PageRank
ranks = g.pageRank(resetProbability=0.15, maxIter=10)
ranks.vertices.select("name", "pagerank") \
.orderBy("pagerank", ascending=False).show()
# Connected Components
cc = g.connectedComponents()
cc.select("name", "component").show()
Motif Finding
Motif finding — уникальная возможность GraphFrames для поиска структурных паттернов:
# Найти цепочки A -> B -> C (длина 2)
motifs = g.find("(a)-[e1]->(b); (b)-[e2]->(c)")
motifs.select("a.name", "b.name", "c.name").show()
# +-----+-----+-----+
# | name| name| name|
# +-----+-----+-----+
# |Alice| Bob|Carol|
# | Bob|Carol|Alice|
# | Bob|Carol| Dave|
# |Carol| Dave| Eve|
# | Dave| Eve|Carol|
# | Eve|Carol|Alice|
# | Eve|Carol| Dave|
# +-----+-----+-----+
# Найти взаимные связи (A -> B и B -> A)
mutual = g.find("(a)-[]->(b); (b)-[]->(a)") \
.filter("a.id < b.id") # убрать дубликаты
mutual.select("a.name", "b.name").show()
Motif-выражения описывают паттерн в синтаксисе (vertex)-[edge]->(vertex), а Spark транслирует его в оптимизированный DataFrame join.
GraphX vs GraphFrames
| Критерий | GraphX | GraphFrames |
|---|---|---|
| API уровень | RDD | DataFrame |
| Языки | Scala only | Python, Scala, Java |
| Оптимизация | Ручная | Catalyst optimizer |
| Motif finding | Нет | Да |
| Часть Spark core | Да | Отдельный пакет |
| Статус | Maintenance mode | Активная разработка |
| Подключение | Встроен | --packages graphframes:graphframes:0.8.3 |
GraphX в maintenance mode. Новые алгоритмы и оптимизации не добавляются в GraphX. Для новых проектов используйте GraphFrames. Для масштабных графовых задач (миллиарды рёбер) рассмотрите специализированные системы: Neo4j, TigerGraph, Apache Giraph.
Когда графы в Spark — правильный выбор? Когда граф — часть data pipeline, а не основной продукт. Например: вычислить PageRank пользователей как feature для ML-модели, найти connected components для дедупликации, или выявить циклические зависимости в данных. Если граф — основная структура данных приложения, используйте graph database.
Что дальше?
В следующем уроке мы разберём MLlib Pipeline API — как строить machine learning pipelines в Spark с использованием Transformer, Estimator и Pipeline.