Перейти к содержанию
Learning Platform
Средний
35 минут
Subsquid Multi-Event Uniswap V2 Swap Events Aggregations Schema Design

Требуемые знания:

  • 04-subsquid-erc20-indexer

Subsquid: мульти-событийная индексация

От одного события к десяткам

В INDEX-04 мы индексировали один тип событий — ERC-20 Transfer. Но реальные dApps работают с десятками типов событий. Uniswap V2 пара генерирует:

  • Swap — обмены токенов
  • Sync — обновление резервов пула
  • Mint / Burn — добавление / удаление ликвидности

Как индексировать ВСЁ это ОДНИМ процессором?

В Module 5 мы изучали Uniswap V2 и формулу xy = k. Теперь мы построим индексатор, который будет отслеживать ВСЕ операции Uniswap пары — каждый обмен, каждое изменение резервов, каждый Transfer токенов. Один процессор, одна база, один GraphQL API.

Расширение схемы

В INDEX-04 у нас были два entity: Transfer и Account. Теперь добавим Swap и PoolState:

# Расширенная схема для мульти-событийной индексации

type Transfer @entity {
  id: ID!
  from: String! @index
  to: String! @index
  value: BigInt!
  timestamp: DateTime!
  blockNumber: Int!
  txHash: String! @index
}

type Swap @entity {
  id: ID!
  sender: String! @index
  to: String! @index
  amount0In: BigInt!
  amount1In: BigInt!
  amount0Out: BigInt!
  amount1Out: BigInt!
  timestamp: DateTime!
  blockNumber: Int!
  txHash: String! @index
}

type PoolState @entity {
  id: ID!              # Pool address
  reserve0: BigInt!
  reserve1: BigInt!
  totalSwaps: Int!
  lastUpdated: DateTime!
}

type Account @entity {
  id: ID!
  balance: BigInt!
}

Каждый тип события отображается на свой entity type. PoolState — это агрегация: entity обновляется при каждом Sync событии. totalSwaps — running counter.

Принцип: Transfer и Swap — immutable записи (факт произошёл — запись неизменна). PoolState — mutable состояние (обновляется при каждом Sync). Разные типы данных — разные паттерны persistence (INSERT vs UPSERT).

Мульти-событийный процессор

Мульти-событийная индексация: Uniswap и Governance
Transfer
topic0: 0xddf252ad...
Swap
topic0: 0xd78ad95f...
Sync
topic0: 0x1c411e9a...
EvmBatchProcessor
.addLog({ topic0: [TRANSFER] })
.addLog({ topic0: [SWAP] })
.addLog({ topic0: [SYNC] })
Transfer entity
from, to, value
Swap entity
amount0In, amount1In, amount0Out, amount1Out
Pool entity
reserve0, reserve1
Один процессор -- несколько событийОдин процессор может индексировать НЕСКОЛЬКО типов событий. Для Uniswap V2: Transfer (токены), Swap (обмены), Sync (резервы). Каждый тип -> своя entity + свой handler в main.ts.
Governance паттернGovernance: ProposalCreated, VoteCast, ProposalExecuted -> Proposal entity со статусом (Pending -> Active -> Succeeded -> Executed).

Ключевая идея: один EvmBatchProcessor может подписаться на несколько фильтров одновременно. Каждый .addLog() добавляет отдельный фильтр:

// Uniswap V2 Swap event topic0
// keccak256('Swap(address,uint256,uint256,uint256,uint256,address)')
export const SWAP_TOPIC =
  '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822'

// Uniswap V2 Sync event topic0
// keccak256('Sync(uint112,uint112)')
export const SYNC_TOPIC =
  '0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1'

export const processor = new EvmBatchProcessor()
  .setRpcEndpoint('http://anvil:8545')
  .setFinalityConfirmation(1)
  // Фильтр 1: ERC-20 Transfer события
  .addLog({
    address: [TOKEN_ADDRESS],
    topic0: [TRANSFER_TOPIC],
  })
  // Фильтр 2: Uniswap Swap события
  .addLog({
    address: [PAIR_ADDRESS],
    topic0: [SWAP_TOPIC],
  })
  // Фильтр 3: Uniswap Sync события
  .addLog({
    address: [PAIR_ADDRESS],
    topic0: [SYNC_TOPIC],
  })
  .setFields({
    log: { transactionHash: true, topics: true, data: true },
  })

Один процессор, три фильтра. Processor получает ВСЕ подходящие логи в одном батче. В handler мы проверяем topic0, чтобы определить тип события.

Handler для нескольких событий

Главный паттерн — switch по topic0 для маршрутизации событий:

processor.run(db, async (ctx) => {
  const transfers: Transfer[] = []
  const swaps: Swap[] = []

  for (const block of ctx.blocks) {
    for (const log of block.logs) {
      switch (log.topics[0]) {
        case TRANSFER_TOPIC:
          if (log.address === TOKEN_ADDRESS) {
            transfers.push(decodeTransfer(log, block))
          }
          break

        case SWAP_TOPIC:
          if (log.address === PAIR_ADDRESS) {
            swaps.push(decodeSwap(log, block))
          }
          break

        case SYNC_TOPIC:
          if (log.address === PAIR_ADDRESS) {
            await updatePoolState(ctx.store, log, block)
          }
          break
      }
    }
  }

  await ctx.store.insert(transfers)
  await ctx.store.insert(swaps)
})

Три паттерна persistence в одном handler:

  • Transfer — batch INSERT (собираем массив, вставляем разом)
  • Swap — batch INSERT (аналогично)
  • PoolState — immediate UPSERT (обновляем текущее состояние сразу)

Почему PoolState не батчим? Каждое Sync событие обновляет ОДНУ и ту же запись (один пул). Если в одном батче два Sync события, нужно применить оба последовательно. Batch insert не поддерживает update одной записи дважды.

Декодирование Swap события

Uniswap V2 Swap event имеет следующую сигнатуру:

event Swap(
    address indexed sender,   // topic1
    uint amount0In,           // data[0:32]
    uint amount1In,           // data[32:64]
    uint amount0Out,          // data[64:96]
    uint amount1Out,          // data[96:128]
    address indexed to        // topic2
);

Декодируем вручную:

function decodeSwap(log: Log, block: Block): Swap {
  // indexed параметры в topics
  const sender = '0x' + log.topics[1].slice(26)
  const to = '0x' + log.topics[2].slice(26)

  // Non-indexed параметры в data (каждый по 32 байта = 64 hex)
  const data = log.data.slice(2) // Убираем 0x prefix
  const amount0In  = BigInt('0x' + data.slice(0, 64))
  const amount1In  = BigInt('0x' + data.slice(64, 128))
  const amount0Out = BigInt('0x' + data.slice(128, 192))
  const amount1Out = BigInt('0x' + data.slice(192, 256))

  return new Swap({
    id: log.id,
    sender,
    to,
    amount0In,
    amount1In,
    amount0Out,
    amount1Out,
    timestamp: new Date(block.header.timestamp),
    blockNumber: block.header.height,
    txHash: log.transactionHash,
  })
}

Каждый параметр — 32 байта (64 hex символа) в data. indexed параметры идут в topics. Non-indexed — последовательно в data.

Интуитивно: Представьте посылку с 4 вложениями одинакового размера. Вы знаете, что каждое вложение — ровно 64 символа. Просто отсчитываете: 0-64, 64-128, 128-192, 192-256.

Агрегации через PoolState

Sync событие обновляет резервы пула. Мы используем UPSERT pattern — создаём запись при первом Sync, обновляем при каждом следующем:

async function updatePoolState(store: Store, log: Log, block: Block) {
  const data = log.data.slice(2)
  const reserve0 = BigInt('0x' + data.slice(0, 64))
  const reserve1 = BigInt('0x' + data.slice(64, 128))

  let pool = await store.get(PoolState, PAIR_ADDRESS)
  if (!pool) {
    pool = new PoolState({
      id: PAIR_ADDRESS,
      reserve0: 0n,
      reserve1: 0n,
      totalSwaps: 0,
      lastUpdated: new Date(),
    })
  }

  pool.reserve0 = reserve0
  pool.reserve1 = reserve1
  pool.totalSwaps += 1
  pool.lastUpdated = new Date(block.header.timestamp)

  await store.upsert(pool)
}

PoolState.totalSwaps — running counter, инкрементируется при каждом Sync. reserve0 и reserve1 перезаписываются актуальными значениями. GraphQL запрос вернёт текущее состояние пула.

GraphQL запросы для мульти-данных

Теперь один GraphQL API отдаёт все три типа данных:

Последние 5 свапов

query {
  swaps(orderBy: blockNumber_DESC, limit: 5) {
    sender
    amount0In
    amount1Out
    blockNumber
  }
}

Текущее состояние пула

query {
  poolStateById(id: "0x...") {
    reserve0
    reserve1
    totalSwaps
    lastUpdated
  }
}

Комбинированный запрос (три entity в одном)

query {
  transfers(limit: 5, orderBy: blockNumber_DESC) {
    from
    to
    value
  }
  swaps(limit: 5, orderBy: blockNumber_DESC) {
    sender
    amount0Out
    amount1Out
  }
  poolStateById(id: "0x...") {
    reserve0
    reserve1
    totalSwaps
  }
}

Одним запросом — последние трансферы, свапы и текущие резервы пула. REST API потребовал бы 3 отдельных endpoint’а.

Алгоритмический уровень

Event routing как pattern matching:

EventRouter:
  INPUT: log (from blockchain batch)
  MATCH log.topic0:
    TRANSFER_TOPIC -> decodeTransfer(log) -> batch[] -> INSERT Transfer entity
    SWAP_TOPIC     -> decodeSwap(log)     -> batch[] -> INSERT Swap entity
    SYNC_TOPIC     -> decodeSync(log)     -> UPSERT PoolState entity
    _              -> skip (unrecognized event)
  END MATCH

  FLUSH:
    INSERT ALL Transfer[] (one SQL statement)
    INSERT ALL Swap[] (one SQL statement)
    // PoolState already upserted inline

Сложность:

  • Маршрутизация: O(1) per event (topic0 lookup)
  • Batch insert: O(N) один SQL для N записей
  • PoolState upsert: O(1) per Sync event

Итоги

КонцепцияОписание
Мульти-фильтрНесколько .addLog() на одном процессоре
Topic0 routingswitch по log.topics[0] для маршрутизации
Batch INSERTImmutable events (Transfer, Swap) собираются в массив
UPSERTMutable state (PoolState) обновляется на месте
АгрегацииRunning counters и текущие значения в отдельном entity
Комбинированный GraphQLНесколько entity types в одном запросе

Что дальше: В INDEX-06 мы рассмотрим продвинутые паттерны Subsquid: индексацию governance (stateful entity с жизненным циклом), WebSocket подписки и multi-chain конфигурацию.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс