Skip to content
Learning Platform
Advanced
35 minutes
Subsquid Governance Indexing Multi-Chain WebSocket Subscriptions Production Patterns

Prerequisites:

  • 05-subsquid-multi-event

Subsquid: продвинутые паттерны

От stateless к stateful индексации

Мы индексировали ERC-20 Transfer и Uniswap Swap события. Оба — stateless: каждое событие — самодостаточная запись. Transfer произошёл — записали. Swap произошёл — записали. Запись после создания никогда не меняется.

Но governance — это STATEFUL: у Proposal есть жизненный цикл.

Created -> Active -> Succeeded -> Queued -> Executed
                  -> Defeated (альтернативный путь)

Как индексировать СОСТОЯНИЕ, а не просто события?

В Module 7 мы создали GovernanceToken, MyGovernor и Treasurylabs/ethereum/contracts/governance/). MyGovernor использует OpenZeppelin v5 с 4 extensions: CountingSimple, Votes, VotesQuorumFraction, TimelockControl. Теперь мы построим индексатор, который отслеживает ВСЕ этапы жизненного цикла предложения.

Governance индексация

Схема для proposals

type Proposal @entity {
  id: ID!                    # proposalId
  proposer: String! @index
  description: String!
  status: String! @index     # "Pending" | "Active" | "Succeeded" | "Defeated" | "Executed"
  forVotes: BigInt!
  againstVotes: BigInt!
  abstainVotes: BigInt!
  createdAt: DateTime!
  updatedAt: DateTime!
  blockNumber: Int!
  votes: [Vote!] @derivedFrom(field: "proposal")
}

type Vote @entity {
  id: ID!
  proposal: Proposal!
  voter: String! @index
  support: Int!              # 0=Against, 1=For, 2=Abstain
  weight: BigInt!
  timestamp: DateTime!
}

Два entity, связанных через @derivedFrom: Proposal содержит агрегированные подсчёты голосов. Vote — индивидуальная запись каждого голоса. GraphQL позволяет получить proposal вместе со всеми его голосами в одном запросе.

События OpenZeppelin Governor

MyGovernor генерирует следующие события:

// ProposalCreated -- создание нового предложения
event ProposalCreated(
    uint256 proposalId,
    address proposer,
    address[] targets,
    uint256[] values,
    string[] signatures,
    bytes[] calldatas,
    uint256 voteStart,
    uint256 voteEnd,
    string description
);

// VoteCast -- голосование
event VoteCast(
    address indexed voter,
    uint256 proposalId,
    uint8 support,       // 0=Against, 1=For, 2=Abstain
    uint256 weight,
    string reason
);

// ProposalExecuted -- успешное исполнение
event ProposalExecuted(uint256 proposalId);

Handler для stateful entities

// Topic0 хеши для Governor событий
const PROPOSAL_CREATED_TOPIC =
  '0x7d84a6263ae0d98d3329bd7b46bb4e8d6f98cd35a7adb45c274c8b7fd5ebd5e0'
const VOTE_CAST_TOPIC =
  '0xb8e138887d0aa13bab447e82de9d5c1777041ecd21ca36ba824ff1e6c07ddda4'
const PROPOSAL_EXECUTED_TOPIC =
  '0x712ae1383f79ac853f8d882153778e0260ef8f03b504e2a0bffb6de791e1f801'

processor.run(db, async (ctx) => {
  for (const block of ctx.blocks) {
    for (const log of block.logs) {
      switch (log.topics[0]) {
        case PROPOSAL_CREATED_TOPIC: {
          // Декодируем proposalId и description из data
          const proposalId = BigInt('0x' + log.data.slice(2, 66))
          const proposer = '0x' + log.data.slice(90, 130)

          const proposal = new Proposal({
            id: proposalId.toString(),
            proposer,
            description: '...', // Decode from ABI
            status: 'Pending',
            forVotes: 0n,
            againstVotes: 0n,
            abstainVotes: 0n,
            createdAt: new Date(block.header.timestamp),
            updatedAt: new Date(block.header.timestamp),
            blockNumber: block.header.height,
          })
          await ctx.store.insert(proposal)
          break
        }

        case VOTE_CAST_TOPIC: {
          const voter = '0x' + log.topics[1].slice(26)
          const data = log.data.slice(2)
          const proposalId = BigInt('0x' + data.slice(0, 64))
          const support = Number(BigInt('0x' + data.slice(64, 128)))
          const weight = BigInt('0x' + data.slice(128, 192))

          // 1. Сохранить голос
          const vote = new Vote({
            id: log.id,
            proposal: { id: proposalId.toString() } as Proposal,
            voter,
            support,
            weight,
            timestamp: new Date(block.header.timestamp),
          })
          await ctx.store.insert(vote)

          // 2. Обновить агрегации в Proposal
          const prop = await ctx.store.get(Proposal, proposalId.toString())
          if (prop) {
            if (support === 1) prop.forVotes += weight
            else if (support === 0) prop.againstVotes += weight
            else prop.abstainVotes += weight
            prop.updatedAt = new Date(block.header.timestamp)
            await ctx.store.upsert(prop)
          }
          break
        }

        case PROPOSAL_EXECUTED_TOPIC: {
          const proposalId = BigInt('0x' + log.data.slice(2, 66))
          const execProp = await ctx.store.get(
            Proposal,
            proposalId.toString()
          )
          if (execProp) {
            execProp.status = 'Executed'
            execProp.updatedAt = new Date(block.header.timestamp)
            await ctx.store.upsert(execProp)
          }
          break
        }
      }
    }
  }
})

Ключевой паттерн: stateful индексация

Governance — пример STATEFUL индексации:

  1. ProposalCreatedINSERT новый Proposal со status Pending
  2. VoteCastINSERT Vote + UPSERT Proposal (обновить счётчики голосов)
  3. ProposalExecutedUPSERT Proposal (изменить status на Executed)

Entity обновляется при каждом новом событии. Proposal.status меняется от Created до Executed. Это принципиально отличается от immutable Transfer записей.

GraphQL запросы для governance

# Все предложения с голосами
query {
  proposals(orderBy: createdAt_DESC) {
    id
    proposer
    description
    status
    forVotes
    againstVotes
    abstainVotes
    votes {
      voter
      support
      weight
    }
  }
}

# Предложения по статусу
query {
  proposals(where: { status_eq: "Active" }) {
    id
    description
    forVotes
    againstVotes
  }
}

# Голоса конкретного избирателя
query {
  votes(where: { voter_eq: "0xf39f..." }) {
    proposal { id description }
    support
    weight
  }
}

WebSocket подписки

Governance dashboard хочет обновляться в реальном времени — новые голоса, смена статуса. Subsquid GraphQL server поддерживает subscriptions.

Включение подписок

В LAB-07 подписки уже включены. GraphQL server запускается с флагом --subscriptions:

npx squid-graphql-server --subscriptions

Subscription query

subscription {
  proposals(orderBy: updatedAt_DESC, limit: 5) {
    id
    status
    forVotes
    againstVotes
    updatedAt
  }
}

Подписка автоматически обновляется каждые 5 секунд (poll interval). Клиент получает PUSH через WebSocket.

Frontend интеграция с urql

import { useSubscription } from 'urql'

const PROPOSAL_SUBSCRIPTION = `
  subscription {
    proposals(orderBy: updatedAt_DESC, limit: 5) {
      id
      status
      forVotes
      againstVotes
      updatedAt
    }
  }
`

function GovernanceDashboard() {
  const [result] = useSubscription({
    query: PROPOSAL_SUBSCRIPTION,
  })

  if (result.fetching) return <p>Listening...</p>
  if (result.error) return <p>Error: {result.error.message}</p>

  return (
    <ul>
      {result.data?.proposals.map((p: any) => (
        <li key={p.id}>
          Proposal #{p.id}: {p.status} (For: {p.forVotes}, Against: {p.againstVotes})
        </li>
      ))}
    </ul>
  )
}

Внимание: Подписки Subsquid масштабируются плохо под нагрузкой (RAM). Для одного студента — отлично. Для production с тысячами подключений — рассмотрите polling или внешний PubSub (Redis, Kafka). Официальная документация предупреждает: “RAM usage of subscriptions scales poorly under high load.”

БОНУС: Multi-chain индексация

Этот раздел — бонусный. Multi-chain не требуется для прохождения курса, но демонстрирует масштабируемость Subsquid.

Subsquid поддерживает multi-chain: один shared schema, несколько processors (по одному на сеть).

squid.yaml для multi-chain

manifest_version: subsquid.io/v0.1
name: multichain-squid
version: 1
description: Multi-chain ERC-20 indexer

build:

deploy:
  addons:
    postgres:
  processor:
    - name: eth-processor
      cmd: ["node", "lib/eth/main.js"]
    - name: arb-processor
      cmd: ["node", "lib/arbitrum/main.js"]
  api:
    cmd: ["npx", "squid-graphql-server", "--subscriptions"]

Структура проекта

src/
├── eth/main.ts          # Ethereum processor
├── arbitrum/main.ts     # Arbitrum processor
├── model/               # Shared entities (Transfer, Account)
└── abi/                 # Shared ABI types

Каждый processor — отдельный Node.js процесс со своим RPC endpoint:

// src/eth/main.ts
const processor = new EvmBatchProcessor()
  .setRpcEndpoint('https://eth-mainnet.public.blastapi.io')
  .setFinalityConfirmation(75)  // Ethereum: 75 блоков

// src/arbitrum/main.ts
const processor = new EvmBatchProcessor()
  .setRpcEndpoint('https://sepolia-rollup.arbitrum.io/rpc')
  .setFinalityConfirmation(1)   // Arbitrum: 1 блок

Для локальной разработки можно запустить два Anvil узла с разными chain ID. Shared schema позволяет ОДНИМ GraphQL запросом получить данные с обеих сетей.

Arbitrum Sepolia: Chain ID 421614, RPC https://sepolia-rollup.arbitrum.io/rpc. Публичные RPC имеют rate limits. Для продолжительного тестирования используйте Alchemy или Infura free tier.

Production-паттерны

Error handling

processor.run(db, async (ctx) => {
  for (const block of ctx.blocks) {
    for (const log of block.logs) {
      try {
        // ... decode and process
      } catch (err) {
        ctx.log.error(
          { blockNumber: block.header.height, logId: log.id },
          `Failed to process log: ${err}`
        )
        // Продолжаем обработку следующих логов
      }
    }
  }
})

Database indexes

type Transfer @entity {
  from: String! @index     # B-tree index для WHERE from = '0x...'
  to: String! @index       # B-tree index для WHERE to = '0x...'
  txHash: String! @index   # B-tree index для WHERE txHash = '0x...'
  # НЕ индексируйте всё подряд -- каждый индекс замедляет INSERT
}

Правило: @index на полях, которые часто появляются в WHERE и orderBy. Не индексируйте value или timestamp, если не фильтруете по ним.

Hot blocks

const db = new TypeormDatabase({ supportHotBlocks: true })

supportHotBlocks: true — корректная обработка unfinalized блоков. Если происходит reorg (переключение на другую цепочку), процессор автоматически откатит данные до последнего finalized блока.

Мониторинг скорости

processor.run(db, async (ctx) => {
  const start = Date.now()
  // ... process batch ...
  const elapsed = Date.now() - start
  ctx.log.info(
    `Batch: ${ctx.blocks.length} blocks, ` +
    `${transfers.length} transfers, ` +
    `${elapsed}ms`
  )
})

Checkpoint и перезапуск

Subsquid processor автоматически сохраняет checkpoint в базе данных. При перезапуске контейнера — продолжает с последнего обработанного блока. Не нужно индексировать заново.

# Перезапуск без потери прогресса
docker compose --profile subsquid restart subsquid-processor

# Полный сброс (удалить базу):
docker compose --profile subsquid down -v
docker compose --profile subsquid up -d

Алгоритмический уровень

Stateful entity indexing pattern:

StatefulIndexer:
  on ProposalCreated(id, proposer, description):
    db.INSERT Proposal(id, status="Pending", proposer, description)

  on VoteCast(proposalId, voter, support, weight):
    db.INSERT Vote(voter, proposalId, support, weight)
    proposal = db.GET Proposal(proposalId)
    proposal.votes[support] += weight
    db.UPSERT proposal

  on ProposalExecuted(proposalId):
    proposal = db.GET Proposal(proposalId)
    proposal.status = "Executed"
    db.UPSERT proposal

Invariants:
  - Proposal.forVotes = SUM(Vote.weight WHERE support=1 AND proposalId=id)
  - Proposal.againstVotes = SUM(Vote.weight WHERE support=0 AND proposalId=id)
  - Proposal.status reflects latest lifecycle event

Сложность stateful индексации:

  • INSERT Vote: O(1) per event
  • GET + UPSERT Proposal: O(1) (primary key lookup)
  • Общая: O(E) где E — количество governance событий

Итоги

ПаттернОписаниеПример
Stateful entitiesEntity обновляется при каждом событииProposal lifecycle
@derivedFromСвязь parent -> children в GraphQLProposal -> Votes
WebSocket subscriptionsReal-time обновления через GraphQLGovernance dashboard
Multi-chainНесколько processors, shared schemaEthereum + Arbitrum
Error handlingtry/catch с ctx.log.errorProduction resilience
Hot blockssupportHotBlocks для reorg handlingLive-данные

Что дальше: В INDEX-07 мы перейдём к The Graph — второму крупнейшему инструменту индексации. Главное отличие: AssemblyScript вместо TypeScript. Это не просто другой синтаксис — это другой язык с другими ограничениями.

Finished the lesson?

Mark it as complete to track your progress