Перейти к содержанию
Learning Platform
Средний
40 минут
Cloud Run Eventarc Pub/Sub Serverless Event-Driven

Требуемые знания:

  • module-6/02-debezium-server-pubsub
  • module-6/03-iam-workload-identity

Cloud Run для Event-Driven CDC

В предыдущих уроках мы настроили Debezium Server для публикации CDC событий в Pub/Sub, и использовали Dataflow для batch-репликации в BigQuery. Но что, если нужно выполнить кастомную бизнес-логику для каждого отдельного события?

Когда использовать Cloud Run для CDC

Cloud Run — это serverless платформа для запуска контейнеров, которая идеально подходит для event-driven обработки CDC событий.

Типичные Use Cases

Use CaseОписаниеПример
УведомленияОтправка email/Slack/SMS при создании записиНовый заказ → уведомление в отдел продаж
Обновление индексовСинхронизация с поисковыми системамиИзменение товара → обновление Elasticsearch
Инвалидация кэшаОчистка Redis/Memcached при изменении данныхОбновление цены → сброс кэша
ИнтеграцииВызов внешних API для каждого событияНовый клиент → создание в CRM
Обогащение данныхДобавление дополнительной информацииЗаказ → получение геолокации по IP

Cloud Run vs Dataflow

АспектCloud RunDataflow
Паттерн обработкиPer-event (каждое событие отдельно)Batch/Stream (агрегация, окна)
LatencyОчень низкая (миллисекунды)Средняя (секунды/минуты)
СтоимостьPay-per-request (низкая при малом трафике)Минимум 1 worker постоянно
Кастомная логикаПолная свобода (любой язык/библиотеки)Ограничена Apache Beam API
Use caseБизнес-логика, интеграции, уведомленияРепликация, аналитика, агрегация

Рекомендация: Используйте Cloud Run для реактивной бизнес-логики, Dataflow для репликации данных.


Архитектура: Pub/Sub → Eventarc → Cloud Run

Pub/Sub → Eventarc → Cloud Run

Serverless event-driven обработка CDC событий

Debezium
Server
CDC Events
Pub/Sub
Topic
Push subscription
Eventarc
Trigger
HTTP POST
Cloud Run
Service

Pub/Sub base64-кодирует данные в message.data — нужно декодировать → JSON

min-instances=0 позволяет scale to zero (cost optimization)

Поток событий:

  1. Cloud SQL генерирует изменения → Debezium Server захватывает через logical decoding
  2. Debezium Server публикует события в Pub/Sub топики (один топик на таблицу)
  3. Eventarc создает триггер: при новом сообщении в топике → вызвать Cloud Run
  4. Cloud Run обрабатывает событие и выполняет кастомную логику

Cloud Run Service: Python Flask Handler

Cloud Run сервис — это обычный HTTP-сервер, который принимает POST запросы с Pub/Sub сообщениями.

Структура проекта

cdc-processor/
├── main.py              # Flask приложение
├── requirements.txt     # Зависимости
├── Dockerfile          # Контейнер образ
└── .gcloudignore       # Игнорируемые файлы

main.py: Обработчик CDC событий

from flask import Flask, request
import base64
import json
import logging

# ==============================================================================
# Настройка логирования для Cloud Logging
# ==============================================================================
import google.cloud.logging
client = google.cloud.logging.Client()
client.setup_logging()

app = Flask(__name__)

@app.route("/", methods=["POST"])
def handle_cdc_event():
    """
    Обработчик Pub/Sub событий от Eventarc.

    Формат envelope:
    {
      "message": {
        "data": "base64-encoded Debezium event",
        "messageId": "...",
        "publishTime": "..."
      },
      "subscription": "..."
    }
    """
    envelope = request.get_json()

    # Валидация Pub/Sub сообщения
    if not envelope or "message" not in envelope:
        logging.error("Invalid Pub/Sub envelope: missing 'message' field")
        return "Bad Request: no Pub/Sub message received", 400

    pubsub_message = envelope["message"]

    # Декодирование Debezium события
    if "data" not in pubsub_message:
        logging.error("Invalid Pub/Sub message: no 'data' field")
        return "Bad Request: no data in message", 400

    try:
        # Декодирование base64 → UTF-8 → JSON
        data = base64.b64decode(pubsub_message["data"]).decode("utf-8")
        event = json.loads(data)
    except Exception as e:
        logging.error(f"Failed to decode Pub/Sub data: {e}")
        return "Bad Request: invalid data encoding", 400

    # ==============================================================================
    # Парсинг Debezium event структуры
    # ==============================================================================
    payload = event.get("payload", {})

    # Операция: c (create), u (update), d (delete), r (read/snapshot)
    operation = payload.get("op")

    # Данные до/после изменения
    before = payload.get("before")  # null для INSERT
    after = payload.get("after")    # null для DELETE

    # Метаданные источника
    source = payload.get("source", {})
    table = source.get("table")
    timestamp_ms = source.get("ts_ms")

    logging.info(f"Received CDC event: table={table}, op={operation}, ts={timestamp_ms}")

    # ==============================================================================
    # Маршрутизация по таблице и операции
    # ==============================================================================
    try:
        if table == "orders" and operation == "c":
            send_order_notification(after)
        elif table == "customers" and operation == "u":
            update_search_index(after)
        elif table == "products" and operation in ("c", "u"):
            invalidate_product_cache(after)
        else:
            logging.info(f"No handler for table={table} op={operation}, skipping")

    except Exception as e:
        logging.error(f"Processing failed: {e}")
        # Возврат 5xx → Pub/Sub повторит доставку
        return f"Internal Server Error: {e}", 500

    # Успешная обработка → 204 No Content
    # Pub/Sub получит ACK и не будет повторять доставку
    return ("", 204)


# ==============================================================================
# Бизнес-логика обработчиков
# ==============================================================================

def send_order_notification(order_data):
    """
    Отправка уведомления о новом заказе.

    Пример: отправить Slack сообщение в канал #orders
    """
    customer_id = order_data.get("customer_id")
    product_id = order_data.get("product_id")
    quantity = order_data.get("quantity")

    logging.info(f"Sending order notification: customer={customer_id}, product={product_id}, qty={quantity}")

    # TODO: Интеграция с Slack API, SendGrid и т.д.
    # import requests
    # requests.post("https://hooks.slack.com/...", json={
    #     "text": f"Новый заказ: customer={customer_id}, product={product_id}"
    # })


def update_search_index(customer_data):
    """
    Обновление поискового индекса при изменении клиента.

    Пример: обновить документ в Elasticsearch
    """
    customer_id = customer_data.get("id")
    name = customer_data.get("name")
    email = customer_data.get("email")

    logging.info(f"Updating search index: customer_id={customer_id}, name={name}")

    # TODO: Интеграция с Elasticsearch
    # from elasticsearch import Elasticsearch
    # es = Elasticsearch("https://...")
    # es.index(index="customers", id=customer_id, document=customer_data)


def invalidate_product_cache(product_data):
    """
    Инвалидация Redis кэша при изменении товара.

    Пример: удалить кэшированную цену товара
    """
    product_id = product_data.get("id")

    logging.info(f"Invalidating product cache: product_id={product_id}")

    # TODO: Интеграция с Redis
    # import redis
    # r = redis.Redis(host="...")
    # r.delete(f"product:{product_id}")


if __name__ == "__main__":
    import os
    port = int(os.environ.get("PORT", 8080))
    app.run(host="0.0.0.0", port=port, debug=False)

Ключевые моменты:

  1. Декодирование: Pub/Sub отправляет данные в base64, нужно декодировать → JSON
  2. Идемпотентность: Pub/Sub может доставить сообщение повторно (at-least-once), логика должна быть идемпотентной
  3. Коды ответа:
    • 204 No Content → успешная обработка, Pub/Sub больше не повторит
    • 5xx → ошибка, Pub/Sub повторит доставку с exponential backoff
    • 4xx → невалидное сообщение, Pub/Sub отправит в Dead Letter Queue
Проверка знаний
Cloud Run обработчик CDC событий должен возвращать разные HTTP-коды в разных ситуациях. Что произойдёт, если обработчик вернёт 500 при временной ошибке внешнего API?
Ответ
Pub/Sub интерпретирует 5xx как временную ошибку и повторит доставку сообщения с exponential backoff (10s, 20s, 40s...). После исчерпания max-delivery-attempts (например, 5 попыток) сообщение отправляется в Dead Letter Queue. Поэтому обработчик должен быть идемпотентным — одно и то же событие может быть обработано несколько раз. Для успешной обработки нужно вернуть 2xx (обычно 204), для невалидных данных — 4xx.

Dockerfile для Cloud Run

# Используем официальный Python runtime
FROM python:3.11-slim

# Рабочая директория
WORKDIR /app

# Копируем зависимости
COPY requirements.txt .

# Устанавливаем зависимости
RUN pip install --no-cache-dir -r requirements.txt

# Копируем код приложения
COPY main.py .

# Порт по умолчанию (Cloud Run устанавливает через $PORT)
ENV PORT=8080

# Запуск через gunicorn для production
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app

requirements.txt

Flask==3.0.0
gunicorn==21.2.0
google-cloud-logging==3.8.0

Почему gunicorn?

  • Flask dev server не подходит для production (однопоточный, нет graceful shutdown)
  • gunicorn — production WSGI server с multi-threading
  • --workers 1 --threads 8 — один процесс, 8 потоков (оптимально для Cloud Run concurrency)
  • --timeout 0 — отключаем таймаут (Cloud Run управляет таймаутами сам)

Деплой в Cloud Run

Шаг 1: Создать Service Account

# Создать сервисный аккаунт для Cloud Run сервиса
gcloud iam service-accounts create cdc-processor-sa \
  --display-name="CDC Processor Service Account" \
  --project=YOUR_PROJECT_ID

# Дать права на запись логов
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID \
  --member="serviceAccount:cdc-processor-sa@YOUR_PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/logging.logWriter"

Шаг 2: Build и Push Docker образа

# Build и push в Google Container Registry
gcloud builds submit --tag gcr.io/YOUR_PROJECT_ID/cdc-processor

# Альтернатива: использовать Artifact Registry (recommended)
# gcloud builds submit --tag us-central1-docker.pkg.dev/YOUR_PROJECT_ID/cdc/cdc-processor

Шаг 3: Deploy в Cloud Run

gcloud run deploy cdc-processor \
  --image gcr.io/YOUR_PROJECT_ID/cdc-processor \
  --region us-central1 \
  --service-account=cdc-processor-sa@YOUR_PROJECT_ID.iam.gserviceaccount.com \
  --no-allow-unauthenticated \
  --min-instances=0 \
  --max-instances=10 \
  --concurrency=80 \
  --memory=512Mi \
  --cpu=1

Объяснение параметров:

ПараметрЗначениеНазначение
--no-allow-unauthenticated-Только аутентифицированные запросы (Eventarc будет использовать service account)
--min-instances=00Scale to zero при отсутствии трафика (экономия)
--max-instances=1010Максимум 10 контейнеров параллельно
--concurrency=8080До 80 запросов на один контейнер
--memory=512Mi512 MBДостаточно для простой обработки
--cpu=11 vCPUОдин виртуальный процессор

Создание Eventarc Trigger

Eventarc связывает Pub/Sub топик с Cloud Run сервисом.

Шаг 1: Grant Eventarc Service Account права на Cloud Run

# Получить Eventarc service account
PROJECT_NUMBER=$(gcloud projects describe YOUR_PROJECT_ID --format="value(projectNumber)")
EVENTARC_SA="service-${PROJECT_NUMBER}@gcp-sa-eventarc.iam.gserviceaccount.com"

# Дать права на вызов Cloud Run
gcloud run services add-iam-policy-binding cdc-processor \
  --region=us-central1 \
  --member="serviceAccount:${EVENTARC_SA}" \
  --role="roles/run.invoker"

Шаг 2: Создать Eventarc Trigger

gcloud eventarc triggers create cdc-order-processor \
  --location=us-central1 \
  --destination-run-service=cdc-processor \
  --destination-run-region=us-central1 \
  --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
  --transport-topic=projects/YOUR_PROJECT_ID/topics/cdc.public.orders \
  --service-account=cdc-processor-sa@YOUR_PROJECT_ID.iam.gserviceaccount.com

Объяснение:

  • --event-filters — тип события: google.cloud.pubsub.topic.v1.messagePublished
  • --transport-topic — Pub/Sub топик для мониторинга (должен существовать)
  • Eventarc автоматически создает Pub/Sub subscription для этого топика

Шаг 3: Проверить создание триггера

gcloud eventarc triggers list --location=us-central1

Вывод:

NAME                  TYPE                                              DESTINATION_RUN_SERVICE
cdc-order-processor   google.cloud.pubsub.topic.v1.messagePublished     cdc-processor

Обработка нескольких таблиц

Есть два подхода:

Преимущества:

  • Независимое масштабирование для каждой таблицы
  • Изоляция ошибок (падение обработчика одной таблицы не влияет на другие)
  • Простая конфигурация
# Триггер для orders
gcloud eventarc triggers create cdc-order-processor \
  --location=us-central1 \
  --destination-run-service=order-handler \
  --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
  --transport-topic=projects/YOUR_PROJECT_ID/topics/cdc.public.orders

# Триггер для customers
gcloud eventarc triggers create cdc-customer-processor \
  --location=us-central1 \
  --destination-run-service=customer-handler \
  --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
  --transport-topic=projects/YOUR_PROJECT_ID/topics/cdc.public.customers

Вариант 2: Единый обработчик с роутингом по таблице

Преимущества:

  • Меньше Cloud Run сервисов
  • Общая логика обработки

Недостатки:

  • Все события идут через один сервис (сложнее масштабировать)

Пример роутинга уже показан в main.py выше (проверка table == "orders").


Error Handling и Retries

Pub/Sub Retry Behavior

HTTP кодPub/Sub поведение
200-299Успех → ACK сообщения
400-499Невалидное сообщение → отправить в Dead Letter Queue
500-599Временная ошибка → retry с exponential backoff

Конфигурация Dead Letter Queue

# 1. Создать DLQ топик
gcloud pubsub topics create cdc-dead-letter

# 2. Создать subscription с DLQ
gcloud pubsub subscriptions create cdc.public.orders-sub \
  --topic=cdc.public.orders \
  --dead-letter-topic=cdc-dead-letter \
  --max-delivery-attempts=5

Параметры retry:

  • --max-delivery-attempts=5 — максимум 5 попыток доставки
  • После 5 попыток → сообщение отправляется в DLQ
  • Exponential backoff: 10s, 20s, 40s, 80s, 160s

Designing Idempotent Handlers

Проблема: Pub/Sub может доставить одно и то же сообщение дважды.

Решение: Использовать уникальный ID события для дедупликации.

def handle_cdc_event():
    envelope = request.get_json()
    pubsub_message = envelope["message"]

    # Уникальный ID сообщения от Pub/Sub
    message_id = pubsub_message.get("messageId")

    # Проверить, обрабатывали ли уже это сообщение
    if is_already_processed(message_id):
        logging.info(f"Duplicate message {message_id}, skipping")
        return ("", 204)

    # Обработать событие
    process_event(...)

    # Сохранить message_id в БД/Redis для дедупликации
    mark_as_processed(message_id)

    return ("", 204)


def is_already_processed(message_id):
    # TODO: Проверить в Redis/Firestore
    # import redis
    # r = redis.Redis(...)
    # return r.exists(f"processed:{message_id}")
    return False


def mark_as_processed(message_id):
    # TODO: Сохранить в Redis с TTL (например, 7 дней)
    # r.setex(f"processed:{message_id}", 604800, "1")
    pass

Практические примеры

Пример 1: Отправка Slack уведомления при новом заказе

import requests

def send_order_notification(order_data):
    """Отправить Slack сообщение о новом заказе."""
    slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"

    customer_id = order_data.get("customer_id")
    product_id = order_data.get("product_id")
    quantity = order_data.get("quantity")

    message = {
        "text": f":shopping_cart: Новый заказ!",
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*Новый заказ создан*\n• Customer: {customer_id}\n• Product: {product_id}\n• Quantity: {quantity}"
                }
            }
        ]
    }

    response = requests.post(slack_webhook_url, json=message)
    response.raise_for_status()

Пример 2: Обновление Elasticsearch индекса

from elasticsearch import Elasticsearch

es = Elasticsearch("https://elasticsearch.example.com")

def update_search_index(customer_data):
    """Обновить поисковый индекс при изменении клиента."""
    customer_id = customer_data.get("id")

    # Upsert документа в Elasticsearch
    es.index(
        index="customers",
        id=customer_id,
        document={
            "name": customer_data.get("name"),
            "email": customer_data.get("email"),
            "updated_at": customer_data.get("updated_at")
        }
    )

    logging.info(f"Updated Elasticsearch: customer_id={customer_id}")

Пример 3: Инвалидация Redis кэша

import redis

r = redis.Redis(host="redis.example.com", port=6379, decode_responses=True)

def invalidate_product_cache(product_data):
    """Удалить кэшированную информацию о товаре."""
    product_id = product_data.get("id")

    # Удалить все ключи, связанные с этим товаром
    keys_to_delete = [
        f"product:{product_id}",
        f"product:{product_id}:price",
        f"product:{product_id}:inventory"
    ]

    r.delete(*keys_to_delete)
    logging.info(f"Invalidated cache for product_id={product_id}")

Scaling и Concurrency

Auto-Scaling

Cloud Run автоматически масштабируется на основе:

  • Количества запросов
  • CPU утилизации
  • Concurrency (запросов на контейнер)
Auto-Scaling Behavior

Cloud Run масштабируется на основе concurrency

Pub/Sub Messages
Cloud Run Instance 1
Cloud Run Instance 2
Cloud Run Instance 3
50 messages100 messages (overload)Scale up (new instance)200 messages200 messagesScale up (3rd instance)

Cloud Run масштабируется автоматически на основе:

  • • Concurrency: requests на контейнер (default: 80)
  • • CPU/Memory utilization (если указаны в конфигурации)

Настройка Concurrency

gcloud run deploy cdc-processor \
  --concurrency=80 \
  --min-instances=0 \
  --max-instances=10

Рекомендации:

Тип обработкиConcurrencyMin Instances
I/O bound (вызовы API, БД)80-1000 (scale to zero)
CPU bound (трансформации, парсинг)10-201 (warm start)
Latency-sensitive (< 100ms)501-2 (избежать cold start)

Cold Start Considerations

Проблема: При scale to zero первый запрос после idle может занять 1-3 секунды (cold start).

Проверка знаний
Cloud Run с min-instances=0 масштабируется до нуля при отсутствии трафика. Какой компромисс это создаёт для CDC event processing и когда стоит использовать min-instances=1?
Ответ
min-instances=0 экономит деньги (pay-per-request), но создаёт cold start задержку 1-3 секунды для первого события после idle. Для CDC это означает: первое событие после периода тишины обрабатывается с задержкой. min-instances=1 (~$10/мес) устраняет cold start, но платите за постоянно работающий контейнер. Используйте min-instances=1 для high-traffic таблиц или latency-sensitive обработки, и min-instances=0 для редких событий, где задержка допустима.

Решения:

  1. min-instances=1 — держать один контейнер постоянно (стоимость ~$10/месяц)
  2. Warmup requests — периодически отправлять dummy запросы через Cloud Scheduler
  3. Принять latency — для low-traffic таблиц cold start допустим
# Держать 1 контейнер постоянно для hot tables
gcloud run deploy order-processor \
  --min-instances=1

# Scale to zero для редких событий
gcloud run deploy customer-processor \
  --min-instances=0

Sequence Diagram: End-to-End Flow

End-to-End Event Processing

Полный цикл от INSERT в Cloud SQL до вызова external API

Cloud SQL
Debezium Server
Pub/Sub
Eventarc
Cloud Run
External API
INSERT orderWAL eventCDC event (JSON)Push messageHTTP POST (base64 data)Decode & ProcessAPI call (customer notification)Success responseHTTP 200 OK

Latency breakdown (typical):

  • • Cloud SQL → Debezium: 100-500ms (WAL polling interval)
  • • Debezium → Pub/Sub: 10-50ms (network latency)
  • • Pub/Sub → Cloud Run: 50-200ms (cold start если scale from zero)
  • • Cloud Run → External API: depends on API (50-500ms)
  • Total: ~500-2000ms end-to-end

Что мы узнали

  1. Cloud Run для event-driven CDC: Выполнение кастомной логики для каждого события
  2. Eventarc триггеры: Связывание Pub/Sub топиков с Cloud Run сервисами
  3. Flask handler: Обработка Pub/Sub сообщений в формате base64-encoded JSON
  4. Error handling: 204 для успеха, 5xx для retry, Dead Letter Queue для poison messages
  5. Идемпотентность: Использование message_id для дедупликации
  6. Auto-scaling: Cloud Run масштабируется на основе concurrency и CPU

Что дальше?

В следующем уроке мы настроим end-to-end мониторинг всего CDC pipeline: Cloud SQL, Debezium, Pub/Sub, Dataflow, Cloud Run — все в одном dashboard.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Cloud Run обработчик CDC событий возвращает HTTP 204 при успешной обработке и HTTP 500 при ошибке. Что произойдет с Pub/Sub сообщением в каждом случае?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс