Remote logging: S3, GCS, Elasticsearch, Azure Blob, CloudWatch
In production, Airflow logs should live in external object storage or a centralized logging system. Local disk does not work: worker pods are ephemeral, the scheduler has no access to worker file systems, and the UI cannot show logs from a dead worker.
This lesson covers configuring all supported backends (S3, GCS, Azure Blob, CloudWatch, Elasticsearch), custom handlers, UI integration, and a critical antipattern: the DB log handler, which breaks production.
Logging architecture in Airflow
Key points:
- The local file is ALWAYS written first. The remote copy is an asynchronous upload at the end of the task try.
- The UI reads from remote storage only if remote_logging=True. Otherwise it makes an RPC call to the worker.
- One file per try_number: retries create try_1.log, try_2.log, etc. (the exact file name is controlled by log_filename_template).
Basic remote logging configuration
[logging]
# Enable remote upload
remote_logging = True
# Backend: s3://, gs://, wasb-...://, http+es://, cloudwatch://
remote_base_log_folder = s3://my-airflow-logs/
# Connection ID used for authentication
remote_log_conn_id = aws_default
# Encryption (optional)
encrypt_s3_logs = False
# Local log directory (always written first; also what remains if remote is unavailable)
base_log_folder = /opt/airflow/logs
After Airflow is restarted, every task instance (TI) uploads its log to remote storage once it completes.
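Every airflow.cfg option can also be set through an environment variable named AIRFLOW__{SECTION}__{KEY}, which is often more convenient in containers. A minimal sketch that derives those names for the settings above (the helper name to_env is illustrative, not part of Airflow):

```python
def to_env(section: str, options: dict[str, str]) -> dict[str, str]:
    """Map airflow.cfg options to AIRFLOW__SECTION__KEY environment variables."""
    return {
        f"AIRFLOW__{section.upper()}__{key.upper()}": value
        for key, value in options.items()
    }


logging_env = to_env(
    "logging",
    {
        "remote_logging": "True",
        "remote_base_log_folder": "s3://my-airflow-logs/",
        "remote_log_conn_id": "aws_default",
    },
)

for name, value in logging_env.items():
    print(f"{name}={value}")
```

The printed lines can be pasted into a pod spec or a .env file as they are.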
Backend 1: S3
[logging]
remote_logging = True
remote_base_log_folder = s3://airflow-prod-logs/
remote_log_conn_id = aws_default
encrypt_s3_logs = True
# Remove the local copy after a successful upload (Airflow 2.6+)
delete_local_logs = True
Connection aws_default (via the UI or airflow connections add):
airflow connections add aws_default \
    --conn-type aws \
    --conn-extra '{"role_arn": "arn:aws:iam::123456:role/airflow-logs"}'
Or as an environment variable:
AIRFLOW_CONN_AWS_DEFAULT='{"conn_type": "aws", "extra": {"region_name": "us-east-1"}}'
IRSA for EKS
On EKS, use IAM Roles for Service Accounts:
# Worker pod service account
metadata:
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::123456:role/airflow-logs
The connection itself then carries no credentials (it falls back to the instance profile or IRSA):
AIRFLOW_CONN_AWS_DEFAULT='{"conn_type": "aws"}'
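The JSON value of the connection variable is easy to get wrong when quoting by hand. A small sketch that builds it with json.dumps for both cases shown above (the function name aws_conn_env is an assumption made for this example):

```python
import json
from typing import Optional


def aws_conn_env(extra: Optional[dict] = None) -> str:
    """Build the JSON value for AIRFLOW_CONN_AWS_DEFAULT."""
    conn = {"conn_type": "aws"}
    if extra:
        conn["extra"] = extra
    return json.dumps(conn)


# Static region, as in the environment variable example
with_region = aws_conn_env({"region_name": "us-east-1"})
# IRSA / instance profile: nothing secret is stored in the connection
irsa = aws_conn_env()

print(f"AIRFLOW_CONN_AWS_DEFAULT='{irsa}'")
```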
S3 bucket setup
# Terraform
resource "aws_s3_bucket" "airflow_logs" {
  bucket = "airflow-prod-logs"
}
resource "aws_s3_bucket_lifecycle_configuration" "logs" {
  bucket = aws_s3_bucket.airflow_logs.id
  rule {
    id     = "expire-old-logs"
    status = "Enabled"
    # Empty filter: the rule applies to every object in the bucket
    filter {}
    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }
    expiration {
      days = 90
    }
  }
}
resource "aws_s3_bucket_server_side_encryption_configuration" "logs" {
  bucket = aws_s3_bucket.airflow_logs.id
  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}
90-day retention plus a transition to Standard-IA after 30 days keeps the cost of archived logs down.
Backend 2: GCS
[logging]
remote_logging = True
remote_base_log_folder = gs://airflow-prod-logs/
remote_log_conn_id = google_cloud_default
Connection:
airflow connections add google_cloud_default \
--conn-type google_cloud_platform \
--conn-extra '{"key_path": "/var/secrets/sa.json"}'
On GKE, use Workload Identity:
# Pod service account binds to GCP SA
metadata:
  annotations:
    iam.gke.io/gcp-service-account: [email protected]
Permissions:
gcloud projects add-iam-policy-binding my-project \
--member="serviceAccount:[email protected]" \
--role="roles/storage.objectAdmin"
Backend 3: Azure Blob
[logging]
remote_logging = True
remote_base_log_folder = [email protected]/
remote_log_conn_id = azure_blob_default
The connection type comes from the apache-airflow-providers-microsoft-azure provider. Authenticate with a connection string or a Managed Identity.
Backend 4: Elasticsearch
Elasticsearch is a special case: logs are structured documents rather than files, and they are streamed rather than uploaded.
[elasticsearch]
host = elasticsearch.example.com:9200
log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
end_of_log_mark = end_of_log
frontend = https://kibana.example.com/app/discover
write_stdout = True
json_format = True
json_fields = asctime,filename,lineno,levelname,message
[logging]
remote_logging = True
remote_log_conn_id = elastic_default
remote_base_log_folder = http+es://elasticsearch:9200
Setup: workers write JSON to stdout, and a fluent-bit or filebeat sidecar picks it up and ships it to ES.
# Pod spec
spec:
  containers:
    - name: airflow-worker
      # ... airflow ...
    - name: filebeat
      image: docker.elastic.co/beats/filebeat:8.13.0
      volumeMounts:
        - name: var-log
          mountPath: /var/log/airflow
# filebeat.yml (schematic: a container has no "config" field, so mount this file from a ConfigMap)
filebeat.inputs:
  - type: log
    paths: ['/var/log/airflow/*.log']
    json.keys_under_root: true
output.elasticsearch:
  hosts: ['elasticsearch:9200']
  index: 'airflow-logs-%{+yyyy.MM.dd}'
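To make the "JSON lines with the configured json_fields" idea concrete, here is a standard-library sketch of a formatter that emits one JSON object per record. It is a stand-in for illustration, not Airflow's own formatter, and the names JsonFormatter and json_demo are assumptions:

```python
import io
import json
import logging

JSON_FIELDS = ["asctime", "filename", "lineno", "levelname", "message"]


class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON object with the configured fields."""

    def format(self, record: logging.LogRecord) -> str:
        record.message = record.getMessage()
        record.asctime = self.formatTime(record)
        return json.dumps({field: getattr(record, field) for field in JSON_FIELDS})


stream = io.StringIO()  # stands in for stdout so the output can be inspected
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

log = logging.getLogger("json_demo")
log.setLevel(logging.INFO)
log.propagate = False
log.addHandler(handler)

log.info("loaded %d rows", 42)
doc = json.loads(stream.getvalue())
print(doc["levelname"], doc["message"])
```

Because every line is a complete JSON object, a shipper configured with json.keys_under_root can index the fields directly.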
Advantages of ES
- Search across logs: dag_id:etl_orders AND level:ERROR over the past week.
- Aggregation: top 10 dag_ids by error count.
- Kibana UI for logs (Splunk-style).
Drawbacks
- Storage costs more than S3 (ES is RAM-heavy).
- Harder to maintain (cluster, indexing, shards).
- Index bloat: without retention, indices grow without bound.
For most cases, S3 plus a standalone Kibana/Loki/Splunk is the best trade-off.
Backend 5: CloudWatch Logs
[logging]
remote_logging = True
remote_base_log_folder = cloudwatch://arn:aws:logs:us-east-1:123456:log-group:airflow-prod
remote_log_conn_id = aws_default
Logs go to a CloudWatch log group. Each task creates its own log stream.
Pros: AWS-native, integrated with CloudWatch Logs Insights, no separate infrastructure. Cons: expensive at high volume ($0.50/GB ingest), query limits.
Used by default in MWAA (the managed service).
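A quick back-of-the-envelope calculation shows how the ingest price adds up. The sketch below uses the $0.50/GB figure quoted above; the workload numbers are hypothetical, and current AWS pricing should be checked before relying on the result:

```python
INGEST_USD_PER_GB = 0.50  # ingest price quoted above; verify against current AWS pricing


def monthly_ingest_cost(tasks_per_day: int, avg_log_mb: float, days: int = 30) -> float:
    """Estimated CloudWatch ingest cost in USD for one month of task logs."""
    gb = tasks_per_day * avg_log_mb * days / 1024
    return round(gb * INGEST_USD_PER_GB, 2)


# Hypothetical workload: 50,000 task runs a day, 2 MB of log each
cost = monthly_ingest_cost(tasks_per_day=50_000, avg_log_mb=2.0)
print(cost)  # 1464.84
```

Storage and query charges come on top of this, so the estimate is a lower bound.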
Backend 6: Custom log handler
If none of the built-in backends fits, write a custom handler:
# my_company/log_handlers.py
import logging
from airflow.utils.log.file_task_handler import FileTaskHandler
class MyCustomTaskHandler(FileTaskHandler):
    """
    Custom handler: writes locally, then uploads to an internal log system.
    """
    def __init__(self, base_log_folder: str, internal_endpoint: str):
        super().__init__(base_log_folder)
        self.internal_endpoint = internal_endpoint
    def _read(self, ti, try_number, metadata=None):
        # Read logs from the internal system for the UI
        ...
    def close(self):
        super().close()
        # After the TI completes, upload the local file to the internal system
        ...
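The "write locally, upload on close" pattern can be tried without Airflow installed. The sketch below uses logging.FileHandler as a stand-in for FileTaskHandler; the class name UploadOnCloseHandler and the upload callable are assumptions made for illustration, and a real handler would call the internal log system instead of appending to a list:

```python
import logging
import os
import tempfile
from typing import Callable


class UploadOnCloseHandler(logging.FileHandler):
    """Write to a local file; hand the finished file to `upload` on close."""

    def __init__(self, path: str, upload: Callable[[str], None]):
        super().__init__(path)
        self._upload = upload
        self._uploaded = False

    def close(self) -> None:
        super().close()  # flush and release the local file first
        if not self._uploaded:  # close() may be called more than once
            self._uploaded = True
            self._upload(self.baseFilename)


uploaded: list[str] = []  # stand-in for the internal log system
path = os.path.join(tempfile.mkdtemp(), "attempt.log")

handler = UploadOnCloseHandler(path, upload=uploaded.append)
log = logging.getLogger("upload_demo")
log.setLevel(logging.INFO)
log.propagate = False
log.addHandler(handler)

log.info("task finished")
log.removeHandler(handler)
handler.close()
print(uploaded == [os.path.abspath(path)])
```

The guard flag matters because the logging module closes handlers again at interpreter shutdown, and the upload should happen only once.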
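Register it in airflow_local_settings.py:
# airflow_local_settings.py
from copy import deepcopy
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
# Start from the default config and override only the task handler
LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG["handlers"]["task"] = {
    "class": "my_company.log_handlers.MyCustomTaskHandler",
    "formatter": "airflow",
    "filters": ["mask_secrets"],
    "internal_endpoint": "https://logs.internal.example.com",
    "base_log_folder": "/opt/airflow/logs",
}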
DB log handler: NEVER in production
Airflow has a legacy DBTaskHandler that writes logs to the metadata DB. This is a catastrophic antipattern.
# DO NOT USE
[core]
task_log_handler_config = airflow.utils.log.db_task_handler.DBTaskHandler
Why this is bad
| Problem | Symptom |
|---|---|
| Metadata DB size explodes | DB grows to hundreds of GB → slow scheduler, slow UI |
| log table takes 70% of the volume | VACUUM stalls, replication lag |
| Lock contention | Scheduler critical section waits on log INSERTs |
| Backup overhead | DB dump takes an hour |
| Retention impossible without downtime | DELETE on 100M rows holds locks for minutes |
Production Airflow installations that use the DB log handler always break at scale.
Migration off the DB handler:
- Switch to S3/GCS/CloudWatch in the config.
- Restart everything.
- Clean up the log table:
-- Back up first (server-side file; written as real CSV with a header row)
COPY log TO '/tmp/log_backup.csv' WITH (FORMAT csv, HEADER);
-- Truncate (faster than DELETE)
TRUNCATE TABLE log;
-- VACUUM
VACUUM FULL log;
After the migration the metadata DB shrinks sharply and the scheduler speeds up.
UI integration: how the UI reads logs
What the webserver does when it shows logs:
1. UI request: GET /log?dag_id=X&task_id=Y&run_id=Z&try_number=1
2. The webserver calls task_log_reader:
   a. If remote_logging=True → reads from remote storage (S3 GET, GCS download, ES query)
   b. Else → RPC to the worker on port 8793 (if it is alive)
   c. Else → file on local disk (if the webserver shares a file system with the workers)
3. Returns the log content as HTML/text
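The fallback order in step 2 can be written down as a small decision function. This is a simplified model of the steps listed above, not Airflow's actual reader code, and choose_log_source is a hypothetical name:

```python
def choose_log_source(remote_logging: bool, worker_alive: bool, shared_fs: bool) -> str:
    """Return which source the webserver would read from, in fallback order."""
    if remote_logging:
        return "remote"      # S3 GET, GCS download, ES query
    if worker_alive:
        return "worker_rpc"  # log server on the worker, port 8793
    if shared_fs:
        return "local_file"  # only if webserver and workers share a file system
    return "unavailable"


print(choose_log_source(remote_logging=False, worker_alive=False, shared_fs=False))
# unavailable
```

The last case is the one the introduction warns about: without remote logging, a dead worker and no shared disk leave the UI with nothing to show.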
Production setup: webserver IAM permissions
If the remote backend is S3, the webserver pod needs read access to the bucket:
# Webserver IRSA role
{
  "Effect": "Allow",
  "Action": ["s3:GetObject", "s3:ListBucket"],
  "Resource": [
    "arn:aws:s3:::airflow-prod-logs",
    "arn:aws:s3:::airflow-prod-logs/*"
  ]
}
Worker pods need write access (PutObject). The webserver needs read-only access.
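The read-only versus read-write split can be generated from one template so the two roles never drift apart. A sketch, assuming workers keep read access in addition to PutObject; the function name log_bucket_policy is illustrative:

```python
import json


def log_bucket_policy(bucket: str, can_write: bool) -> dict:
    """IAM policy document for reading (and optionally writing) task logs."""
    actions = ["s3:GetObject", "s3:ListBucket"]
    if can_write:
        actions.append("s3:PutObject")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": actions,
                "Resource": [f"arn:aws:s3:::{bucket}", f"arn:aws:s3:::{bucket}/*"],
            }
        ],
    }


webserver_policy = log_bucket_policy("airflow-prod-logs", can_write=False)
worker_policy = log_bucket_policy("airflow-prod-logs", can_write=True)
print(json.dumps(webserver_policy, indent=2))
```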
UI log latency
An S3 GET takes roughly 100-500 ms (cold). For large logs (100 MB) it can reach 5-10 s. The default UI timeout is 30 s.
Mitigation:
- Enable S3 Transfer Acceleration for cross-region access.
- Cache behind CloudFront or another CDN in front of the webserver.
- Split long logs into chunks with pagination in the UI (2.10+).
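Chunked reads rely on HTTP Range requests, which S3 GetObject supports. The sketch below only computes the Range header values for a given page size; it illustrates the chunking idea and is not how the Airflow UI implements pagination:

```python
def byte_ranges(total_bytes: int, chunk_bytes: int) -> list[str]:
    """HTTP Range header values that cover an object of total_bytes."""
    if chunk_bytes <= 0:
        raise ValueError("chunk_bytes must be positive")
    return [
        f"bytes={start}-{min(start + chunk_bytes, total_bytes) - 1}"
        for start in range(0, total_bytes, chunk_bytes)
    ]


# A 10 MiB log read in 4 MiB pages
MIB = 1024 * 1024
pages = byte_ranges(10 * MIB, 4 * MIB)
print(pages)
# ['bytes=0-4194303', 'bytes=4194304-8388607', 'bytes=8388608-10485759']
```

Fetching only the first or last page keeps the initial render fast even when the full log is large.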
Frontend link for drill-down
The UI often needs a deep link to the full logs in Kibana, Splunk, or CloudWatch. Set it through [elasticsearch] frontend (for ES) or a custom integration:
[elasticsearch]
frontend = https://kibana.example.com/app/discover#/?_g=()&_a=(query:(language:kuery,query:'log_id:"{log_id}"'))
The Airflow UI then shows a “View logs in Kibana” button with a pre-filled query.
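To see what the resulting link looks like, the sketch below renders a log_id from the log_id_template shown earlier and substitutes it into the frontend URL. The task values are made up, and Airflow's internal rendering may differ in details:

```python
from urllib.parse import quote

LOG_ID_TEMPLATE = "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
FRONTEND = (
    "https://kibana.example.com/app/discover#/?_g=()"
    "&_a=(query:(language:kuery,query:'log_id:\"{log_id}\"'))"
)


def kibana_link(**ti_fields) -> str:
    """Render the log_id for one task try and substitute it into the frontend URL."""
    log_id = LOG_ID_TEMPLATE.format(**ti_fields)
    return FRONTEND.format(log_id=quote(log_id))


link = kibana_link(
    dag_id="etl_orders",
    task_id="load",
    run_id="scheduled__2024-01-01T00:00:00+00:00",
    map_index=-1,
    try_number=1,
)
print(link)
```

Percent-encoding the log_id matters because run_id values usually contain ":" and "+", which would otherwise be misread inside a URL.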
Log retention strategies
S3 lifecycle policy
# Rule block inside aws_s3_bucket_lifecycle_configuration
rule {
  id     = "logs-archive"
  status = "Enabled"
  # Empty filter: the rule applies to every object in the bucket
  filter {}
  transition {
    days          = 30
    storage_class = "STANDARD_IA" # cheaper after 30d
  }
  transition {
    days          = 90
    storage_class = "GLACIER" # archive after 90d
  }
  expiration {
    days = 365 # delete after 1 year
  }
}
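A lifecycle rule is easier to reason about as a mapping from object age to storage class. The sketch below mirrors the thresholds of the rule above; real transitions are applied asynchronously by S3, so the actual change can lag the threshold:

```python
from typing import Optional

# (minimum age in days, storage class), mirroring the lifecycle rule above
TRANSITIONS = [(90, "GLACIER"), (30, "STANDARD_IA"), (0, "STANDARD")]
EXPIRE_AFTER_DAYS = 365


def storage_class_at(age_days: int) -> Optional[str]:
    """Storage class of a log object at a given age, or None once expired."""
    if age_days >= EXPIRE_AFTER_DAYS:
        return None
    for min_age, storage_class in TRANSITIONS:
        if age_days >= min_age:
            return storage_class
    return None


for age in (7, 45, 200, 400):
    print(age, storage_class_at(age))
# 7 STANDARD
# 45 STANDARD_IA
# 200 GLACIER
# 400 None
```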
GCS lifecycle
gsutil lifecycle set lifecycle.json gs://my-bucket
{
  "lifecycle": {
    "rule": [
      {"action": {"type": "SetStorageClass", "storageClass": "NEARLINE"}, "condition": {"age": 30}},
      {"action": {"type": "SetStorageClass", "storageClass": "ARCHIVE"}, "condition": {"age": 365}},
      {"action": {"type": "Delete"}, "condition": {"age": 1825}}
    ]
  }
}
ES ILM
{
  "policy": {
    "phases": {
      "hot": { "actions": { "rollover": { "max_age": "7d" } } },
      "warm": { "min_age": "7d", "actions": { "shrink": { "number_of_shards": 1 } } },
      "delete": { "min_age": "90d", "actions": { "delete": {} } }
    }
  }
}
Production gotchas
1. delete_local_logs = False by default
Local files stay on the worker disk forever unless something cleans them up. The worker disk fills up within days.
Fix: delete_local_logs = True (2.6+). Or a daily cron job: find /opt/airflow/logs -type f -mtime +1 -delete.
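Where cron and find are not available (for example in a minimal container image), the same age-based cleanup can be done from Python. A sketch in the spirit of the find command above; the function name delete_old_logs is an assumption, and the demo runs on a throwaway directory rather than /opt/airflow/logs:

```python
import os
import tempfile
import time
from pathlib import Path


def delete_old_logs(root: Path, max_age_days: float) -> list[Path]:
    """Delete *.log files under root older than max_age_days; return what was removed."""
    cutoff = time.time() - max_age_days * 86400
    removed = []
    for path in list(root.rglob("*.log")):  # materialize before deleting
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed.append(path)
    return removed


# Demo: one stale file, one fresh file
root = Path(tempfile.mkdtemp())
old, fresh = root / "old.log", root / "fresh.log"
old.write_text("stale")
fresh.write_text("recent")
three_days_ago = time.time() - 3 * 86400
os.utime(old, (three_days_ago, three_days_ago))

removed = delete_old_logs(root, max_age_days=1)
print([p.name for p in removed])  # ['old.log']
```

Only run such a cleanup on files that have already been uploaded; otherwise it trades a full disk for lost logs.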
2. Remote upload is async: race condition with UI reads
When a TI completes, the scheduler reports success before the upload has finished. The UI tries to read from S3 and gets a 404. The upload finishes roughly 5-30 s later.
Symptom: "no logs" right after the task finishes; they appear a minute later.
Mitigation: let the UI fall back to the worker RPC. Or upload in the hot path before returning success (slow).
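Tooling that reads logs straight from the bucket (an alerting script, for instance) can absorb this race with a short retry loop. A minimal sketch with exponential backoff; read_with_retry and the fake fetch function are hypothetical, and a real fetch would wrap the S3 client call and return None on a 404:

```python
import time
from typing import Callable, Optional


def read_with_retry(
    fetch: Callable[[], Optional[str]],
    attempts: int = 5,
    delay_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Call fetch() until it returns a log, doubling the wait after each miss."""
    for attempt in range(attempts):
        log = fetch()
        if log is not None:
            return log
        if attempt < attempts - 1:
            sleep(delay_s * 2**attempt)
    return None


# Fake remote store: the object "appears" on the third read
responses = iter([None, None, "task log contents"])
waits: list[float] = []
result = read_with_retry(lambda: next(responses), sleep=waits.append)
print(result, waits)  # task log contents [2.0, 4.0]
```

With the defaults the loop waits at most 2 + 4 + 8 + 16 = 30 s in total, which covers the upload delay described above.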
3. Worker died: the last batch of logs is lost
Local files on the worker were not uploaded in time. The worker pod is killed → the local file system is gone.
Fix:
- A persistent volume for local logs (but this goes against the ephemeral nature of workers).
- A sidecar that uploads continuously (real-time tail).
- Use ES streaming (logs ship in real time).
4. try_N.log naming is confusing
Each retry creates a new file. By default the UI shows the latest try. To investigate an earlier attempt, select its try_number manually.
5. Cross-region S3: high latency
UI in Europe, bucket in us-east-1 → every log read takes 200-500 ms.
Fix: keep the bucket in the same region as the webserver. Multi-region setups require replication (extra cost).
6. ES JSON parsing breakage
If a task writes non-JSON to stdout (for example, via print() without json.dumps), the log entries become invalid in ES. The index mapping breaks and alerts report "cannot parse".
Fix: strict logging. ALWAYS go through a logger and never use print() in production code. Set ignore_malformed: true in the ES mapping as a safety net.