Требуемые знания:
- 05-subsquid-multi-event
Subsquid: продвинутые паттерны
От stateless к stateful индексации
Мы индексировали ERC-20 Transfer и Uniswap Swap события. Оба — stateless: каждое событие — самодостаточная запись. Transfer произошёл — записали. Swap произошёл — записали. Запись после создания никогда не меняется.
Но governance — это STATEFUL: у Proposal есть жизненный цикл.
Created -> Active -> Succeeded -> Queued -> Executed
-> Defeated (альтернативный путь)
Как индексировать СОСТОЯНИЕ, а не просто события?
В Module 7 мы создали
GovernanceToken,MyGovernorиTreasury(вlabs/ethereum/contracts/governance/). MyGovernor использует OpenZeppelin v5 с 4 extensions: CountingSimple, Votes, VotesQuorumFraction, TimelockControl. Теперь мы построим индексатор, который отслеживает ВСЕ этапы жизненного цикла предложения.
Governance индексация
Схема для proposals
type Proposal @entity {
id: ID! # proposalId
proposer: String! @index
description: String!
status: String! @index # "Pending" | "Active" | "Succeeded" | "Defeated" | "Executed"
forVotes: BigInt!
againstVotes: BigInt!
abstainVotes: BigInt!
createdAt: DateTime!
updatedAt: DateTime!
blockNumber: Int!
votes: [Vote!] @derivedFrom(field: "proposal")
}
type Vote @entity {
id: ID!
proposal: Proposal!
voter: String! @index
support: Int! # 0=Against, 1=For, 2=Abstain
weight: BigInt!
timestamp: DateTime!
}
Два entity, связанных через @derivedFrom: Proposal содержит агрегированные подсчёты голосов. Vote — индивидуальная запись каждого голоса. GraphQL позволяет получить proposal вместе со всеми его голосами в одном запросе.
События OpenZeppelin Governor
MyGovernor генерирует следующие события:
// ProposalCreated -- создание нового предложения
event ProposalCreated(
uint256 proposalId,
address proposer,
address[] targets,
uint256[] values,
string[] signatures,
bytes[] calldatas,
uint256 voteStart,
uint256 voteEnd,
string description
);
// VoteCast -- голосование
event VoteCast(
address indexed voter,
uint256 proposalId,
uint8 support, // 0=Against, 1=For, 2=Abstain
uint256 weight,
string reason
);
// ProposalExecuted -- успешное исполнение
event ProposalExecuted(uint256 proposalId);
Handler для stateful entities
// Topic0 хеши для Governor событий
const PROPOSAL_CREATED_TOPIC =
'0x7d84a6263ae0d98d3329bd7b46bb4e8d6f98cd35a7adb45c274c8b7fd5ebd5e0'
const VOTE_CAST_TOPIC =
'0xb8e138887d0aa13bab447e82de9d5c1777041ecd21ca36ba824ff1e6c07ddda4'
const PROPOSAL_EXECUTED_TOPIC =
'0x712ae1383f79ac853f8d882153778e0260ef8f03b504e2a0bffb6de791e1f801'
processor.run(db, async (ctx) => {
for (const block of ctx.blocks) {
for (const log of block.logs) {
switch (log.topics[0]) {
case PROPOSAL_CREATED_TOPIC: {
// Декодируем proposalId и description из data
const proposalId = BigInt('0x' + log.data.slice(2, 66))
const proposer = '0x' + log.data.slice(90, 130)
const proposal = new Proposal({
id: proposalId.toString(),
proposer,
description: '...', // Decode from ABI
status: 'Pending',
forVotes: 0n,
againstVotes: 0n,
abstainVotes: 0n,
createdAt: new Date(block.header.timestamp),
updatedAt: new Date(block.header.timestamp),
blockNumber: block.header.height,
})
await ctx.store.insert(proposal)
break
}
case VOTE_CAST_TOPIC: {
const voter = '0x' + log.topics[1].slice(26)
const data = log.data.slice(2)
const proposalId = BigInt('0x' + data.slice(0, 64))
const support = Number(BigInt('0x' + data.slice(64, 128)))
const weight = BigInt('0x' + data.slice(128, 192))
// 1. Сохранить голос
const vote = new Vote({
id: log.id,
proposal: { id: proposalId.toString() } as Proposal,
voter,
support,
weight,
timestamp: new Date(block.header.timestamp),
})
await ctx.store.insert(vote)
// 2. Обновить агрегации в Proposal
const prop = await ctx.store.get(Proposal, proposalId.toString())
if (prop) {
if (support === 1) prop.forVotes += weight
else if (support === 0) prop.againstVotes += weight
else prop.abstainVotes += weight
prop.updatedAt = new Date(block.header.timestamp)
await ctx.store.upsert(prop)
}
break
}
case PROPOSAL_EXECUTED_TOPIC: {
const proposalId = BigInt('0x' + log.data.slice(2, 66))
const execProp = await ctx.store.get(
Proposal,
proposalId.toString()
)
if (execProp) {
execProp.status = 'Executed'
execProp.updatedAt = new Date(block.header.timestamp)
await ctx.store.upsert(execProp)
}
break
}
}
}
}
})
Ключевой паттерн: stateful индексация
Governance — пример STATEFUL индексации:
- ProposalCreated —
INSERTновый Proposal со statusPending - VoteCast —
INSERTVote +UPSERTProposal (обновить счётчики голосов) - ProposalExecuted —
UPSERTProposal (изменить status наExecuted)
Entity обновляется при каждом новом событии. Proposal.status меняется от Created до Executed. Это принципиально отличается от immutable Transfer записей.
GraphQL запросы для governance
# Все предложения с голосами
query {
proposals(orderBy: createdAt_DESC) {
id
proposer
description
status
forVotes
againstVotes
abstainVotes
votes {
voter
support
weight
}
}
}
# Предложения по статусу
query {
proposals(where: { status_eq: "Active" }) {
id
description
forVotes
againstVotes
}
}
# Голоса конкретного избирателя
query {
votes(where: { voter_eq: "0xf39f..." }) {
proposal { id description }
support
weight
}
}
WebSocket подписки
Governance dashboard хочет обновляться в реальном времени — новые голоса, смена статуса. Subsquid GraphQL server поддерживает subscriptions.
Включение подписок
В LAB-07 подписки уже включены. GraphQL server запускается с флагом --subscriptions:
npx squid-graphql-server --subscriptions
Subscription query
subscription {
proposals(orderBy: updatedAt_DESC, limit: 5) {
id
status
forVotes
againstVotes
updatedAt
}
}
Подписка автоматически обновляется каждые 5 секунд (poll interval). Клиент получает PUSH через WebSocket.
Frontend интеграция с urql
import { useSubscription } from 'urql'
const PROPOSAL_SUBSCRIPTION = `
subscription {
proposals(orderBy: updatedAt_DESC, limit: 5) {
id
status
forVotes
againstVotes
updatedAt
}
}
`
function GovernanceDashboard() {
const [result] = useSubscription({
query: PROPOSAL_SUBSCRIPTION,
})
if (result.fetching) return <p>Listening...</p>
if (result.error) return <p>Error: {result.error.message}</p>
return (
<ul>
{result.data?.proposals.map((p: any) => (
<li key={p.id}>
Proposal #{p.id}: {p.status} (For: {p.forVotes}, Against: {p.againstVotes})
</li>
))}
</ul>
)
}
Внимание: Подписки Subsquid масштабируются плохо под нагрузкой (RAM). Для одного студента — отлично. Для production с тысячами подключений — рассмотрите polling или внешний PubSub (Redis, Kafka). Официальная документация предупреждает: “RAM usage of subscriptions scales poorly under high load.”
БОНУС: Multi-chain индексация
Этот раздел — бонусный. Multi-chain не требуется для прохождения курса, но демонстрирует масштабируемость Subsquid.
Subsquid поддерживает multi-chain: один shared schema, несколько processors (по одному на сеть).
squid.yaml для multi-chain
manifest_version: subsquid.io/v0.1
name: multichain-squid
version: 1
description: Multi-chain ERC-20 indexer
build:
deploy:
addons:
postgres:
processor:
- name: eth-processor
cmd: ["node", "lib/eth/main.js"]
- name: arb-processor
cmd: ["node", "lib/arbitrum/main.js"]
api:
cmd: ["npx", "squid-graphql-server", "--subscriptions"]
Структура проекта
src/
├── eth/main.ts # Ethereum processor
├── arbitrum/main.ts # Arbitrum processor
├── model/ # Shared entities (Transfer, Account)
└── abi/ # Shared ABI types
Каждый processor — отдельный Node.js процесс со своим RPC endpoint:
// src/eth/main.ts
const processor = new EvmBatchProcessor()
.setRpcEndpoint('https://eth-mainnet.public.blastapi.io')
.setFinalityConfirmation(75) // Ethereum: 75 блоков
// src/arbitrum/main.ts
const processor = new EvmBatchProcessor()
.setRpcEndpoint('https://sepolia-rollup.arbitrum.io/rpc')
.setFinalityConfirmation(1) // Arbitrum: 1 блок
Для локальной разработки можно запустить два Anvil узла с разными chain ID. Shared schema позволяет ОДНИМ GraphQL запросом получить данные с обеих сетей.
Arbitrum Sepolia: Chain ID 421614, RPC
https://sepolia-rollup.arbitrum.io/rpc. Публичные RPC имеют rate limits. Для продолжительного тестирования используйте Alchemy или Infura free tier.
Production-паттерны
Error handling
processor.run(db, async (ctx) => {
for (const block of ctx.blocks) {
for (const log of block.logs) {
try {
// ... decode and process
} catch (err) {
ctx.log.error(
{ blockNumber: block.header.height, logId: log.id },
`Failed to process log: ${err}`
)
// Продолжаем обработку следующих логов
}
}
}
})
Database indexes
type Transfer @entity {
from: String! @index # B-tree index для WHERE from = '0x...'
to: String! @index # B-tree index для WHERE to = '0x...'
txHash: String! @index # B-tree index для WHERE txHash = '0x...'
# НЕ индексируйте всё подряд -- каждый индекс замедляет INSERT
}
Правило: @index на полях, которые часто появляются в WHERE и orderBy. Не индексируйте value или timestamp, если не фильтруете по ним.
Hot blocks
const db = new TypeormDatabase({ supportHotBlocks: true })
supportHotBlocks: true — корректная обработка unfinalized блоков. Если происходит reorg (переключение на другую цепочку), процессор автоматически откатит данные до последнего finalized блока.
Мониторинг скорости
processor.run(db, async (ctx) => {
const start = Date.now()
// ... process batch ...
const elapsed = Date.now() - start
ctx.log.info(
`Batch: ${ctx.blocks.length} blocks, ` +
`${transfers.length} transfers, ` +
`${elapsed}ms`
)
})
Checkpoint и перезапуск
Subsquid processor автоматически сохраняет checkpoint в базе данных. При перезапуске контейнера — продолжает с последнего обработанного блока. Не нужно индексировать заново.
# Перезапуск без потери прогресса
docker compose --profile subsquid restart subsquid-processor
# Полный сброс (удалить базу):
docker compose --profile subsquid down -v
docker compose --profile subsquid up -d
Алгоритмический уровень
Stateful entity indexing pattern:
StatefulIndexer:
on ProposalCreated(id, proposer, description):
db.INSERT Proposal(id, status="Pending", proposer, description)
on VoteCast(proposalId, voter, support, weight):
db.INSERT Vote(voter, proposalId, support, weight)
proposal = db.GET Proposal(proposalId)
proposal.votes[support] += weight
db.UPSERT proposal
on ProposalExecuted(proposalId):
proposal = db.GET Proposal(proposalId)
proposal.status = "Executed"
db.UPSERT proposal
Invariants:
- Proposal.forVotes = SUM(Vote.weight WHERE support=1 AND proposalId=id)
- Proposal.againstVotes = SUM(Vote.weight WHERE support=0 AND proposalId=id)
- Proposal.status reflects latest lifecycle event
Сложность stateful индексации:
INSERTVote:O(1)per eventGET + UPSERTProposal:O(1)(primary key lookup)- Общая:
O(E)гдеE— количество governance событий
Итоги
| Паттерн | Описание | Пример |
|---|---|---|
| Stateful entities | Entity обновляется при каждом событии | Proposal lifecycle |
| @derivedFrom | Связь parent -> children в GraphQL | Proposal -> Votes |
| WebSocket subscriptions | Real-time обновления через GraphQL | Governance dashboard |
| Multi-chain | Несколько processors, shared schema | Ethereum + Arbitrum |
| Error handling | try/catch с ctx.log.error | Production resilience |
| Hot blocks | supportHotBlocks для reorg handling | Live-данные |
Что дальше: В INDEX-07 мы перейдём к The Graph — второму крупнейшему инструменту индексации. Главное отличие: AssemblyScript вместо TypeScript. Это не просто другой синтаксис — это другой язык с другими ограничениями.
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс