Почему print не годится
В первом скрипте вы написали print("processing row 12345") и почувствовали себя хорошо. На рабочем ноутбуке всё видно, ETL отрабатывает, всё ок. А потом этот скрипт встал в Airflow, начал крутиться ночью каждые 15 минут, иногда падает — и оказывается, что у print есть проблемы, которые в production невозможно терпеть:
- Нет уровней. «Скачали 1000 строк» и «БД недоступна» выглядят одинаково — обычным текстом в stdout. Поиск ошибок в гигабайтных логах превращается в
grep | awk | sad. - Не отключается. Включить детальный лог на одной БД в одной среде и не трогать другие — невозможно без правки кода.
- Не структурировано. Логи в строки — это не данные. Запросы вроде «дай все ETL-джобы, которые упали с http 503 в последний час» — невозможно построить.
- Нет timestamp и контекста. Где запустилось, кто запустил, в каком процессе, в каком потоке —
printне знает. А в production это первый вопрос на инциденте. - Идёт в stdout вперемешку с выводом программы. Если ваш скрипт пишет в stdout полезные данные (например, JSON для дальнейшего pipe) —
print-логи испортят выход.
Правильный инструмент — модуль logging из stdlib и его более удобная надстройка structlog. Зачем junior DE это понимать: ваш код будет крутиться в Airflow/Kubernetes, и единственный канал связи между вами и production — это лог. Если он неинформативен — отладка превращается в шаманство.
logging из stdlib: минимальный комплект
В Python всегда есть встроенный logging. Базовое использование — три строки:
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger(__name__)
log.info("starting ETL")
log.warning("API returned 429, will retry")
log.error("failed to write to DB: connection refused")
Вывод:
2026-05-13 14:30:00,123 [INFO] starting ETL
2026-05-13 14:30:05,456 [WARNING] API returned 429, will retry
2026-05-13 14:30:10,789 [ERROR] failed to write to DB: connection refused
Уже на этом уровне — есть timestamp, есть уровень, есть фильтрация (только INFO+ показывается). Дальше можно настроить вывод в файл, в systemd journal, в Loki/Splunk — без правки бизнес-кода.
Уровни логов
Стандартные уровни от самого болтливого к самому критичному:
| Уровень | Когда писать |
|---|---|
DEBUG | Очень подробно, ниткой сшивающий шаги. По умолчанию не отображается. |
INFO | Нормальный ход дел: «начали ETL», «обработали 1000 строк». Дефолт в production. |
WARNING | Что-то не так, но программа продолжает: «API вернул 429», «retry succeeded». |
ERROR | Что-то не удалось, но программа не упала: «не смогли отправить email». |
CRITICAL | Программа дальше не работает. Очень редко. |
Уровни — не философия, а технический фильтр. Установили level=INFO — DEBUG-сообщения не пишутся. В dev можно поставить DEBUG, в production — INFO. Это первый и самый дешёвый инструмент управления объёмом логов.
Идиома: logger = logging.getLogger(__name__) в каждом модуле
В каждом Python-модуле, где есть логи, пишите эту строчку сверху, после импортов:
import logging
log = logging.getLogger(__name__)
__name__ — это полное имя модуля, например mypackage.etl.fetch. Logger с этим именем создаётся один на модуль, и через него можно по-разному настраивать уровни. Хотите больше деталей только из mypackage.etl.fetch — настройте уровень для этого имени, остальной код продолжит писать INFO. Без правки бизнес-кода вообще.
getLogger(__name__) строит иерархию через точки: mypackage.etl.fetch — потомок mypackage.etl, тот — потомок mypackage. Настройка верхнего уровня применяется и к потомкам, если у них своя не задана. Это позволяет одной командой утихомирить целый поддиректорий.
Конфигурация через dictConfig
basicConfig хорош для простых скриптов. В реальном проекте используют dictConfig — он принимает словарь с полной конфигурацией handlers, formatters, loggers:
import logging.config
LOGGING = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {"format": "%(asctime)s [%(levelname)s] %(name)s: %(message)s"},
},
"handlers": {
"console": {"class": "logging.StreamHandler", "formatter": "default", "level": "INFO"},
},
"loggers": {
"": {"handlers": ["console"], "level": "INFO"}, # root
"httpx": {"level": "WARNING"}, # тише болтливую библиотеку
},
}
logging.config.dictConfig(LOGGING)
disable_existing_loggers: False — обязательно. Без этого logger’ы, созданные до dictConfig (например, в импортах сверху файла), перестанут работать.
Где logging неудобен и зачем structlog
Стандартный logging сам по себе хороший, но у него есть одна слабость: сообщения — это строки. Если хочется писать «обработали 1000 строк, заняло 2.3 секунды, источник CSV /data/orders.csv», приходится склеивать вручную: log.info(f"processed {n} rows in {duration:.1f}s from {path}"). А потом, чтобы найти все такие сообщения и посчитать сумму строк, нужно регуляркой парсить обратно из лога.
Современная индустрия пришла к structured logging — каждое сообщение это не строка, а словарь полей: {event: "rows_processed", n: 1000, duration: 2.3, path: "/data/orders.csv"}. И сразу пишется в JSON в stdout, который потом ест Loki/Elasticsearch/CloudWatch и позволяет фильтровать SQL-подобными запросами.
Самая популярная библиотека для этого в Python —
logging, а делает поверх него удобный API.
import structlog
log = structlog.get_logger()
log.info("rows_processed", n=1000, duration=2.3, path="/data/orders.csv")
В development при настройке по умолчанию вы увидите цветную человеко-читаемую строку, в production — JSON.
Базовая настройка structlog для production
import logging
import structlog
# 1. Настроить stdlib logging как backend
logging.basicConfig(level=logging.INFO, format="%(message)s")
# 2. Настроить structlog
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars, # подхватывать «глобальный контекст»
structlog.processors.add_log_level, # добавлять level в каждый event
structlog.processors.TimeStamper(fmt="iso", utc=True), # iso-timestamp в UTC
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(), # выводить как JSON
],
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
cache_logger_on_first_use=True,
)
log = structlog.get_logger()
log.info("etl_started", job="orders_import", run_id="2026-05-13T14:30")
Вывод (JSON, одна строка на событие):
{"event": "etl_started", "job": "orders_import", "run_id": "2026-05-13T14:30", "level": "info", "timestamp": "2026-05-13T14:30:00.123456Z"}
Что важно. processors — это цепочка функций, через которые проходит каждое событие. Каждый processor получает событие в виде словаря и возвращает изменённый словарь. Финальный — обязательно Renderer, он превращает словарь в строку для вывода.
Каждый processor преобразует словарь события. Renderer — последний шаг, он отдаёт строку, которую дальше печатает stdlib logging.
Можно вставлять свои processor’ы — например, добавить hostname, отредактировать секретные поля, отфильтровать события по уровню. JSONRenderer всегда последний; для dev-окружения его заменяют на structlog.dev.ConsoleRenderer() (цветной, разноуровневый).
Контекст через bind
Один из главных приёмов — привязать контекст к логгеру, чтобы все следующие события автоматически получали эти поля:
log = structlog.get_logger()
log_job = log.bind(job="orders_import", run_id="2026-05-13T14:30")
log_job.info("etl_started")
log_job.info("rows_fetched", n=1000)
log_job.info("etl_finished", duration=2.3)
Каждое из трёх событий будет содержать job и run_id. Это позволяет в Loki/Splunk фильтровать {job="orders_import"} и видеть весь ход одного запуска.
Альтернативно — structlog.contextvars.bind_contextvars(...) — то же самое, но через ContextVars, что работает между await-ами в async-коде.
DE-кейс: ETL-функция с structlog
Собираем всё вместе. Типичная функция ETL должна логировать как минимум четыре события: старт, прогресс, конец, ошибку. Желательно с подсчётами и контекстом.
import structlog
import time
log = structlog.get_logger()
def run_etl(source: str, run_id: str) -> None:
job_log = log.bind(source=source, run_id=run_id)
job_log.info("etl_started")
start = time.perf_counter()
try:
rows = fetch(source)
job_log.info("rows_fetched", count=len(rows))
processed = transform(rows)
save(processed)
duration = time.perf_counter() - start
job_log.info("etl_finished", processed=len(processed), duration_s=round(duration, 3))
except Exception as exc:
duration = time.perf_counter() - start
job_log.exception("etl_failed", error_type=type(exc).__name__, duration_s=round(duration, 3))
raise
В Loki этот ETL будет легко находить и считать: count_over_time({event="etl_finished"}[1h]), sum by (source) (rate({event="etl_failed"}[5m])). Без структурного лога этого сделать нельзя.
log.exception() — отдельный метод, который автоматически вкладывает traceback в поле exception события. Это и есть смысл processors.format_exc_info в конфиге.
print против logging против structlog
| Когда | Чем |
|---|---|
| Однострочный CLI tool, скрипт-помощник | print() нормально |
| Простой ETL для одной задачи, отладка | stdlib logging |
| Production ETL, любой long-running процесс | structlog с JSON-выводом |
| Web-сервис, фоновые задачи, очереди | structlog обязательно |
Простое правило: если код будет жить в Airflow, Kubernetes, systemd, cron или Docker — это structlog. Если это python organize_my_files.py, который вы запустили вручную — хватит и print.
Подводные камни
Не оставляйте print в production-коде по привычке. Один забытый print где-нибудь в библиотеке загрязняет stdout всего pipeline. Code review должно ловить такое.
Не логируйте секреты. Токены API, пароли БД, личные данные пользователей — никогда не должны попадать в лог. Если такие поля приходят в события — фильтруйте на уровне processor.
Не злоупотребляйте DEBUG. Тысячи DEBUG-строк в продакшне = бесполезный мусор, который только тормозит парсер. Используйте DEBUG для конкретных модулей точечно, не глобально.
Не пишите в файл из самой программы. Идиоматично — писать в stdout, а перенаправление в файл/storage делает оркестратор (systemd, Docker, K8s, Airflow). Это правило
JSON-логи нечитаемы глазом. Это норма: они для машин. Для dev-окружения переключайте на ConsoleRenderer, для production — JSONRenderer. Управляйте через env-переменную.
Что мы получили
printдля production-кода не годится: нет уровней, нет контекста, не структурно, идёт в stdout вперемешку с данными.- stdlib
loggingдаёт уровни, иерархию черезgetLogger(__name__), конфиг черезdictConfig. structlog— стандарт production-Python: структурные события в JSON, контекст черезbind, цепочкаprocessors.- DE-применение: каждое событие ETL — это словарь с
event,run_id,count,durationи др. Запросы в Loki/Splunk строятся за минуту. - Правило 12-factor: писать в stdout, маршрутизация — забота окружения.
На этом модуль 4 закрывается. Вы освоили пять идиоматических инструментов Python-разработчика: генераторы, контекст-менеджеры, декораторы, pathlib и datetime-discipline. Шестой — логирование — формально не «идиома языка», но без него production-Python не существует.
В модуле 5 поговорим о системе типов и валидации данных через Pydantic — следующий обязательный кирпичик в DE-стеке.