Learning Platform
Глоссарий Troubleshooting
Урок 17.04 · 25 мин
Средний
CI/CDSavepointMiniClusterIntegration TestingGitOpsRollback

CI/CD для Flink: savepoint-driven deployment и integration testing

Flink-джоб без CI/CD = ручной деплой = ошибки в production. Этот урок про то, как построить полноценный CI/CD для Flink: версионирование, integration tests на MiniCluster, savepoint-driven deployment через Flink K8s Operator, automated rollback при провале. После урока у вас будет шаблон для своего pipeline.


Что такое savepoint-driven deployment

Обычный CI/CD для веб-сервиса: build image -> push -> kubectl set image -> rolling restart. Pod-ы пересоздаются с новым кодом, всё работает.

Для Flink этот подход НЕ работает. Pod без сохранённого состояния = потеря state, потеря всех агрегаций. Нужно: остановить джоб -> savepoint -> новый pod -> restore from savepoint.

Savepoint-driven deployment — это паттерн, где каждый deploy:

Blue/Green и Canary в Kubernetes: deployment стратегии без downtime
  1. Триггерит savepoint текущего джоба.
  2. Регистрирует путь savepoint-а в external metadata store.
  3. Деплоит новую версию с явным initialSavepointPath (для первого rollback fallback) или через upgradeMode: savepoint.
  4. Если deploy провалился — автоматический rollback через тот же savepoint и предыдущий image.

Это нативно поддерживается Flink Kubernetes Operator (см. модуль 15 урок 2). Наша задача в CI/CD — обвязать это вокруг git push.


Тэги в git, версии в Maven/Gradle, теги Docker image — должны быть синхронизированы. Один git tag = одна Maven version = один Docker image tag.

Структура репозитория:

flink-orders-pipeline/
├── src/
│   ├── main/java/com/acme/flink/OrdersPipeline.java
│   └── test/java/...
├── deploy/
│   ├── flinkdeployment-prod.yaml
│   ├── flinkdeployment-stage.yaml
│   └── kustomization.yaml
├── pom.xml                      # version = ${revision}
├── Dockerfile
└── .github/workflows/
    ├── ci.yaml                  # build + test on PR
    └── cd.yaml                  # build + deploy on tag

В pom.xml версия динамическая через ${revision} — CI устанавливает её из git tag:

<project>
  <groupId>com.acme</groupId>
  <artifactId>orders-pipeline</artifactId>
  <version>${revision}</version>
  <properties>
    <revision>0.0.0-SNAPSHOT</revision>
  </properties>
</project>

В CI:

mvn -Drevision=$GIT_TAG package

В Dockerfile тэг — тот же:

FROM flink:2.2.0-java17
COPY target/orders-pipeline-*.jar /opt/flink/usrlib/orders-pipeline.jar
docker build -t my-registry/orders-pipeline:$GIT_TAG .
docker push my-registry/orders-pipeline:$GIT_TAG

В flinkdeployment-prod.yaml:

spec:
  image: my-registry/orders-pipeline:${IMAGE_TAG}

IMAGE_TAG подставляется через kustomize или envsubst.

TIP

Используйте semantic versioning: 0.1.0, 0.1.1, 0.2.0. Major = breaking changes (state schema incompatible). Minor = новые фичи backward-compatible. Patch = bug fix. Git tag = v0.1.1, Maven version = 0.1.1, Docker image = orders-pipeline:0.1.1.


Unit-тесты на отдельные функции — это обязательно, но недостаточно. Реальные баги (schema mismatch, watermark issues, state corruption) видны только при integration test всего пайплайна. Flink имеет встроенный MiniCluster для этого.

MiniCluster — это in-process Flink cluster: JM, TM, всё в одной JVM. Можно поднять его в JUnit тесте за 1-2 секунды и прогнать настоящий джоб с настоящими источниками (in-memory Kafka, generic source).

import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class OrdersPipelineIntegrationTest {

    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension(
        new MiniClusterExtensionConfiguration.Builder()
            .setConfiguration(new Configuration())
            .setNumberSlotsPerTaskManager(2)
            .setNumberTaskManagers(2)
            .build()
    );

    @Test
    void testEnrichmentPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Generic source: in-memory data
        DataStream<Order> orders = env.fromCollection(testOrders());

        // Our pipeline
        DataStream<EnrichedOrder> enriched = OrdersPipeline
            .applyEnrichment(orders);

        // Collect results in-memory
        List<EnrichedOrder> results = new ArrayList<>();
        enriched.addSink(new SinkFunction<>() {
            @Override
            public void invoke(EnrichedOrder value, Context ctx) {
                results.add(value);
            }
        });

        env.execute("test-job");

        // Assertions
        assertEquals(3, results.size());
        assertEquals("VIP", results.get(0).customerSegment);
    }

    private static List<Order> testOrders() {
        return List.of(
            new Order(1L, "user-1", 100.0),
            new Order(2L, "user-2", 50.0),
            new Order(3L, "user-1", 200.0)
        );
    }
}

Maven зависимости:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime</artifactId>
    <version>${flink.version}</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>

Это интеграционный тест полного пайплайна без Kubernetes, без реального Kafka. Запускается в CI за секунды. Покрывает 80% багов перед production.


Testing с реальными внешними системами через Testcontainers

Для тестов Kafka source, CDC, JDBC sink используйте Testcontainers — это библиотека, которая поднимает реальные Kafka/Postgres/Redis в Docker, изолированно на тест:

@Testcontainers
class KafkaIntegrationTest {

    @Container
    static final KafkaContainer KAFKA = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.5.0")
    );

    @Test
    void testKafkaSourceToSink() throws Exception {
        // Produce test events to Kafka
        Producer<String, String> producer = new KafkaProducer<>(...);
        producer.send(new ProducerRecord<>("input-topic", "key", "value"));

        // Run Flink job (with MiniCluster)
        StreamExecutionEnvironment env = ...;
        DataStream<Event> events = env.fromSource(
            KafkaSource.<Event>builder()
                .setBootstrapServers(KAFKA.getBootstrapServers())
                .setTopics("input-topic")
                .setValueOnlyDeserializer(new EventDeserializer())
                .build(),
            WatermarkStrategy.noWatermarks(),
            "kafka-source"
        );

        events.sinkTo(KafkaSink.<Event>builder()...build());
        env.executeAsync("integration-test");

        // Verify in output topic
        Consumer<String, String> consumer = new KafkaConsumer<>(...);
        consumer.subscribe(List.of("output-topic"));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        assertFalse(records.isEmpty());
    }
}

Testcontainers + MiniCluster = realistic тест без deploy в реальный кластер. CI время: ~30-60 секунд на test class. Покрывает реальные edge-cases connector-ов.


CI workflow: build + test (на PR)

GitHub Actions / GitLab CI / Jenkins — выбор за вами. Принцип один.

# .github/workflows/ci.yaml
name: CI

on:
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Java
        uses: actions/setup-java@v4
        with:
          java-version: '17'
          distribution: 'temurin'

      - name: Cache Maven
        uses: actions/cache@v4
        with:
          path: ~/.m2
          key: m2-${{ hashFiles('**/pom.xml') }}

      - name: Run unit tests
        run: mvn -B test

      - name: Run integration tests with MiniCluster
        run: mvn -B verify -P integration

      - name: Lint UID audit
        run: |
          # Проверяем, что все stateful операторы имеют UID
          MISSING=$(grep -rE "\.process\(|\.window\(|\.fromSource\(|\.sinkTo\(" src/main/java | \
                    grep -v "\.uid(" || true)
          if [ -n "$MISSING" ]; then
            echo "Missing UIDs in:" && echo "$MISSING"
            exit 1
          fi

      - name: Check Avro schema compatibility
        run: |
          # Используем avro-tools для проверки совместимости с baseline schema
          mvn avro-compatibility-check

На каждом PR прогоняем: unit tests + integration tests (MiniCluster) + UID audit + Avro compatibility check. Без зелёного CI — нельзя merge.


CD workflow: build + deploy (на git tag)

# .github/workflows/cd.yaml
name: CD

on:
  push:
    tags: ['v*']

jobs:
  deploy-stage:
    runs-on: ubuntu-latest
    environment: stage
    steps:
      - uses: actions/checkout@v4

      - name: Extract version
        id: version
        run: echo "VERSION=${GITHUB_REF_NAME#v}" >> $GITHUB_OUTPUT

      - name: Build JAR
        run: mvn -B -Drevision=${{ steps.version.outputs.VERSION }} package -DskipTests

      - name: Build and push Docker
        run: |
          IMAGE=my-registry/orders-pipeline:${{ steps.version.outputs.VERSION }}
          docker build -t $IMAGE .
          docker push $IMAGE

      - name: Deploy to stage K8s
        run: |
          kubectl config use-context stage-k8s
          # kustomize set image
          cd deploy/stage
          kustomize edit set image my-registry/orders-pipeline:${{ steps.version.outputs.VERSION }}
          kubectl apply -k .

      - name: Wait for deploy
        run: |
          kubectl wait --for=condition=Ready flinkdeployment/orders-pipeline -n flink-stage --timeout=10m

      - name: Smoke test
        run: |
          # Curl REST API джоба, отправить тестовое событие в Kafka, проверить output
          ./scripts/smoke-test.sh

  deploy-prod:
    needs: deploy-stage
    runs-on: ubuntu-latest
    environment: prod   # требует manual approval в GitHub
    steps:
      - uses: actions/checkout@v4

      - name: Get current savepoint path (for rollback)
        run: |
          CURRENT_SP=$(kubectl get flinkdeployment orders-pipeline -n flink-prod \
            -o jsonpath='{.status.jobStatus.savepointInfo.lastSavepoint.location}')
          # Регистрируем путь в external store (S3 bucket с tags или DynamoDB)
          aws s3api put-object-tagging --bucket flink-rollback-registry \
            --key prod/orders-pipeline/${{ github.ref_name }} \
            --tagging "TagSet=[{Key=savepoint,Value=$CURRENT_SP}]"

      - name: Deploy to prod
        run: |
          kubectl config use-context prod-k8s
          cd deploy/prod
          kustomize edit set image my-registry/orders-pipeline:${{ github.ref_name }}
          kubectl apply -k .

      - name: Wait for STABLE
        run: |
          # Опрашиваем lifecycle state в течение 10 минут
          for i in {1..60}; do
            STATE=$(kubectl get flinkdeployment orders-pipeline -n flink-prod \
              -o jsonpath='{.status.lifecycleState}')
            if [ "$STATE" = "STABLE" ]; then
              echo "STABLE after $((i*10))s"
              exit 0
            fi
            sleep 10
          done
          echo "Failed to reach STABLE in 10m"
          exit 1

      - name: Smoke test prod
        run: ./scripts/smoke-test-prod.sh

Ключевые моменты:

  • Multi-stage: сначала stage, потом prod с manual approval (GitHub environments).
  • Versioning consistent: одна версия для Maven, Docker, kubectl manifest.
  • Pre-deploy rollback registration: записываем путь savepoint-а ДО deploy в external store, чтобы rollback-deploy знал, откуда восстанавливаться.
  • Smoke test после deploy: тестовое событие через source, проверка output. Catches catastrophic failures.

Автоматический rollback при провале

Что если smoke test провалился? Или job ушёл в ERROR через час? Нужен rollback workflow.

# .github/workflows/rollback.yaml
name: Rollback

on:
  workflow_dispatch:
    inputs:
      target_version:
        description: 'Версия для отката (например, v0.1.5)'
        required: true

jobs:
  rollback:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Get savepoint for target version
        run: |
          # Читаем savepoint path из rollback registry
          TARGET_SP=$(aws s3api get-object-tagging \
            --bucket flink-rollback-registry \
            --key prod/orders-pipeline/${{ inputs.target_version }} \
            --query 'TagSet[?Key==`savepoint`].Value' --output text)
          echo "TARGET_SP=$TARGET_SP" >> $GITHUB_ENV

      - name: Rollback deploy
        run: |
          kubectl config use-context prod-k8s
          # Patch FlinkDeployment с target image и initialSavepointPath
          kubectl patch flinkdeployment orders-pipeline -n flink-prod --type=merge -p "
          {
            \"spec\": {
              \"image\": \"my-registry/orders-pipeline:${{ inputs.target_version }}\",
              \"job\": {
                \"initialSavepointPath\": \"$TARGET_SP\",
                \"upgradeMode\": \"savepoint\"
              }
            }
          }"

      - name: Wait for STABLE
        run: |
          for i in {1..60}; do
            STATE=$(kubectl get flinkdeployment orders-pipeline -n flink-prod \
              -o jsonpath='{.status.lifecycleState}')
            [ "$STATE" = "STABLE" ] && exit 0
            sleep 10
          done
          exit 1

Запуск: gh workflow run rollback.yaml -f target_version=v0.1.5. Через 10 минут джоб вернётся к старой версии с восстановленным состоянием.


Schema migration tests: специальный класс integration-тестов

Самые опасные деплои — те, которые меняют схему state. Тестируем их специально:

@Test
void testRestoreFromPreviousVersionSavepoint() throws Exception {
    // 1. Локально (или через artifact из CI) есть savepoint от prev version
    Path prevSavepoint = Paths.get("test-resources/savepoints/v0.1.5/orders-pipeline-sp");

    // 2. Запускаем новую версию джоба с restore from this savepoint
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new EmbeddedRocksDBStateBackend());
    env.getCheckpointConfig().enableExternalizedCheckpoints(...);
    // ...

    // 3. Используем SavepointReader из State Processor API
    SavepointReader savepoint = SavepointReader.read(
        env, prevSavepoint.toString()
    );

    // 4. Проверяем, что state корректно загружается
    DataSet<UserState> userStates = savepoint
        .readKeyedState("enrichment-uid", new UserStateReader());

    long count = userStates.count();
    assertTrue(count > 0, "State should be non-empty after restore");
}

В CI: храните savepoint от последней prod-версии в git artifact (или artifacts bucket). Каждый PR прогоняет restore-test против него.


Ключевые выводы

  1. Версионирование: один git tag = одна Maven version = один Docker image tag. Используйте semantic versioning.

  2. Integration tests с MiniCluster покрывают 80% багов перед production. Запускаются за секунды в CI.

  3. Testcontainers для connectors: Kafka, Postgres в Docker для realistic integration tests.

  4. CI workflow на PR: unit + integration tests + UID audit + Avro compatibility check.

  5. CD workflow на git tag: build -> push image -> deploy to stage -> manual approval -> deploy to prod -> smoke test.

  6. Pre-deploy savepoint registration: записываем путь savepoint текущей prod-версии в external store ДО deploy. Это даёт чёткую точку отката.

  7. Rollback workflow: GitHub action с target_version, читает savepoint из registry, патчит FlinkDeployment с предыдущим image + initialSavepointPath.

  8. Schema migration tests: специальный класс тестов с restore из real savepoint предыдущей версии. Защищает от breaking changes стейта.

Проверка знанийKnowledge check
Команда настраивает CI/CD для нового Flink-джоба. У них уже есть GitHub Actions для unit tests. Что добавить, чтобы превратить это в полноценный production-ready pipeline для Flink, и в каком порядке приоритетов?
ОтветAnswer
Production-ready CI/CD для Flink требует 8-10 компонентов, добавлять в порядке убывания критичности: ПРИОРИТЕТ 1: Integration tests с MiniCluster (1-2 дня работы) Без этого все PR-ы катят непроверенный пайплайн. Самый высокий ROI. Используем flink-test-utils + MiniClusterExtension. Покрывает 80% багов: schema mismatch, watermark issues, операторы между собой. ПРИОРИТЕТ 2: UID audit + Avro compatibility check (полдня) Превентивная защита от состояния-катастроф. Простой grep-скрипт ловит missing UID-ы. avro-compatibility-checker против baseline schema. Дёшево, спасает от потери state. ПРИОРИТЕТ 3: Build pipeline image versioning (1 день) Git tag -> Maven version -> Docker image tag. Без этого каждый deploy путаница "какой код реально в проде". Semantic versioning, синхронные tags. ПРИОРИТЕТ 4: CD workflow на тег (2 дня) Workflow на push tag: build, push image, kubectl apply через kustomize. Сначала на stage с smoke test, потом на prod с manual approval (GitHub environments). ПРИОРИТЕТ 5: Pre-deploy savepoint registration (1 день) Перед каждым prod-deploy сохраняем savepoint path в external store (S3 bucket с tags, или DynamoDB). Это база для следующего шага - rollback. ПРИОРИТЕТ 6: Rollback workflow (1 день) Manually-triggered GitHub action: input - target version, читает savepoint из registry, patch FlinkDeployment с предыдущим image + initialSavepointPath. Тестировать на stage! ПРИОРИТЕТ 7: Testcontainers integration tests для connector-ов (2-3 дня) Realistic тесты Kafka source/sink, CDC, JDBC. Дольше CI (30-60 секунд), но ловят real connector bugs. ПРИОРИТЕТ 8: Schema migration tests (1 день) Хранение savepoint предыдущей prod-версии как test artifact. Каждый PR прогоняет restore-test через State Processor API. Защита от breaking changes стейта. ПРИОРИТЕТ 9: Automated smoke tests after deploy (1 день) Скрипт, который отправляет тестовое событие в источник, ждёт N секунд, проверяет output. Catastrophic failure detection. ПРИОРИТЕТ 10: Auto-rollback при failure smoke test (1 день) Связать smoke test с rollback workflow. При fail - automatically execute rollback workflow с предыдущей версией. Общее: 10-15 рабочих дней инвестиций, которые economят месяцы будущих инцидентов. Можно делать инкрементально - п.1-3 уже даёт 70% защиты.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Команда хочет добавить integration tests в CI для Flink-джоба. Они работают с Kafka source, JDBC sink в Postgres, lookup в Redis. Какой подход оптимален для CI (быстрый, repeatable, realistic)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5