Требуемые знания:
- 04-subsquid-erc20-indexer
Subsquid: мульти-событийная индексация
От одного события к десяткам
В INDEX-04 мы индексировали один тип событий — ERC-20 Transfer. Но реальные dApps работают с десятками типов событий. Uniswap V2 пара генерирует:
- Swap — обмены токенов
- Sync — обновление резервов пула
- Mint / Burn — добавление / удаление ликвидности
Как индексировать ВСЁ это ОДНИМ процессором?
В Module 5 мы изучали Uniswap V2 и формулу
xy = k. Теперь мы построим индексатор, который будет отслеживать ВСЕ операции Uniswap пары — каждый обмен, каждое изменение резервов, каждый Transfer токенов. Один процессор, одна база, один GraphQL API.
Расширение схемы
В INDEX-04 у нас были два entity: Transfer и Account. Теперь добавим Swap и PoolState:
# Расширенная схема для мульти-событийной индексации
type Transfer @entity {
id: ID!
from: String! @index
to: String! @index
value: BigInt!
timestamp: DateTime!
blockNumber: Int!
txHash: String! @index
}
type Swap @entity {
id: ID!
sender: String! @index
to: String! @index
amount0In: BigInt!
amount1In: BigInt!
amount0Out: BigInt!
amount1Out: BigInt!
timestamp: DateTime!
blockNumber: Int!
txHash: String! @index
}
type PoolState @entity {
id: ID! # Pool address
reserve0: BigInt!
reserve1: BigInt!
totalSwaps: Int!
lastUpdated: DateTime!
}
type Account @entity {
id: ID!
balance: BigInt!
}
Каждый тип события отображается на свой entity type. PoolState — это агрегация: entity обновляется при каждом Sync событии. totalSwaps — running counter.
Принцип: Transfer и Swap — immutable записи (факт произошёл — запись неизменна). PoolState — mutable состояние (обновляется при каждом Sync). Разные типы данных — разные паттерны persistence (
INSERTvsUPSERT).
Мульти-событийный процессор
.addLog({ topic0: [SWAP] })
.addLog({ topic0: [SYNC] })
Ключевая идея: один EvmBatchProcessor может подписаться на несколько фильтров одновременно. Каждый .addLog() добавляет отдельный фильтр:
// Uniswap V2 Swap event topic0
// keccak256('Swap(address,uint256,uint256,uint256,uint256,address)')
export const SWAP_TOPIC =
'0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822'
// Uniswap V2 Sync event topic0
// keccak256('Sync(uint112,uint112)')
export const SYNC_TOPIC =
'0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1'
export const processor = new EvmBatchProcessor()
.setRpcEndpoint('http://anvil:8545')
.setFinalityConfirmation(1)
// Фильтр 1: ERC-20 Transfer события
.addLog({
address: [TOKEN_ADDRESS],
topic0: [TRANSFER_TOPIC],
})
// Фильтр 2: Uniswap Swap события
.addLog({
address: [PAIR_ADDRESS],
topic0: [SWAP_TOPIC],
})
// Фильтр 3: Uniswap Sync события
.addLog({
address: [PAIR_ADDRESS],
topic0: [SYNC_TOPIC],
})
.setFields({
log: { transactionHash: true, topics: true, data: true },
})
Один процессор, три фильтра. Processor получает ВСЕ подходящие логи в одном батче. В handler мы проверяем topic0, чтобы определить тип события.
Handler для нескольких событий
Главный паттерн — switch по topic0 для маршрутизации событий:
processor.run(db, async (ctx) => {
const transfers: Transfer[] = []
const swaps: Swap[] = []
for (const block of ctx.blocks) {
for (const log of block.logs) {
switch (log.topics[0]) {
case TRANSFER_TOPIC:
if (log.address === TOKEN_ADDRESS) {
transfers.push(decodeTransfer(log, block))
}
break
case SWAP_TOPIC:
if (log.address === PAIR_ADDRESS) {
swaps.push(decodeSwap(log, block))
}
break
case SYNC_TOPIC:
if (log.address === PAIR_ADDRESS) {
await updatePoolState(ctx.store, log, block)
}
break
}
}
}
await ctx.store.insert(transfers)
await ctx.store.insert(swaps)
})
Три паттерна persistence в одном handler:
Transfer— batchINSERT(собираем массив, вставляем разом)Swap— batchINSERT(аналогично)PoolState— immediateUPSERT(обновляем текущее состояние сразу)
Почему PoolState не батчим? Каждое Sync событие обновляет ОДНУ и ту же запись (один пул). Если в одном батче два Sync события, нужно применить оба последовательно. Batch insert не поддерживает update одной записи дважды.
Проверка знанийКак один EvmBatchProcessor обрабатывает несколько типов событий?
Декодирование Swap события
Uniswap V2 Swap event имеет следующую сигнатуру:
event Swap(
address indexed sender, // topic1
uint amount0In, // data[0:32]
uint amount1In, // data[32:64]
uint amount0Out, // data[64:96]
uint amount1Out, // data[96:128]
address indexed to // topic2
);
Декодируем вручную:
function decodeSwap(log: Log, block: Block): Swap {
// indexed параметры в topics
const sender = '0x' + log.topics[1].slice(26)
const to = '0x' + log.topics[2].slice(26)
// Non-indexed параметры в data (каждый по 32 байта = 64 hex)
const data = log.data.slice(2) // Убираем 0x prefix
const amount0In = BigInt('0x' + data.slice(0, 64))
const amount1In = BigInt('0x' + data.slice(64, 128))
const amount0Out = BigInt('0x' + data.slice(128, 192))
const amount1Out = BigInt('0x' + data.slice(192, 256))
return new Swap({
id: log.id,
sender,
to,
amount0In,
amount1In,
amount0Out,
amount1Out,
timestamp: new Date(block.header.timestamp),
blockNumber: block.header.height,
txHash: log.transactionHash,
})
}
Каждый параметр — 32 байта (64 hex символа) в data. indexed параметры идут в topics. Non-indexed — последовательно в data.
Интуитивно: Представьте посылку с 4 вложениями одинакового размера. Вы знаете, что каждое вложение — ровно 64 символа. Просто отсчитываете: 0-64, 64-128, 128-192, 192-256.
Агрегации через PoolState
Sync событие обновляет резервы пула. Мы используем UPSERT pattern — создаём запись при первом Sync, обновляем при каждом следующем:
async function updatePoolState(store: Store, log: Log, block: Block) {
const data = log.data.slice(2)
const reserve0 = BigInt('0x' + data.slice(0, 64))
const reserve1 = BigInt('0x' + data.slice(64, 128))
let pool = await store.get(PoolState, PAIR_ADDRESS)
if (!pool) {
pool = new PoolState({
id: PAIR_ADDRESS,
reserve0: 0n,
reserve1: 0n,
totalSwaps: 0,
lastUpdated: new Date(),
})
}
pool.reserve0 = reserve0
pool.reserve1 = reserve1
pool.totalSwaps += 1
pool.lastUpdated = new Date(block.header.timestamp)
await store.upsert(pool)
}
PoolState.totalSwaps — running counter, инкрементируется при каждом Sync. reserve0 и reserve1 перезаписываются актуальными значениями. GraphQL запрос вернёт текущее состояние пула.
Проверка знанийПочему PoolState обновляется через UPSERT, а не batch INSERT?
GraphQL запросы для мульти-данных
Теперь один GraphQL API отдаёт все три типа данных:
Последние 5 свапов
query {
swaps(orderBy: blockNumber_DESC, limit: 5) {
sender
amount0In
amount1Out
blockNumber
}
}
Текущее состояние пула
query {
poolStateById(id: "0x...") {
reserve0
reserve1
totalSwaps
lastUpdated
}
}
Комбинированный запрос (три entity в одном)
query {
transfers(limit: 5, orderBy: blockNumber_DESC) {
from
to
value
}
swaps(limit: 5, orderBy: blockNumber_DESC) {
sender
amount0Out
amount1Out
}
poolStateById(id: "0x...") {
reserve0
reserve1
totalSwaps
}
}
Одним запросом — последние трансферы, свапы и текущие резервы пула. REST API потребовал бы 3 отдельных endpoint’а.
Алгоритмический уровень
Event routing как pattern matching:
EventRouter:
INPUT: log (from blockchain batch)
MATCH log.topic0:
TRANSFER_TOPIC -> decodeTransfer(log) -> batch[] -> INSERT Transfer entity
SWAP_TOPIC -> decodeSwap(log) -> batch[] -> INSERT Swap entity
SYNC_TOPIC -> decodeSync(log) -> UPSERT PoolState entity
_ -> skip (unrecognized event)
END MATCH
FLUSH:
INSERT ALL Transfer[] (one SQL statement)
INSERT ALL Swap[] (one SQL statement)
// PoolState already upserted inline
Сложность:
- Маршрутизация:
O(1)per event (topic0 lookup) - Batch insert:
O(N)один SQL для N записей - PoolState upsert:
O(1)per Sync event
Итоги
| Концепция | Описание |
|---|---|
| Мульти-фильтр | Несколько .addLog() на одном процессоре |
| Topic0 routing | switch по log.topics[0] для маршрутизации |
| Batch INSERT | Immutable events (Transfer, Swap) собираются в массив |
| UPSERT | Mutable state (PoolState) обновляется на месте |
| Агрегации | Running counters и текущие значения в отдельном entity |
| Комбинированный GraphQL | Несколько entity types в одном запросе |
Что дальше: В INDEX-06 мы рассмотрим продвинутые паттерны Subsquid: индексацию governance (stateful entity с жизненным циклом), WebSocket подписки и multi-chain конфигурацию.
Проверьте понимание
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс
Войдите чтобы оценить урок