Learning Platform
Глоссарий Troubleshooting
Урок 10.06 · 14 мин
Продвинутый
AirflowSparkSubmitOperatorDAGOrchestrationETL PipelineScheduling

Оркестрация Spark через Airflow

Почему Airflow для Spark?

CI/CD pipeline собирает и деплоит Spark-приложение. Но production ETL — это не один job, а цепочка зависимых задач:

  1. Дождаться появления данных в S3
  2. Запустить Spark transform
  3. Проверить качество данных
  4. Загрузить результат в data warehouse
  5. Отправить уведомление

Apache Airflow — стандартный orchestrator для data pipelines. Он управляет зависимостями, расписанием, retry-логикой и мониторингом.

TIP

Scope этого урока: мы рассматриваем Airflow исключительно как orchestrator для Spark jobs. Полное изучение Airflow (DAG patterns, XCom, Pools, Custom Operators) — тема отдельного курса.

SparkSubmitOperator

Базовый способ запуска Spark из Airflow:

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "data-engineering",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["[email protected]"],
}

with DAG(
    dag_id="daily_etl",
    default_args=default_args,
    schedule_interval="0 2 * * *",  # каждый день в 02:00 UTC
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["spark", "etl"],
) as dag:

    transform = SparkSubmitOperator(
        task_id="spark_transform",
        application="/opt/spark/apps/transform.py",
        conn_id="spark_default",  # Airflow Connection
        conf={
            "spark.executor.memory": "8g",
            "spark.executor.cores": "4",
            "spark.dynamicAllocation.enabled": "true",
            "spark.sql.adaptive.enabled": "true",
        },
        py_files="/opt/spark/deps/spark_etl-2.1.0.whl",
        application_args=[
            "--date", "{{ ds }}",
            "--env", "production",
        ],
        name="daily-transform-{{ ds }}",
        verbose=False,
    )

Airflow Connection для Spark:

Connection ID: spark_default
Connection Type: Spark
Host: k8s://https://k8s-api:6443
Extra: {
  "deploy-mode": "cluster",
  "spark-binary": "spark-submit",
  "namespace": "spark-jobs"
}

Platform-Specific Operators

Databricks

from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
    DatabricksSubmitRunOperator,
)

# Запуск существующего Databricks Job
run_existing = DatabricksRunNowOperator(
    task_id="run_databricks_job",
    databricks_conn_id="databricks_default",
    job_id=12345,
    notebook_params={
        "date": "{{ ds }}",
        "env": "production",
    },
)

# Создание и запуск нового Run
submit_new = DatabricksSubmitRunOperator(
    task_id="submit_new_run",
    databricks_conn_id="databricks_default",
    json={
        "new_cluster": {
            "spark_version": "14.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 5,
        },
        "spark_python_task": {
            "python_file": "dbfs:/apps/etl/transform.py",
            "parameters": ["--date", "{{ ds }}"],
        },
    },
)

AWS EMR

from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

# Добавить step к существующему EMR-кластеру
add_step = EmrAddStepsOperator(
    task_id="add_spark_step",
    job_flow_id="{{ var.value.emr_cluster_id }}",
    steps=[{
        "Name": "daily-transform",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "--py-files", "s3://artifacts/spark_etl-2.1.0.whl",
                "s3://artifacts/main.py",
                "--date", "{{ ds }}",
            ],
        },
    }],
)

# Ожидание завершения step
wait_step = EmrStepSensor(
    task_id="wait_for_step",
    job_flow_id="{{ var.value.emr_cluster_id }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_spark_step')[0] }}",
    target_states=["COMPLETED"],
    failed_states=["CANCELLED", "FAILED"],
)

add_step >> wait_step

Kubernetes

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

spark_on_k8s = KubernetesPodOperator(
    task_id="spark_k8s_job",
    namespace="spark-jobs",
    image="company/spark:4.0-etl",
    cmds=["spark-submit"],
    arguments=[
        "--master", "k8s://https://kubernetes.default.svc:443",
        "--deploy-mode", "cluster",
        "--conf", "spark.executor.instances=5",
        "--conf", "spark.executor.memory=8g",
        "/opt/spark/apps/transform.py",
    ],
    service_account_name="spark-submit",
    is_delete_operator_pod=True,
    get_logs=True,
)

DAG Pattern: Production ETL Pipeline

Типичный production DAG для Spark ETL:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta

with DAG(
    dag_id="production_etl_pipeline",
    schedule_interval="0 3 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=10),
        "execution_timeout": timedelta(hours=2),
    },
) as dag:

    # 1. Sensor: ждём появления данных
    wait_for_data = S3KeySensor(
        task_id="wait_for_data",
        bucket_name="raw-data",
        bucket_key="events/{{ ds }}/",
        timeout=3600,  # max 1 час ожидания
        poke_interval=60,
    )

    # 2. Transform: Spark ETL
    spark_transform = SparkSubmitOperator(
        task_id="spark_transform",
        application="/opt/spark/apps/transform.py",
        conn_id="spark_default",
        application_args=["--date", "{{ ds }}"],
    )

    # 3. Validate: проверка качества данных
    validate_output = SparkSubmitOperator(
        task_id="validate_output",
        application="/opt/spark/apps/validate.py",
        conn_id="spark_default",
        application_args=["--date", "{{ ds }}"],
    )

    # 4. Notify: уведомление о завершении
    notify_success = EmailOperator(
        task_id="notify_success",
        to="[email protected]",
        subject="ETL Complete: {{ ds }}",
        html_content="Pipeline completed successfully for {{ ds }}.",
    )

    # Dependencies
    wait_for_data >> spark_transform >> validate_output >> notify_success
DAG Visualization:
  [S3 Sensor]  ──→  [Spark Transform]  ──→  [Validate]  ──→  [Email]
   wait data         ETL pipeline           data quality       notify
   (poke 60s)        (2hr timeout)          (assertions)       success

Мониторинг и SLA

# SLA -- гарантия времени выполнения
with DAG(
    dag_id="sla_monitored_etl",
    sla_miss_callback=sla_alert_function,
    default_args={
        "sla": timedelta(hours=4),  # весь DAG должен завершиться за 4 часа
    },
) as dag:
    pass

# Task-level retry с exponential backoff
transform = SparkSubmitOperator(
    task_id="transform",
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
    # ...
)
МетрикаЧто отслеживатьДействие при нарушении
SLA missDAG не завершился в срокAlert в Slack/PagerDuty
Task failureSpark job упалAuto-retry (3x), затем alert
Duration anomalyJob 2x дольше обычногоAlert для расследования
Data freshnessOutput-данные устарелиRe-trigger pipeline
Проверка знанийKnowledge check
Какой Airflow operator выбрать для запуска Spark job на: (a) self-managed K8s, (b) Databricks, (c) AWS EMR?
ОтветAnswer
(a) Self-managed Kubernetes: KubernetesPodOperator -- запускает pod с spark-submit внутри, или SparkSubmitOperator с conn_id, указывающим на k8s:// master URL. (b) Databricks: DatabricksRunNowOperator для существующих jobs (по job_id) или DatabricksSubmitRunOperator для создания нового run с inline-конфигурацией кластера. (c) AWS EMR: EmrAddStepsOperator для добавления Spark step к существующему кластеру + EmrStepSensor для ожидания завершения. Каждый оператор требует соответствующего Airflow Connection.
Проверка знанийKnowledge check
Опишите типичный production DAG для Spark ETL. Какие 4 стадии он включает?
ОтветAnswer
Типичный production DAG включает: (1) Sensor -- ожидание появления входных данных (S3KeySensor/GCSObjectExistenceSensor), с timeout и poke_interval; (2) Transform -- SparkSubmitOperator с production-конфигурацией (executor memory/cores, dynamic allocation), execution_timeout для защиты от зависших jobs; (3) Validate -- проверка качества выходных данных (assertions на count, null ratio, schema); (4) Notify -- уведомление о результате (EmailOperator, SlackOperator). Цепочка: sensor >> transform >> validate >> notify. Каждый task имеет retries с exponential backoff.

Что дальше?

Airflow запускает Spark jobs. Но что если ваши трансформации — SQL-модели с зависимостями? В следующем уроке — dbt + Spark для SQL-first data pipeline.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Зачем production ETL pipeline нужен orchestrator (Airflow), если уже есть CI/CD?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8