Требуемые знания:
- module-6/02-debezium-server-pubsub
- module-6/03-iam-workload-identity
Cloud Run для Event-Driven CDC
В предыдущих уроках мы настроили Debezium Server для публикации CDC событий в Pub/Sub, и использовали Dataflow для batch-репликации в BigQuery. Но что, если нужно выполнить кастомную бизнес-логику для каждого отдельного события?
Когда использовать Cloud Run для CDC
Cloud Run — это serverless платформа для запуска контейнеров, которая идеально подходит для event-driven обработки CDC событий.
Типичные Use Cases
| Use Case | Описание | Пример |
|---|---|---|
| Уведомления | Отправка email/Slack/SMS при создании записи | Новый заказ → уведомление в отдел продаж |
| Обновление индексов | Синхронизация с поисковыми системами | Изменение товара → обновление Elasticsearch |
| Инвалидация кэша | Очистка Redis/Memcached при изменении данных | Обновление цены → сброс кэша |
| Интеграции | Вызов внешних API для каждого события | Новый клиент → создание в CRM |
| Обогащение данных | Добавление дополнительной информации | Заказ → получение геолокации по IP |
Cloud Run vs Dataflow
| Аспект | Cloud Run | Dataflow |
|---|---|---|
| Паттерн обработки | Per-event (каждое событие отдельно) | Batch/Stream (агрегация, окна) |
| Latency | Очень низкая (миллисекунды) | Средняя (секунды/минуты) |
| Стоимость | Pay-per-request (низкая при малом трафике) | Минимум 1 worker постоянно |
| Кастомная логика | Полная свобода (любой язык/библиотеки) | Ограничена Apache Beam API |
| Use case | Бизнес-логика, интеграции, уведомления | Репликация, аналитика, агрегация |
Рекомендация: Используйте Cloud Run для реактивной бизнес-логики, Dataflow для репликации данных.
Архитектура: Pub/Sub → Eventarc → Cloud Run
Serverless event-driven обработка CDC событий
Server
Topic
Trigger
Service
Pub/Sub base64-кодирует данные в message.data — нужно декодировать → JSON
min-instances=0 позволяет scale to zero (cost optimization)
Поток событий:
- Cloud SQL генерирует изменения → Debezium Server захватывает через logical decoding
- Debezium Server публикует события в Pub/Sub топики (один топик на таблицу)
- Eventarc создает триггер: при новом сообщении в топике → вызвать Cloud Run
- Cloud Run обрабатывает событие и выполняет кастомную логику
Cloud Run Service: Python Flask Handler
Cloud Run сервис — это обычный HTTP-сервер, который принимает POST запросы с Pub/Sub сообщениями.
Структура проекта
cdc-processor/
├── main.py # Flask приложение
├── requirements.txt # Зависимости
├── Dockerfile # Контейнер образ
└── .gcloudignore # Игнорируемые файлы
main.py: Обработчик CDC событий
from flask import Flask, request
import base64
import json
import logging
# ==============================================================================
# Настройка логирования для Cloud Logging
# ==============================================================================
import google.cloud.logging
client = google.cloud.logging.Client()
client.setup_logging()
app = Flask(__name__)
@app.route("/", methods=["POST"])
def handle_cdc_event():
"""
Обработчик Pub/Sub событий от Eventarc.
Формат envelope:
{
"message": {
"data": "base64-encoded Debezium event",
"messageId": "...",
"publishTime": "..."
},
"subscription": "..."
}
"""
envelope = request.get_json()
# Валидация Pub/Sub сообщения
if not envelope or "message" not in envelope:
logging.error("Invalid Pub/Sub envelope: missing 'message' field")
return "Bad Request: no Pub/Sub message received", 400
pubsub_message = envelope["message"]
# Декодирование Debezium события
if "data" not in pubsub_message:
logging.error("Invalid Pub/Sub message: no 'data' field")
return "Bad Request: no data in message", 400
try:
# Декодирование base64 → UTF-8 → JSON
data = base64.b64decode(pubsub_message["data"]).decode("utf-8")
event = json.loads(data)
except Exception as e:
logging.error(f"Failed to decode Pub/Sub data: {e}")
return "Bad Request: invalid data encoding", 400
# ==============================================================================
# Парсинг Debezium event структуры
# ==============================================================================
payload = event.get("payload", {})
# Операция: c (create), u (update), d (delete), r (read/snapshot)
operation = payload.get("op")
# Данные до/после изменения
before = payload.get("before") # null для INSERT
after = payload.get("after") # null для DELETE
# Метаданные источника
source = payload.get("source", {})
table = source.get("table")
timestamp_ms = source.get("ts_ms")
logging.info(f"Received CDC event: table={table}, op={operation}, ts={timestamp_ms}")
# ==============================================================================
# Маршрутизация по таблице и операции
# ==============================================================================
try:
if table == "orders" and operation == "c":
send_order_notification(after)
elif table == "customers" and operation == "u":
update_search_index(after)
elif table == "products" and operation in ("c", "u"):
invalidate_product_cache(after)
else:
logging.info(f"No handler for table={table} op={operation}, skipping")
except Exception as e:
logging.error(f"Processing failed: {e}")
# Возврат 5xx → Pub/Sub повторит доставку
return f"Internal Server Error: {e}", 500
# Успешная обработка → 204 No Content
# Pub/Sub получит ACK и не будет повторять доставку
return ("", 204)
# ==============================================================================
# Бизнес-логика обработчиков
# ==============================================================================
def send_order_notification(order_data):
"""
Отправка уведомления о новом заказе.
Пример: отправить Slack сообщение в канал #orders
"""
customer_id = order_data.get("customer_id")
product_id = order_data.get("product_id")
quantity = order_data.get("quantity")
logging.info(f"Sending order notification: customer={customer_id}, product={product_id}, qty={quantity}")
# TODO: Интеграция с Slack API, SendGrid и т.д.
# import requests
# requests.post("https://hooks.slack.com/...", json={
# "text": f"Новый заказ: customer={customer_id}, product={product_id}"
# })
def update_search_index(customer_data):
"""
Обновление поискового индекса при изменении клиента.
Пример: обновить документ в Elasticsearch
"""
customer_id = customer_data.get("id")
name = customer_data.get("name")
email = customer_data.get("email")
logging.info(f"Updating search index: customer_id={customer_id}, name={name}")
# TODO: Интеграция с Elasticsearch
# from elasticsearch import Elasticsearch
# es = Elasticsearch("https://...")
# es.index(index="customers", id=customer_id, document=customer_data)
def invalidate_product_cache(product_data):
"""
Инвалидация Redis кэша при изменении товара.
Пример: удалить кэшированную цену товара
"""
product_id = product_data.get("id")
logging.info(f"Invalidating product cache: product_id={product_id}")
# TODO: Интеграция с Redis
# import redis
# r = redis.Redis(host="...")
# r.delete(f"product:{product_id}")
if __name__ == "__main__":
import os
port = int(os.environ.get("PORT", 8080))
app.run(host="0.0.0.0", port=port, debug=False)
Ключевые моменты:
- Декодирование: Pub/Sub отправляет данные в base64, нужно декодировать → JSON
- Идемпотентность: Pub/Sub может доставить сообщение повторно (at-least-once), логика должна быть идемпотентной
- Коды ответа:
204 No Content→ успешная обработка, Pub/Sub больше не повторит5xx→ ошибка, Pub/Sub повторит доставку с exponential backoff4xx→ невалидное сообщение, Pub/Sub отправит в Dead Letter Queue
Проверка знанийCloud Run обработчик CDC событий должен возвращать разные HTTP-коды в разных ситуациях. Что произойдёт, если обработчик вернёт 500 при временной ошибке внешнего API?
Dockerfile для Cloud Run
# Используем официальный Python runtime
FROM python:3.11-slim
# Рабочая директория
WORKDIR /app
# Копируем зависимости
COPY requirements.txt .
# Устанавливаем зависимости
RUN pip install --no-cache-dir -r requirements.txt
# Копируем код приложения
COPY main.py .
# Порт по умолчанию (Cloud Run устанавливает через $PORT)
ENV PORT=8080
# Запуск через gunicorn для production
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
requirements.txt
Flask==3.0.0
gunicorn==21.2.0
google-cloud-logging==3.8.0
Почему gunicorn?
- Flask dev server не подходит для production (однопоточный, нет graceful shutdown)
- gunicorn — production WSGI server с multi-threading
--workers 1 --threads 8— один процесс, 8 потоков (оптимально для Cloud Run concurrency)--timeout 0— отключаем таймаут (Cloud Run управляет таймаутами сам)
Деплой в Cloud Run
Шаг 1: Создать Service Account
# Создать сервисный аккаунт для Cloud Run сервиса
gcloud iam service-accounts create cdc-processor-sa \
--display-name="CDC Processor Service Account" \
--project=YOUR_PROJECT_ID
# Дать права на запись логов
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID \
--member="serviceAccount:cdc-processor-sa@YOUR_PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/logging.logWriter"
Шаг 2: Build и Push Docker образа
# Build и push в Google Container Registry
gcloud builds submit --tag gcr.io/YOUR_PROJECT_ID/cdc-processor
# Альтернатива: использовать Artifact Registry (recommended)
# gcloud builds submit --tag us-central1-docker.pkg.dev/YOUR_PROJECT_ID/cdc/cdc-processor
Шаг 3: Deploy в Cloud Run
gcloud run deploy cdc-processor \
--image gcr.io/YOUR_PROJECT_ID/cdc-processor \
--region us-central1 \
--service-account=cdc-processor-sa@YOUR_PROJECT_ID.iam.gserviceaccount.com \
--no-allow-unauthenticated \
--min-instances=0 \
--max-instances=10 \
--concurrency=80 \
--memory=512Mi \
--cpu=1
Объяснение параметров:
| Параметр | Значение | Назначение |
|---|---|---|
--no-allow-unauthenticated | - | Только аутентифицированные запросы (Eventarc будет использовать service account) |
--min-instances=0 | 0 | Scale to zero при отсутствии трафика (экономия) |
--max-instances=10 | 10 | Максимум 10 контейнеров параллельно |
--concurrency=80 | 80 | До 80 запросов на один контейнер |
--memory=512Mi | 512 MB | Достаточно для простой обработки |
--cpu=1 | 1 vCPU | Один виртуальный процессор |
Создание Eventarc Trigger
Eventarc связывает Pub/Sub топик с Cloud Run сервисом.
Шаг 1: Grant Eventarc Service Account права на Cloud Run
# Получить Eventarc service account
PROJECT_NUMBER=$(gcloud projects describe YOUR_PROJECT_ID --format="value(projectNumber)")
EVENTARC_SA="service-${PROJECT_NUMBER}@gcp-sa-eventarc.iam.gserviceaccount.com"
# Дать права на вызов Cloud Run
gcloud run services add-iam-policy-binding cdc-processor \
--region=us-central1 \
--member="serviceAccount:${EVENTARC_SA}" \
--role="roles/run.invoker"
Шаг 2: Создать Eventarc Trigger
gcloud eventarc triggers create cdc-order-processor \
--location=us-central1 \
--destination-run-service=cdc-processor \
--destination-run-region=us-central1 \
--event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
--transport-topic=projects/YOUR_PROJECT_ID/topics/cdc.public.orders \
--service-account=cdc-processor-sa@YOUR_PROJECT_ID.iam.gserviceaccount.com
Объяснение:
--event-filters— тип события:google.cloud.pubsub.topic.v1.messagePublished--transport-topic— Pub/Sub топик для мониторинга (должен существовать)- Eventarc автоматически создает Pub/Sub subscription для этого топика
Шаг 3: Проверить создание триггера
gcloud eventarc triggers list --location=us-central1
Вывод:
NAME TYPE DESTINATION_RUN_SERVICE
cdc-order-processor google.cloud.pubsub.topic.v1.messagePublished cdc-processor
Обработка нескольких таблиц
Есть два подхода:
Вариант 1: Отдельные триггеры на каждый топик (Recommended)
Преимущества:
- Независимое масштабирование для каждой таблицы
- Изоляция ошибок (падение обработчика одной таблицы не влияет на другие)
- Простая конфигурация
# Триггер для orders
gcloud eventarc triggers create cdc-order-processor \
--location=us-central1 \
--destination-run-service=order-handler \
--event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
--transport-topic=projects/YOUR_PROJECT_ID/topics/cdc.public.orders
# Триггер для customers
gcloud eventarc triggers create cdc-customer-processor \
--location=us-central1 \
--destination-run-service=customer-handler \
--event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
--transport-topic=projects/YOUR_PROJECT_ID/topics/cdc.public.customers
Вариант 2: Единый обработчик с роутингом по таблице
Преимущества:
- Меньше Cloud Run сервисов
- Общая логика обработки
Недостатки:
- Все события идут через один сервис (сложнее масштабировать)
Пример роутинга уже показан в main.py выше (проверка table == "orders").
Error Handling и Retries
Pub/Sub Retry Behavior
| HTTP код | Pub/Sub поведение |
|---|---|
200-299 | Успех → ACK сообщения |
400-499 | Невалидное сообщение → отправить в Dead Letter Queue |
500-599 | Временная ошибка → retry с exponential backoff |
Конфигурация Dead Letter Queue
# 1. Создать DLQ топик
gcloud pubsub topics create cdc-dead-letter
# 2. Создать subscription с DLQ
gcloud pubsub subscriptions create cdc.public.orders-sub \
--topic=cdc.public.orders \
--dead-letter-topic=cdc-dead-letter \
--max-delivery-attempts=5
Параметры retry:
--max-delivery-attempts=5— максимум 5 попыток доставки- После 5 попыток → сообщение отправляется в DLQ
- Exponential backoff: 10s, 20s, 40s, 80s, 160s
Designing Idempotent Handlers
Проблема: Pub/Sub может доставить одно и то же сообщение дважды.
Решение: Использовать уникальный ID события для дедупликации.
def handle_cdc_event():
envelope = request.get_json()
pubsub_message = envelope["message"]
# Уникальный ID сообщения от Pub/Sub
message_id = pubsub_message.get("messageId")
# Проверить, обрабатывали ли уже это сообщение
if is_already_processed(message_id):
logging.info(f"Duplicate message {message_id}, skipping")
return ("", 204)
# Обработать событие
process_event(...)
# Сохранить message_id в БД/Redis для дедупликации
mark_as_processed(message_id)
return ("", 204)
def is_already_processed(message_id):
# TODO: Проверить в Redis/Firestore
# import redis
# r = redis.Redis(...)
# return r.exists(f"processed:{message_id}")
return False
def mark_as_processed(message_id):
# TODO: Сохранить в Redis с TTL (например, 7 дней)
# r.setex(f"processed:{message_id}", 604800, "1")
pass
Практические примеры
Пример 1: Отправка Slack уведомления при новом заказе
import requests
def send_order_notification(order_data):
"""Отправить Slack сообщение о новом заказе."""
slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
customer_id = order_data.get("customer_id")
product_id = order_data.get("product_id")
quantity = order_data.get("quantity")
message = {
"text": f":shopping_cart: Новый заказ!",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Новый заказ создан*\n• Customer: {customer_id}\n• Product: {product_id}\n• Quantity: {quantity}"
}
}
]
}
response = requests.post(slack_webhook_url, json=message)
response.raise_for_status()
Пример 2: Обновление Elasticsearch индекса
from elasticsearch import Elasticsearch
es = Elasticsearch("https://elasticsearch.example.com")
def update_search_index(customer_data):
"""Обновить поисковый индекс при изменении клиента."""
customer_id = customer_data.get("id")
# Upsert документа в Elasticsearch
es.index(
index="customers",
id=customer_id,
document={
"name": customer_data.get("name"),
"email": customer_data.get("email"),
"updated_at": customer_data.get("updated_at")
}
)
logging.info(f"Updated Elasticsearch: customer_id={customer_id}")
Пример 3: Инвалидация Redis кэша
import redis
r = redis.Redis(host="redis.example.com", port=6379, decode_responses=True)
def invalidate_product_cache(product_data):
"""Удалить кэшированную информацию о товаре."""
product_id = product_data.get("id")
# Удалить все ключи, связанные с этим товаром
keys_to_delete = [
f"product:{product_id}",
f"product:{product_id}:price",
f"product:{product_id}:inventory"
]
r.delete(*keys_to_delete)
logging.info(f"Invalidated cache for product_id={product_id}")
Scaling и Concurrency
Auto-Scaling
Cloud Run автоматически масштабируется на основе:
- Количества запросов
- CPU утилизации
- Concurrency (запросов на контейнер)
Cloud Run масштабируется на основе concurrency
Cloud Run масштабируется автоматически на основе:
- • Concurrency: requests на контейнер (default: 80)
- • CPU/Memory utilization (если указаны в конфигурации)
Настройка Concurrency
gcloud run deploy cdc-processor \
--concurrency=80 \
--min-instances=0 \
--max-instances=10
Рекомендации:
| Тип обработки | Concurrency | Min Instances |
|---|---|---|
| I/O bound (вызовы API, БД) | 80-100 | 0 (scale to zero) |
| CPU bound (трансформации, парсинг) | 10-20 | 1 (warm start) |
| Latency-sensitive (< 100ms) | 50 | 1-2 (избежать cold start) |
Cold Start Considerations
Проблема: При scale to zero первый запрос после idle может занять 1-3 секунды (cold start).
Проверка знанийCloud Run с min-instances=0 масштабируется до нуля при отсутствии трафика. Какой компромисс это создаёт для CDC event processing и когда стоит использовать min-instances=1?
Решения:
- min-instances=1 — держать один контейнер постоянно (стоимость ~$10/месяц)
- Warmup requests — периодически отправлять dummy запросы через Cloud Scheduler
- Принять latency — для low-traffic таблиц cold start допустим
# Держать 1 контейнер постоянно для hot tables
gcloud run deploy order-processor \
--min-instances=1
# Scale to zero для редких событий
gcloud run deploy customer-processor \
--min-instances=0
Sequence Diagram: End-to-End Flow
Полный цикл от INSERT в Cloud SQL до вызова external API
Latency breakdown (typical):
- • Cloud SQL → Debezium: 100-500ms (WAL polling interval)
- • Debezium → Pub/Sub: 10-50ms (network latency)
- • Pub/Sub → Cloud Run: 50-200ms (cold start если scale from zero)
- • Cloud Run → External API: depends on API (50-500ms)
- • Total: ~500-2000ms end-to-end
Что мы узнали
- Cloud Run для event-driven CDC: Выполнение кастомной логики для каждого события
- Eventarc триггеры: Связывание Pub/Sub топиков с Cloud Run сервисами
- Flask handler: Обработка Pub/Sub сообщений в формате base64-encoded JSON
- Error handling: 204 для успеха, 5xx для retry, Dead Letter Queue для poison messages
- Идемпотентность: Использование message_id для дедупликации
- Auto-scaling: Cloud Run масштабируется на основе concurrency и CPU
Что дальше?
В следующем уроке мы настроим end-to-end мониторинг всего CDC pipeline: Cloud SQL, Debezium, Pub/Sub, Dataflow, Cloud Run — все в одном dashboard.
Проверьте понимание
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс