CI/CD для Flink: savepoint-driven deployment и integration testing
Flink-джоб без CI/CD = ручной деплой = ошибки в production. Этот урок про то, как построить полноценный CI/CD для Flink: версионирование, integration tests на MiniCluster, savepoint-driven deployment через Flink K8s Operator, automated rollback при провале. После урока у вас будет шаблон для своего pipeline.
Что такое savepoint-driven deployment
Обычный CI/CD для веб-сервиса: build image -> push -> kubectl set image -> rolling restart. Pod-ы пересоздаются с новым кодом, всё работает.
Для Flink этот подход НЕ работает. Pod без сохранённого состояния = потеря state, потеря всех агрегаций. Нужно: остановить джоб -> savepoint -> новый pod -> restore from savepoint.
Savepoint-driven deployment — это паттерн, где каждый deploy:
Blue/Green и Canary в Kubernetes: deployment стратегии без downtime- Триггерит savepoint текущего джоба.
- Регистрирует путь savepoint-а в external metadata store.
- Деплоит новую версию с явным
initialSavepointPath(для первого rollback fallback) или черезupgradeMode: savepoint. - Если deploy провалился — автоматический rollback через тот же savepoint и предыдущий image.
Это нативно поддерживается Flink Kubernetes Operator (см. модуль 15 урок 2). Наша задача в CI/CD — обвязать это вокруг git push.
Версионирование Flink-джоба
Тэги в git, версии в Maven/Gradle, теги Docker image — должны быть синхронизированы. Один git tag = одна Maven version = один Docker image tag.
Структура репозитория:
flink-orders-pipeline/
├── src/
│ ├── main/java/com/acme/flink/OrdersPipeline.java
│ └── test/java/...
├── deploy/
│ ├── flinkdeployment-prod.yaml
│ ├── flinkdeployment-stage.yaml
│ └── kustomization.yaml
├── pom.xml # version = ${revision}
├── Dockerfile
└── .github/workflows/
├── ci.yaml # build + test on PR
└── cd.yaml # build + deploy on tag
В pom.xml версия динамическая через ${revision} — CI устанавливает её из git tag:
<project>
<groupId>com.acme</groupId>
<artifactId>orders-pipeline</artifactId>
<version>${revision}</version>
<properties>
<revision>0.0.0-SNAPSHOT</revision>
</properties>
</project>
В CI:
mvn -Drevision=$GIT_TAG package
В Dockerfile тэг — тот же:
FROM flink:2.2.0-java17
COPY target/orders-pipeline-*.jar /opt/flink/usrlib/orders-pipeline.jar
docker build -t my-registry/orders-pipeline:$GIT_TAG .
docker push my-registry/orders-pipeline:$GIT_TAG
В flinkdeployment-prod.yaml:
spec:
image: my-registry/orders-pipeline:${IMAGE_TAG}
IMAGE_TAG подставляется через kustomize или envsubst.
Используйте semantic versioning: 0.1.0, 0.1.1, 0.2.0. Major = breaking changes (state schema incompatible). Minor = новые фичи backward-compatible. Patch = bug fix. Git tag = v0.1.1, Maven version = 0.1.1, Docker image = orders-pipeline:0.1.1.
Integration testing с Flink MiniCluster
Unit-тесты на отдельные функции — это обязательно, но недостаточно. Реальные баги (schema mismatch, watermark issues, state corruption) видны только при integration test всего пайплайна. Flink имеет встроенный MiniCluster для этого.
MiniCluster — это in-process Flink cluster: JM, TM, всё в одной JVM. Можно поднять его в JUnit тесте за 1-2 секунды и прогнать настоящий джоб с настоящими источниками (in-memory Kafka, generic source).
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
class OrdersPipelineIntegrationTest {
@RegisterExtension
static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension(
new MiniClusterExtensionConfiguration.Builder()
.setConfiguration(new Configuration())
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(2)
.build()
);
@Test
void testEnrichmentPipeline() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Generic source: in-memory data
DataStream<Order> orders = env.fromCollection(testOrders());
// Our pipeline
DataStream<EnrichedOrder> enriched = OrdersPipeline
.applyEnrichment(orders);
// Collect results in-memory
List<EnrichedOrder> results = new ArrayList<>();
enriched.addSink(new SinkFunction<>() {
@Override
public void invoke(EnrichedOrder value, Context ctx) {
results.add(value);
}
});
env.execute("test-job");
// Assertions
assertEquals(3, results.size());
assertEquals("VIP", results.get(0).customerSegment);
}
private static List<Order> testOrders() {
return List.of(
new Order(1L, "user-1", 100.0),
new Order(2L, "user-2", 50.0),
new Order(3L, "user-1", 200.0)
);
}
}
Maven зависимости:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
Это интеграционный тест полного пайплайна без Kubernetes, без реального Kafka. Запускается в CI за секунды. Покрывает 80% багов перед production.
Testing с реальными внешними системами через Testcontainers
Для тестов Kafka source, CDC, JDBC sink используйте Testcontainers — это библиотека, которая поднимает реальные Kafka/Postgres/Redis в Docker, изолированно на тест:
@Testcontainers
class KafkaIntegrationTest {
@Container
static final KafkaContainer KAFKA = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.5.0")
);
@Test
void testKafkaSourceToSink() throws Exception {
// Produce test events to Kafka
Producer<String, String> producer = new KafkaProducer<>(...);
producer.send(new ProducerRecord<>("input-topic", "key", "value"));
// Run Flink job (with MiniCluster)
StreamExecutionEnvironment env = ...;
DataStream<Event> events = env.fromSource(
KafkaSource.<Event>builder()
.setBootstrapServers(KAFKA.getBootstrapServers())
.setTopics("input-topic")
.setValueOnlyDeserializer(new EventDeserializer())
.build(),
WatermarkStrategy.noWatermarks(),
"kafka-source"
);
events.sinkTo(KafkaSink.<Event>builder()...build());
env.executeAsync("integration-test");
// Verify in output topic
Consumer<String, String> consumer = new KafkaConsumer<>(...);
consumer.subscribe(List.of("output-topic"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
assertFalse(records.isEmpty());
}
}
Testcontainers + MiniCluster = realistic тест без deploy в реальный кластер. CI время: ~30-60 секунд на test class. Покрывает реальные edge-cases connector-ов.
CI workflow: build + test (на PR)
GitHub Actions / GitLab CI / Jenkins — выбор за вами. Принцип один.
# .github/workflows/ci.yaml
name: CI
on:
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Java
uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
- name: Cache Maven
uses: actions/cache@v4
with:
path: ~/.m2
key: m2-${{ hashFiles('**/pom.xml') }}
- name: Run unit tests
run: mvn -B test
- name: Run integration tests with MiniCluster
run: mvn -B verify -P integration
- name: Lint UID audit
run: |
# Проверяем, что все stateful операторы имеют UID
MISSING=$(grep -rE "\.process\(|\.window\(|\.fromSource\(|\.sinkTo\(" src/main/java | \
grep -v "\.uid(" || true)
if [ -n "$MISSING" ]; then
echo "Missing UIDs in:" && echo "$MISSING"
exit 1
fi
- name: Check Avro schema compatibility
run: |
# Используем avro-tools для проверки совместимости с baseline schema
mvn avro-compatibility-check
На каждом PR прогоняем: unit tests + integration tests (MiniCluster) + UID audit + Avro compatibility check. Без зелёного CI — нельзя merge.
CD workflow: build + deploy (на git tag)
# .github/workflows/cd.yaml
name: CD
on:
push:
tags: ['v*']
jobs:
deploy-stage:
runs-on: ubuntu-latest
environment: stage
steps:
- uses: actions/checkout@v4
- name: Extract version
id: version
run: echo "VERSION=${GITHUB_REF_NAME#v}" >> $GITHUB_OUTPUT
- name: Build JAR
run: mvn -B -Drevision=${{ steps.version.outputs.VERSION }} package -DskipTests
- name: Build and push Docker
run: |
IMAGE=my-registry/orders-pipeline:${{ steps.version.outputs.VERSION }}
docker build -t $IMAGE .
docker push $IMAGE
- name: Deploy to stage K8s
run: |
kubectl config use-context stage-k8s
# kustomize set image
cd deploy/stage
kustomize edit set image my-registry/orders-pipeline:${{ steps.version.outputs.VERSION }}
kubectl apply -k .
- name: Wait for deploy
run: |
kubectl wait --for=condition=Ready flinkdeployment/orders-pipeline -n flink-stage --timeout=10m
- name: Smoke test
run: |
# Curl REST API джоба, отправить тестовое событие в Kafka, проверить output
./scripts/smoke-test.sh
deploy-prod:
needs: deploy-stage
runs-on: ubuntu-latest
environment: prod # требует manual approval в GitHub
steps:
- uses: actions/checkout@v4
- name: Get current savepoint path (for rollback)
run: |
CURRENT_SP=$(kubectl get flinkdeployment orders-pipeline -n flink-prod \
-o jsonpath='{.status.jobStatus.savepointInfo.lastSavepoint.location}')
# Регистрируем путь в external store (S3 bucket с tags или DynamoDB)
aws s3api put-object-tagging --bucket flink-rollback-registry \
--key prod/orders-pipeline/${{ github.ref_name }} \
--tagging "TagSet=[{Key=savepoint,Value=$CURRENT_SP}]"
- name: Deploy to prod
run: |
kubectl config use-context prod-k8s
cd deploy/prod
kustomize edit set image my-registry/orders-pipeline:${{ github.ref_name }}
kubectl apply -k .
- name: Wait for STABLE
run: |
# Опрашиваем lifecycle state в течение 10 минут
for i in {1..60}; do
STATE=$(kubectl get flinkdeployment orders-pipeline -n flink-prod \
-o jsonpath='{.status.lifecycleState}')
if [ "$STATE" = "STABLE" ]; then
echo "STABLE after $((i*10))s"
exit 0
fi
sleep 10
done
echo "Failed to reach STABLE in 10m"
exit 1
- name: Smoke test prod
run: ./scripts/smoke-test-prod.sh
Ключевые моменты:
- Multi-stage: сначала stage, потом prod с manual approval (GitHub environments).
- Versioning consistent: одна версия для Maven, Docker, kubectl manifest.
- Pre-deploy rollback registration: записываем путь savepoint-а ДО deploy в external store, чтобы rollback-deploy знал, откуда восстанавливаться.
- Smoke test после deploy: тестовое событие через source, проверка output. Catches catastrophic failures.
Автоматический rollback при провале
Что если smoke test провалился? Или job ушёл в ERROR через час? Нужен rollback workflow.
# .github/workflows/rollback.yaml
name: Rollback
on:
workflow_dispatch:
inputs:
target_version:
description: 'Версия для отката (например, v0.1.5)'
required: true
jobs:
rollback:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Get savepoint for target version
run: |
# Читаем savepoint path из rollback registry
TARGET_SP=$(aws s3api get-object-tagging \
--bucket flink-rollback-registry \
--key prod/orders-pipeline/${{ inputs.target_version }} \
--query 'TagSet[?Key==`savepoint`].Value' --output text)
echo "TARGET_SP=$TARGET_SP" >> $GITHUB_ENV
- name: Rollback deploy
run: |
kubectl config use-context prod-k8s
# Patch FlinkDeployment с target image и initialSavepointPath
kubectl patch flinkdeployment orders-pipeline -n flink-prod --type=merge -p "
{
\"spec\": {
\"image\": \"my-registry/orders-pipeline:${{ inputs.target_version }}\",
\"job\": {
\"initialSavepointPath\": \"$TARGET_SP\",
\"upgradeMode\": \"savepoint\"
}
}
}"
- name: Wait for STABLE
run: |
for i in {1..60}; do
STATE=$(kubectl get flinkdeployment orders-pipeline -n flink-prod \
-o jsonpath='{.status.lifecycleState}')
[ "$STATE" = "STABLE" ] && exit 0
sleep 10
done
exit 1
Запуск: gh workflow run rollback.yaml -f target_version=v0.1.5. Через 10 минут джоб вернётся к старой версии с восстановленным состоянием.
Schema migration tests: специальный класс integration-тестов
Самые опасные деплои — те, которые меняют схему state. Тестируем их специально:
@Test
void testRestoreFromPreviousVersionSavepoint() throws Exception {
// 1. Локально (или через artifact из CI) есть savepoint от prev version
Path prevSavepoint = Paths.get("test-resources/savepoints/v0.1.5/orders-pipeline-sp");
// 2. Запускаем новую версию джоба с restore from this savepoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().enableExternalizedCheckpoints(...);
// ...
// 3. Используем SavepointReader из State Processor API
SavepointReader savepoint = SavepointReader.read(
env, prevSavepoint.toString()
);
// 4. Проверяем, что state корректно загружается
DataSet<UserState> userStates = savepoint
.readKeyedState("enrichment-uid", new UserStateReader());
long count = userStates.count();
assertTrue(count > 0, "State should be non-empty after restore");
}
В CI: храните savepoint от последней prod-версии в git artifact (или artifacts bucket). Каждый PR прогоняет restore-test против него.
Ключевые выводы
-
Версионирование: один git tag = одна Maven version = один Docker image tag. Используйте semantic versioning.
-
Integration tests с MiniCluster покрывают 80% багов перед production. Запускаются за секунды в CI.
-
Testcontainers для connectors: Kafka, Postgres в Docker для realistic integration tests.
-
CI workflow на PR: unit + integration tests + UID audit + Avro compatibility check.
-
CD workflow на git tag: build -> push image -> deploy to stage -> manual approval -> deploy to prod -> smoke test.
-
Pre-deploy savepoint registration: записываем путь savepoint текущей prod-версии в external store ДО deploy. Это даёт чёткую точку отката.
-
Rollback workflow: GitHub action с target_version, читает savepoint из registry, патчит FlinkDeployment с предыдущим image + initialSavepointPath.
-
Schema migration tests: специальный класс тестов с restore из real savepoint предыдущей версии. Защищает от breaking changes стейта.