Prerequisites:
- 04-subsquid-erc20-indexer
Subsquid: мульти-событийная индексация
От одного события к десяткам
В INDEX-04 мы индексировали один тип событий — ERC-20 Transfer. Но реальные dApps работают с десятками типов событий. Uniswap V2 пара генерирует:
- Swap — обмены токенов
- Sync — обновление резервов пула
- Mint / Burn — добавление / удаление ликвидности
Как индексировать ВСЁ это ОДНИМ процессором?
В Module 5 мы изучали Uniswap V2 и формулу
xy = k. Теперь мы построим индексатор, который будет отслеживать ВСЕ операции Uniswap пары — каждый обмен, каждое изменение резервов, каждый Transfer токенов. Один процессор, одна база, один GraphQL API.
Расширение схемы
В INDEX-04 у нас были два entity: Transfer и Account. Теперь добавим Swap и PoolState:
# Расширенная схема для мульти-событийной индексации
type Transfer @entity {
id: ID!
from: String! @index
to: String! @index
value: BigInt!
timestamp: DateTime!
blockNumber: Int!
txHash: String! @index
}
type Swap @entity {
id: ID!
sender: String! @index
to: String! @index
amount0In: BigInt!
amount1In: BigInt!
amount0Out: BigInt!
amount1Out: BigInt!
timestamp: DateTime!
blockNumber: Int!
txHash: String! @index
}
type PoolState @entity {
id: ID! # Pool address
reserve0: BigInt!
reserve1: BigInt!
totalSwaps: Int!
lastUpdated: DateTime!
}
type Account @entity {
id: ID!
balance: BigInt!
}
Каждый тип события отображается на свой entity type. PoolState — это агрегация: entity обновляется при каждом Sync событии. totalSwaps — running counter.
Принцип: Transfer и Swap — immutable записи (факт произошёл — запись неизменна). PoolState — mutable состояние (обновляется при каждом Sync). Разные типы данных — разные паттерны persistence (
INSERTvsUPSERT).
Мульти-событийный процессор
.addLog({ topic0: [SWAP] })
.addLog({ topic0: [SYNC] })
Ключевая идея: один EvmBatchProcessor может подписаться на несколько фильтров одновременно. Каждый .addLog() добавляет отдельный фильтр:
// Uniswap V2 Swap event topic0
// keccak256('Swap(address,uint256,uint256,uint256,uint256,address)')
export const SWAP_TOPIC =
'0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822'
// Uniswap V2 Sync event topic0
// keccak256('Sync(uint112,uint112)')
export const SYNC_TOPIC =
'0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1'
export const processor = new EvmBatchProcessor()
.setRpcEndpoint('http://anvil:8545')
.setFinalityConfirmation(1)
// Фильтр 1: ERC-20 Transfer события
.addLog({
address: [TOKEN_ADDRESS],
topic0: [TRANSFER_TOPIC],
})
// Фильтр 2: Uniswap Swap события
.addLog({
address: [PAIR_ADDRESS],
topic0: [SWAP_TOPIC],
})
// Фильтр 3: Uniswap Sync события
.addLog({
address: [PAIR_ADDRESS],
topic0: [SYNC_TOPIC],
})
.setFields({
log: { transactionHash: true, topics: true, data: true },
})
Один процессор, три фильтра. Processor получает ВСЕ подходящие логи в одном батче. В handler мы проверяем topic0, чтобы определить тип события.
Handler для нескольких событий
Главный паттерн — switch по topic0 для маршрутизации событий:
processor.run(db, async (ctx) => {
const transfers: Transfer[] = []
const swaps: Swap[] = []
for (const block of ctx.blocks) {
for (const log of block.logs) {
switch (log.topics[0]) {
case TRANSFER_TOPIC:
if (log.address === TOKEN_ADDRESS) {
transfers.push(decodeTransfer(log, block))
}
break
case SWAP_TOPIC:
if (log.address === PAIR_ADDRESS) {
swaps.push(decodeSwap(log, block))
}
break
case SYNC_TOPIC:
if (log.address === PAIR_ADDRESS) {
await updatePoolState(ctx.store, log, block)
}
break
}
}
}
await ctx.store.insert(transfers)
await ctx.store.insert(swaps)
})
Три паттерна persistence в одном handler:
Transfer— batchINSERT(собираем массив, вставляем разом)Swap— batchINSERT(аналогично)PoolState— immediateUPSERT(обновляем текущее состояние сразу)
Почему PoolState не батчим? Каждое Sync событие обновляет ОДНУ и ту же запись (один пул). Если в одном батче два Sync события, нужно применить оба последовательно. Batch insert не поддерживает update одной записи дважды.
Декодирование Swap события
Uniswap V2 Swap event имеет следующую сигнатуру:
event Swap(
address indexed sender, // topic1
uint amount0In, // data[0:32]
uint amount1In, // data[32:64]
uint amount0Out, // data[64:96]
uint amount1Out, // data[96:128]
address indexed to // topic2
);
Декодируем вручную:
function decodeSwap(log: Log, block: Block): Swap {
// indexed параметры в topics
const sender = '0x' + log.topics[1].slice(26)
const to = '0x' + log.topics[2].slice(26)
// Non-indexed параметры в data (каждый по 32 байта = 64 hex)
const data = log.data.slice(2) // Убираем 0x prefix
const amount0In = BigInt('0x' + data.slice(0, 64))
const amount1In = BigInt('0x' + data.slice(64, 128))
const amount0Out = BigInt('0x' + data.slice(128, 192))
const amount1Out = BigInt('0x' + data.slice(192, 256))
return new Swap({
id: log.id,
sender,
to,
amount0In,
amount1In,
amount0Out,
amount1Out,
timestamp: new Date(block.header.timestamp),
blockNumber: block.header.height,
txHash: log.transactionHash,
})
}
Каждый параметр — 32 байта (64 hex символа) в data. indexed параметры идут в topics. Non-indexed — последовательно в data.
Интуитивно: Представьте посылку с 4 вложениями одинакового размера. Вы знаете, что каждое вложение — ровно 64 символа. Просто отсчитываете: 0-64, 64-128, 128-192, 192-256.
Агрегации через PoolState
Sync событие обновляет резервы пула. Мы используем UPSERT pattern — создаём запись при первом Sync, обновляем при каждом следующем:
async function updatePoolState(store: Store, log: Log, block: Block) {
const data = log.data.slice(2)
const reserve0 = BigInt('0x' + data.slice(0, 64))
const reserve1 = BigInt('0x' + data.slice(64, 128))
let pool = await store.get(PoolState, PAIR_ADDRESS)
if (!pool) {
pool = new PoolState({
id: PAIR_ADDRESS,
reserve0: 0n,
reserve1: 0n,
totalSwaps: 0,
lastUpdated: new Date(),
})
}
pool.reserve0 = reserve0
pool.reserve1 = reserve1
pool.totalSwaps += 1
pool.lastUpdated = new Date(block.header.timestamp)
await store.upsert(pool)
}
PoolState.totalSwaps — running counter, инкрементируется при каждом Sync. reserve0 и reserve1 перезаписываются актуальными значениями. GraphQL запрос вернёт текущее состояние пула.
GraphQL запросы для мульти-данных
Теперь один GraphQL API отдаёт все три типа данных:
Последние 5 свапов
query {
swaps(orderBy: blockNumber_DESC, limit: 5) {
sender
amount0In
amount1Out
blockNumber
}
}
Текущее состояние пула
query {
poolStateById(id: "0x...") {
reserve0
reserve1
totalSwaps
lastUpdated
}
}
Комбинированный запрос (три entity в одном)
query {
transfers(limit: 5, orderBy: blockNumber_DESC) {
from
to
value
}
swaps(limit: 5, orderBy: blockNumber_DESC) {
sender
amount0Out
amount1Out
}
poolStateById(id: "0x...") {
reserve0
reserve1
totalSwaps
}
}
Одним запросом — последние трансферы, свапы и текущие резервы пула. REST API потребовал бы 3 отдельных endpoint’а.
Алгоритмический уровень
Event routing как pattern matching:
EventRouter:
INPUT: log (from blockchain batch)
MATCH log.topic0:
TRANSFER_TOPIC -> decodeTransfer(log) -> batch[] -> INSERT Transfer entity
SWAP_TOPIC -> decodeSwap(log) -> batch[] -> INSERT Swap entity
SYNC_TOPIC -> decodeSync(log) -> UPSERT PoolState entity
_ -> skip (unrecognized event)
END MATCH
FLUSH:
INSERT ALL Transfer[] (one SQL statement)
INSERT ALL Swap[] (one SQL statement)
// PoolState already upserted inline
Сложность:
- Маршрутизация:
O(1)per event (topic0 lookup) - Batch insert:
O(N)один SQL для N записей - PoolState upsert:
O(1)per Sync event
Итоги
| Концепция | Описание |
|---|---|
| Мульти-фильтр | Несколько .addLog() на одном процессоре |
| Topic0 routing | switch по log.topics[0] для маршрутизации |
| Batch INSERT | Immutable events (Transfer, Swap) собираются в массив |
| UPSERT | Mutable state (PoolState) обновляется на месте |
| Агрегации | Running counters и текущие значения в отдельном entity |
| Комбинированный GraphQL | Несколько entity types в одном запросе |
Что дальше: В INDEX-06 мы рассмотрим продвинутые паттерны Subsquid: индексацию governance (stateful entity с жизненным циклом), WebSocket подписки и multi-chain конфигурацию.
Finished the lesson?
Mark it as complete to track your progress