Оркестрация Spark через Airflow
Почему Airflow для Spark?
CI/CD pipeline собирает и деплоит Spark-приложение. Но production ETL — это не один job, а цепочка зависимых задач:
- Дождаться появления данных в S3
- Запустить Spark transform
- Проверить качество данных
- Загрузить результат в data warehouse
- Отправить уведомление
Apache Airflow — стандартный orchestrator для data pipelines. Он управляет зависимостями, расписанием, retry-логикой и мониторингом.
Scope этого урока: мы рассматриваем Airflow исключительно как orchestrator для Spark jobs. Полное изучение Airflow (DAG patterns, XCom, Pools, Custom Operators) — тема отдельного курса.
SparkSubmitOperator
Базовый способ запуска Spark из Airflow:
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime, timedelta
default_args = {
"owner": "data-engineering",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True,
"email": ["[email protected]"],
}
with DAG(
dag_id="daily_etl",
default_args=default_args,
schedule_interval="0 2 * * *", # каждый день в 02:00 UTC
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["spark", "etl"],
) as dag:
transform = SparkSubmitOperator(
task_id="spark_transform",
application="/opt/spark/apps/transform.py",
conn_id="spark_default", # Airflow Connection
conf={
"spark.executor.memory": "8g",
"spark.executor.cores": "4",
"spark.dynamicAllocation.enabled": "true",
"spark.sql.adaptive.enabled": "true",
},
py_files="/opt/spark/deps/spark_etl-2.1.0.whl",
application_args=[
"--date", "{{ ds }}",
"--env", "production",
],
name="daily-transform-{{ ds }}",
verbose=False,
)
Airflow Connection для Spark:
Connection ID: spark_default
Connection Type: Spark
Host: k8s://https://k8s-api:6443
Extra: {
"deploy-mode": "cluster",
"spark-binary": "spark-submit",
"namespace": "spark-jobs"
}
Platform-Specific Operators
Databricks
from airflow.providers.databricks.operators.databricks import (
DatabricksRunNowOperator,
DatabricksSubmitRunOperator,
)
# Запуск существующего Databricks Job
run_existing = DatabricksRunNowOperator(
task_id="run_databricks_job",
databricks_conn_id="databricks_default",
job_id=12345,
notebook_params={
"date": "{{ ds }}",
"env": "production",
},
)
# Создание и запуск нового Run
submit_new = DatabricksSubmitRunOperator(
task_id="submit_new_run",
databricks_conn_id="databricks_default",
json={
"new_cluster": {
"spark_version": "14.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 5,
},
"spark_python_task": {
"python_file": "dbfs:/apps/etl/transform.py",
"parameters": ["--date", "{{ ds }}"],
},
},
)
AWS EMR
from airflow.providers.amazon.aws.operators.emr import (
EmrAddStepsOperator,
EmrCreateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
# Добавить step к существующему EMR-кластеру
add_step = EmrAddStepsOperator(
task_id="add_spark_step",
job_flow_id="{{ var.value.emr_cluster_id }}",
steps=[{
"Name": "daily-transform",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--deploy-mode", "cluster",
"--py-files", "s3://artifacts/spark_etl-2.1.0.whl",
"s3://artifacts/main.py",
"--date", "{{ ds }}",
],
},
}],
)
# Ожидание завершения step
wait_step = EmrStepSensor(
task_id="wait_for_step",
job_flow_id="{{ var.value.emr_cluster_id }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_spark_step')[0] }}",
target_states=["COMPLETED"],
failed_states=["CANCELLED", "FAILED"],
)
add_step >> wait_step
Kubernetes
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
spark_on_k8s = KubernetesPodOperator(
task_id="spark_k8s_job",
namespace="spark-jobs",
image="company/spark:4.0-etl",
cmds=["spark-submit"],
arguments=[
"--master", "k8s://https://kubernetes.default.svc:443",
"--deploy-mode", "cluster",
"--conf", "spark.executor.instances=5",
"--conf", "spark.executor.memory=8g",
"/opt/spark/apps/transform.py",
],
service_account_name="spark-submit",
is_delete_operator_pod=True,
get_logs=True,
)
DAG Pattern: Production ETL Pipeline
Типичный production DAG для Spark ETL:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta
with DAG(
dag_id="production_etl_pipeline",
schedule_interval="0 3 * * *",
start_date=datetime(2024, 1, 1),
catchup=False,
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=10),
"execution_timeout": timedelta(hours=2),
},
) as dag:
# 1. Sensor: ждём появления данных
wait_for_data = S3KeySensor(
task_id="wait_for_data",
bucket_name="raw-data",
bucket_key="events/{{ ds }}/",
timeout=3600, # max 1 час ожидания
poke_interval=60,
)
# 2. Transform: Spark ETL
spark_transform = SparkSubmitOperator(
task_id="spark_transform",
application="/opt/spark/apps/transform.py",
conn_id="spark_default",
application_args=["--date", "{{ ds }}"],
)
# 3. Validate: проверка качества данных
validate_output = SparkSubmitOperator(
task_id="validate_output",
application="/opt/spark/apps/validate.py",
conn_id="spark_default",
application_args=["--date", "{{ ds }}"],
)
# 4. Notify: уведомление о завершении
notify_success = EmailOperator(
task_id="notify_success",
to="[email protected]",
subject="ETL Complete: {{ ds }}",
html_content="Pipeline completed successfully for {{ ds }}.",
)
# Dependencies
wait_for_data >> spark_transform >> validate_output >> notify_success
DAG Visualization:
[S3 Sensor] ──→ [Spark Transform] ──→ [Validate] ──→ [Email]
wait data ETL pipeline data quality notify
(poke 60s) (2hr timeout) (assertions) success
Мониторинг и SLA
# SLA -- гарантия времени выполнения
with DAG(
dag_id="sla_monitored_etl",
sla_miss_callback=sla_alert_function,
default_args={
"sla": timedelta(hours=4), # весь DAG должен завершиться за 4 часа
},
) as dag:
pass
# Task-level retry с exponential backoff
transform = SparkSubmitOperator(
task_id="transform",
retries=3,
retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True,
max_retry_delay=timedelta(minutes=30),
# ...
)
| Метрика | Что отслеживать | Действие при нарушении |
|---|---|---|
| SLA miss | DAG не завершился в срок | Alert в Slack/PagerDuty |
| Task failure | Spark job упал | Auto-retry (3x), затем alert |
| Duration anomaly | Job 2x дольше обычного | Alert для расследования |
| Data freshness | Output-данные устарели | Re-trigger pipeline |
Что дальше?
Airflow запускает Spark jobs. Но что если ваши трансформации — SQL-модели с зависимостями? В следующем уроке — dbt + Spark для SQL-first data pipeline.