Learning Platform
Глоссарий Troubleshooting
Урок 15.06 · 24 мин
Продвинутый
Remote LoggingS3GCSElasticsearchCloudWatchLog Handlers

Remote logging: S3, GCS, Elasticsearch, Azure Blob, CloudWatch

В production Airflow logs должны жить в external object storage / centralized system. Local disk не работает — worker pods ephemeral, scheduler не имеет доступа к worker file systems, UI не может показать logs из мёртвого worker.

Этот урок — настройка всех supported backends (S3, GCS, Azure Blob, CloudWatch, Elasticsearch), custom handlers, UI integration, и критичный antipattern — DB log handler, который рушит production.


Архитектура logging в Airflow

Где живут task logs
Worker processTask запускается на worker. Стандартный stdout/stderr task callable перехватывается LoggingMixin. Каждая TI имеет свой dedicated file handler.
LoggingMixin (Python logging)
Local file (always)Сначала всё пишется локально в `base_log_folder/dag_id/task_id/run_id/try_N.log`. Это buffer + надёжность — если remote upload фейлится, лог не потерян.
на TI complete
Remote upload (через task_log_reader)Когда TI завершается, custom log handler uploads file в S3/GCS/etc. На subsequent retries — старые files остаются на remote (для history).
UI readWebserver реквест 'show logs for TI X try Y'. Сначала пытается читать с remote (если remote_logging=True), иначе с local file (через RPC к worker via 8793). Streams в browser.

Ключевое для понимания:

  1. Local file ВСЕГДА записывается сначала. Remote — async upload в конце.
  2. UI читает remote только если remote_logging=True. Иначе RPC к worker.
  3. Один файл на try_number — retries создают try_1.log, try_2.log, etc.

Базовая конфигурация remote logging

[logging]
# Включить remote upload
remote_logging = True

# Backend: s3://, gs://, wasb-...://, http+es://, cloudwatch://
remote_base_log_folder = s3://my-airflow-logs/

# Connection ID для аутентификации
remote_log_conn_id = aws_default

# Шифрование (опционально)
encrypt_s3_logs = False

# Local fallback path (используется если remote unavailable)
base_log_folder = /opt/airflow/logs

После рестарта Airflow — все TI начинают upload в remote после complete.


Backend 1: S3

[logging]
remote_logging = True
remote_base_log_folder = s3://airflow-prod-logs/
remote_log_conn_id = aws_default
encrypt_s3_logs = True
delete_local_logs_after_upload = True  # 2.7+

Connection aws_default (через UI / airflow connections add):

airflow connections add aws_default \
  --conn-type aws \
  --conn-extra '{"role_arn": "arn:aws:iam::123456:role/airflow-logs"}'

Или env var:

AIRFLOW_CONN_AWS_DEFAULT='{"conn_type": "aws", "extra": {"region_name": "us-east-1"}}'

IRSA для EKS

В EKS используйте IAM Roles for Service Accounts:

# Worker pod service account
metadata:
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::123456:role/airflow-logs

И в connection — пустое (использует instance profile / IRSA):

AIRFLOW_CONN_AWS_DEFAULT='{"conn_type": "aws"}'

S3 bucket setup

# Terraform
resource "aws_s3_bucket" "airflow_logs" {
  bucket = "airflow-prod-logs"
}

resource "aws_s3_bucket_lifecycle_configuration" "logs" {
  bucket = aws_s3_bucket.airflow_logs.id

  rule {
    id     = "expire-old-logs"
    status = "Enabled"

    expiration {
      days = 90
    }
    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "logs" {
  bucket = aws_s3_bucket.airflow_logs.id
  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

90 days retention + Standard-IA после 30 days = cost optimization для archive logs.


Backend 2: GCS

[logging]
remote_logging = True
remote_base_log_folder = gs://airflow-prod-logs/
remote_log_conn_id = google_cloud_default

Connection:

airflow connections add google_cloud_default \
  --conn-type google_cloud_platform \
  --conn-extra '{"key_path": "/var/secrets/sa.json"}'

В GKE — Workload Identity:

# Pod service account binds to GCP SA
metadata:
  annotations:
    iam.gke.io/gcp-service-account: [email protected]

Permissions:

gcloud projects add-iam-policy-binding my-project \
  --member="serviceAccount:[email protected]" \
  --role="roles/storage.objectAdmin"

Backend 3: Azure Blob

[logging]
remote_logging = True
remote_base_log_folder = [email protected]/
remote_log_conn_id = azure_blob_default

Connection через provider apache-airflow-providers-microsoft-azure. Auth через connection string или Managed Identity.


Backend 4: Elasticsearch

ES — особый case: логи не файлы, а structured docs. Не upload, а streaming.

[elasticsearch]
host = elasticsearch.example.com:9200
log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
end_of_log_mark = end_of_log
frontend = https://kibana.example.com/app/discover
write_stdout = True
json_format = True
json_fields = asctime,filename,lineno,levelname,message

[logging]
remote_logging = True
remote_log_conn_id = elastic_default
remote_base_log_folder = http+es://elasticsearch:9200

Setup: workers пишут JSON в stdout, sidecar fluent-bit / filebeat подбирает и отправляет в ES.

# Pod spec
spec:
  containers:
  - name: airflow-worker
    # ... airflow ...
  - name: filebeat
    image: docker.elastic.co/beats/filebeat:8.13.0
    volumeMounts:
      - name: var-log
        mountPath: /var/log/airflow
    config: |
      filebeat.inputs:
        - type: log
          paths: ['/var/log/airflow/*.log']
          json.keys_under_root: true
      output.elasticsearch:
        hosts: ['elasticsearch:9200']
        index: 'airflow-logs-%{[+yyyy.MM.dd]}'

Преимущества ES

  • Search across logsdag_id:etl_orders AND level:ERROR за неделю.
  • Aggregation — top 10 dag_ids by error count.
  • Kibana UI для логов (a-la Splunk).

Недостатки

  • Дороже S3 в storage (ES — RAM-heavy).
  • Сложнее maintain (cluster, indexing, shards).
  • Index bloat — без retention растёт unbounded.

Для большинства случаев S3 + standalone Kibana/Loki/Splunk — лучший trade-off.


Backend 5: CloudWatch Logs

[logging]
remote_logging = True
remote_base_log_folder = cloudwatch://arn:aws:logs:us-east-1:123456:log-group:airflow-prod
remote_log_conn_id = aws_default

Logs идут в CloudWatch log group. Каждый task создаёт log stream.

Pros: AWS-native, integrated с CloudWatch Insights, no separate infrastructure. Cons: дорого для high-volume ($0.50/GB ingest), query limits.

Используется в MWAA (managed) by default.


Backend 6: Custom log handler

Если ни один built-in не подходит — custom handler:

# my_company/log_handlers.py
import logging
from airflow.utils.log.file_task_handler import FileTaskHandler


class MyCustomTaskHandler(FileTaskHandler):
    """
    Custom handler — пишет local + uploads в internal log system.
    """

    def __init__(self, base_log_folder: str, internal_endpoint: str):
        super().__init__(base_log_folder)
        self.internal_endpoint = internal_endpoint

    def _read(self, ti, try_number, metadata=None):
        # Read logs из internal system для UI
        ...

    def close(self):
        super().close()
        # После TI complete — upload local file в internal system
        ...

Регистрация через airflow_local_settings.py:

# airflow_local_settings.py
LOGGING_CONFIG = {
    "handlers": {
        "task": {
            "class": "my_company.log_handlers.MyCustomTaskHandler",
            "internal_endpoint": "https://logs.internal.example.com",
            "base_log_folder": "/opt/airflow/logs",
        }
    }
    # ... остальное копируется из default config
}

DB log handler — НИКОГДА в production

В Airflow есть legacy DBTaskHandler, который пишет логи в metadata DB. Это catastrophic antipattern.

# DO NOT USE
[core]
task_log_handler_config = airflow.utils.log.db_task_handler.DBTaskHandler

Почему это плохо

ПроблемаСимптом
Metadata DB размер взрывается100s GB DB → slow scheduler, slow UI
log table 70% объёмаVACUUM застревает, replication lag
Lock contentionScheduler critical section ждёт on log INSERT
Backup overheadDB dump час
Невозможно retention без downtimeDELETE на 100M rows — minutes lock

Production Airflow installations с DB log handler всегда ломаются на масштабе.

Migration off DB handler:

  1. Switch to S3/GCS/CW в config.
  2. Restart всё.
  3. Cleanup log table:
-- Backup сначала
COPY log TO '/tmp/log_backup.csv';

-- Truncate (быстрее DELETE)
TRUNCATE TABLE log;

-- VACUUM
VACUUM FULL log;

После migration metadata DB резко shrink, scheduler ускоряется.


UI integration: how UI reads logs

Webserver, когда показывает logs:

1. UI request: GET /log?dag_id=X&task_id=Y&run_id=Z&try_number=1

2. Webserver вызывает task_log_reader:
   a. If remote_logging=True → читает с remote (S3 GET, GCS download, ES query)
   b. Else → RPC к worker через 8793 port (если живой)
   c. Else → файл с local disk (если webserver shares fs с workers)

3. Returns log content as HTML/text

Production setup: webserver IAM permissions

Если remote=S3, webserver pod должен иметь read access к bucket:

# Webserver IRSA role
{
  "Effect": "Allow",
  "Action": ["s3:GetObject", "s3:ListBucket"],
  "Resource": [
    "arn:aws:s3:::airflow-prod-logs",
    "arn:aws:s3:::airflow-prod-logs/*"
  ]
}

Worker pods — write access (PutObject). Webserver — read only.

Latency UI logs

S3 GET ~100-500ms (cold). При больших logs (100MB) — потенциально 5-10s. UI таймаут default 30s.

Mitigation:

  • Включить S3 Transfer Acceleration для cross-region.
  • Кешировать в CloudFront / CDN перед webserver.
  • Splitting long logs на chunks с pagination в UI (2.10+).

UI часто хочет глубинный link на «full logs in Kibana / Splunk / CloudWatch». Через [elasticsearch] frontend (для ES) или custom:

[elasticsearch]
frontend = https://kibana.example.com/app/discover#/?_g=()&_a=(query:(language:kuery,query:'log_id:"{log_id}"'))

В UI Airflow появится “View logs in Kibana” button с pre-filled query.


Log retention strategies

S3 lifecycle policy

lifecycle_rule {
  id     = "logs-archive"
  status = "Enabled"

  transition {
    days          = 30
    storage_class = "STANDARD_IA"  # cheaper after 30d
  }
  transition {
    days          = 90
    storage_class = "GLACIER"  # archive after 90d
  }
  expiration {
    days = 365  # delete after 1 year
  }
}

GCS lifecycle

gsutil lifecycle set lifecycle.json gs://my-bucket
{
  "lifecycle": {
    "rule": [
      {"action": {"type": "SetStorageClass", "storageClass": "NEARLINE"}, "condition": {"age": 30}},
      {"action": {"type": "SetStorageClass", "storageClass": "ARCHIVE"}, "condition": {"age": 365}},
      {"action": {"type": "Delete"}, "condition": {"age": 1825}}
    ]
  }
}

ES ILM

{
  "policy": {
    "phases": {
      "hot": { "actions": { "rollover": { "max_age": "7d" } } },
      "warm": { "min_age": "7d", "actions": { "shrink": { "number_of_shards": 1 } } },
      "delete": { "min_age": "90d", "actions": { "delete": {} } }
    }
  }
}

Production gotchas

1. delete_local_logs_after_upload = False default

Local files остаются на worker disk forever без cleanup. Worker fills up за дни.

Fix: delete_local_logs_after_upload = True (2.7+). Или daily cron find /opt/airflow/logs -mtime +1 -delete.

2. Remote upload async — race condition с UI read

Когда TI complete, scheduler возвращает success до upload finished. UI пытается читать с S3 → 404. Через ~5-30s upload завершается.

Симптом: «логов нет» сразу после задачи, через минуту появляются.

Mitigation: UI fallback на worker RPC. Или upload в hot path до return success (slow).

3. Worker died — последний batch logs lost

Local files на worker не успели upload. Worker pod killed → local fs gone.

Fix:

  • Persistent volume для local logs (но это против ephemeral nature).
  • sidecar который continuously upload (real-time tail).
  • Использовать ES streaming (logs идут в realtime).

4. try_N.log naming — confusing

Каждый retry создаёт новый file. UI default показывает latest try. Для investigation предыдущей попытки — manual select try_number.

5. Cross-region S3 — high latency

UI в Europe, bucket в us-east-1 → каждый log read 200-500ms.

Fix: bucket в той же region что webserver. Multi-region requires replication (cost).

6. ES JSON parsing breakage

Если task pусит non-JSON в stdout (например, через print() без json.dumps) — log entries become invalid в ES. Index mapping breaks, alerts «cannot parse».

Fix: strict logging — ВСЕГДА через logger, никаких print() в production code. ES mapping ignore_malformed: true как safety net.


Проверка знанийKnowledge check
Production Airflow cluster: 10k DAGs, 200k TI/day. Команда жалуется: 'metadata DB занимает 800GB, scheduler tick медленный (p95 = 60s), backup занимает 4 часа'. Investigation show table `log` = 600GB. Что произошло, как починить?
ОтветAnswer
**Root cause:** DB task log handler включён (legacy `airflow.utils.log.db_task_handler.DBTaskHandler`) — каждый log line task — INSERT в `log` table. На 200k TI/day с ~100 lines/task = 20M INSERTs/day = unbounded growth. **Симптомы:** (1) `log` table dominate DB size; (2) Critical section scheduler ждёт on lock contention с INSERT log; (3) VACUUM застревает на huge log table; (4) Backup row-by-row dump slow. **Migration off DB log handler (production safe):** **Phase 1 (config switch):** В `airflow.cfg`: `[logging] remote_logging = True; remote_base_log_folder = s3://airflow-prod-logs/; remote_log_conn_id = aws_default`. Удалить custom `task_log_handler_config` если был. Restart всех components (rolling). С этого момента **new logs** идут в S3. **Phase 2 (cleanup old):** Не DELETE (lock table на часы) — а через partitioned approach: ```sql -- Сначала rename CREATE TABLE log_archived AS SELECT * FROM log WHERE dttm < '2025-01-01'; -- Delete batches ALTER TABLE log SET (autovacuum_enabled = false); -- prevent contention DELETE FROM log WHERE dttm < '2025-05-01' AND id IN (SELECT id FROM log WHERE dttm < '2025-05-01' LIMIT 100000); -- Repeat batches, VACUUM каждые 10 batches ``` Альтернатива — `TRUNCATE log` (фастер) если backups confirmed. **Phase 3 (long term):** (1) `delete_local_logs_after_upload = True`. (2) S3 lifecycle policy для old logs (Standard-IA 30d, Glacier 90d, expire 365d). (3) Monitoring `log` table size — alert на growth. (4) `airflow db clean` daily для residual log entries. **Expected outcome:** DB shrink с 800GB до ~50-80GB, scheduler tick падает до <5s, backup до 20-30 минут. Это самый частый production issue Airflow.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Почему DB log handler — catastrophic antipattern для production?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7