Structured logging: JSON output, extra={}, correlation IDs
The plain text format %(asctime)s [%(levelname)s] %(name)s: %(message)s (lesson 01) works well for local development and tail -f. Production is a different story: log aggregation backends (Datadog, Grafana Loki, Splunk, Elasticsearch, ClickHouse system.text_log) want structured records in which every field is queryable, indexable, and filterable. Pragmatic-DEEP rule: a JSON formatter + extra={} + correlation IDs cover most production observability needs without third-party libraries (structlog comes up in lesson 06).
In this lesson:
- Why structured logs: text grep vs SQL queries over logs.
- Custom Formatter subclass: class JsonFormatter(logging.Formatter).
- extra={} kwarg: injecting attributes into a LogRecord.
- LogRecord attribute introspection: cross-link to M07 lesson 04 (dataclass).
- Correlation IDs: request-scoped UUID propagation.
- Recipe: production-ready JSON logger.
- Cross-course → ClickHouse system.text_log.
- Cross-course → Spark metrics-prometheus.
Why structured logs
Text logs are flat strings. To find every ERROR for user_id=42 over the last hour, you end up chaining grep, sed, and awk:
grep ERROR app.log | grep 'user_id=42' | awk '$1 > "2026-04-29T11:00:00"'
This is fragile: the regex breaks as soon as the log line changes slightly; user_id=42 also matches user_id=42100; and date parsing is done by hand.
The structured (JSON) alternative:
{"time": "2026-04-29T12:00:00", "level": "ERROR", "logger": "app.api", "message": "request failed", "user_id": 42, "request_id": "abc-123"}
{"time": "2026-04-29T12:00:01", "level": "INFO", "logger": "app.api", "message": "request ok", "user_id": 7, "request_id": "abc-124"}
Load these lines into ClickHouse / Loki / Elastic and query them like a table:
SELECT * FROM logs
WHERE level = 'ERROR'
AND user_id = 42
AND time > now() - INTERVAL 1 HOUR
Production rule: local dev uses the text format (readable for humans); production uses JSON (readable for machines).
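To make the contrast with grep concrete, here is a minimal sketch of the same "ERROR for user 42 in the last hour" query done in plain Python over JSON lines. The sample records and the helper name find_errors are illustrative assumptions, not part of the lesson's codebase.

```python
import json
from datetime import datetime, timedelta

# Hypothetical sample: three JSON log lines as a collector might store them.
RAW_LINES = [
    '{"time": "2026-04-29T11:30:00", "level": "ERROR", "user_id": 42, "message": "request failed"}',
    '{"time": "2026-04-29T11:45:00", "level": "ERROR", "user_id": 42100, "message": "request failed"}',
    '{"time": "2026-04-29T09:00:00", "level": "ERROR", "user_id": 42, "message": "request failed"}',
]


def find_errors(lines, user_id, since):
    """Return ERROR records for exactly this user_id that are newer than `since`."""
    matches = []
    for line in lines:
        record = json.loads(line)
        if (
            record['level'] == 'ERROR'
            and record['user_id'] == user_id  # typed equality: 42 never matches 42100
            and datetime.fromisoformat(record['time']) > since
        ):
            matches.append(record)
    return matches


now = datetime(2026, 4, 29, 12, 0, 0)
recent = find_errors(RAW_LINES, user_id=42, since=now - timedelta(hours=1))
print(len(recent), recent[0]['time'])
```

Because user_id is a number and time is parsed as a timestamp, the substring and date-comparison problems of the grep pipeline do not arise.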
Custom Formatter subclass: class JsonFormatter
logging.Formatter has a format(record: LogRecord) -> str method. A subclass overrides it:
import json
import logging
from datetime import datetime
class JsonFormatter(logging.Formatter):
    """Render LogRecord as JSON line."""
    def format(self, record: logging.LogRecord) -> str:
        payload: dict[str, object] = {
            'time': datetime.fromtimestamp(record.created).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),  # applies %s formatting
            'pathname': record.pathname,
            'lineno': record.lineno,
            'funcName': record.funcName,
        }
        # If exception info is present, capture the traceback (lesson 05 deep dive)
        if record.exc_info:
            payload['exc_info'] = self.formatException(record.exc_info)
        return json.dumps(payload)
# Wire it up
import sys
logger = logging.getLogger('app.api')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
# Note: this first version does not render extra= keys yet, so user_id is not in the output
logger.info('request received', extra={'user_id': 42})
# {"time": "2026-04-29T12:00:00", "level": "INFO", "logger": "app.api", "message": "request received", ...}
Key LogRecord members used by a Formatter:
- record.getMessage(): applies %-style formatting, i.e. record.msg % record.args (the lazy formatting from lesson 01, Pitfall 37, happens here).
- record.exc_info: a (type, value, traceback) tuple when the call passed exc_info=True (lesson 05).
- self.formatException(exc_info): a Formatter helper that returns the traceback as a multi-line string.
See docs.python.org/3/library/logging.html#formatter-objects and docs.python.org/3/library/logging.html#logrecord-attributes.
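The two helpers can be tried in isolation, without any handler. The sketch below builds a record by hand with logging.makeLogRecord (the message text and arguments are made up for illustration) and formats a caught exception with a plain logging.Formatter.

```python
import logging
import sys

# msg and args stay separate on the record until getMessage() is called.
record = logging.makeLogRecord(
    {'msg': 'user %s logged in from %s', 'args': (42, '10.0.0.1')}
)
rendered = record.getMessage()

formatter = logging.Formatter()
try:
    1 / 0
except ZeroDivisionError:
    # formatException takes the (type, value, traceback) tuple
    traceback_text = formatter.formatException(sys.exc_info())

print(rendered)
print(traceback_text.splitlines()[0])
```

record.msg still holds the unformatted template afterwards, which is what makes the formatting lazy: the % substitution only runs when a formatter asks for the message.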
extra={} kwarg: injecting attributes into a LogRecord
logger.info('msg', extra={'k': v}): every key in extra is set as an attribute on the LogRecord. Inside a Formatter it is available as record.k.
logger.info('request received', extra={'user_id': 42, 'request_id': 'abc-123'})
# Inside JsonFormatter.format(record):
# record.user_id == 42
# record.request_id == 'abc-123'
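Important: extra keys must not collide with existing LogRecord attributes (name, msg, args, levelname, lineno, etc.) or with the reserved names message and asctime. Otherwise logging raises KeyError: "Attempt to overwrite 'message' in LogRecord". Names such as level or time are not LogRecord attributes, so they do not trigger this error.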
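The collision rule is easy to verify directly. This is a small probe, assuming a throwaway logger name (demo.extra_collision) and a helper try_extra that exist only for this illustration:

```python
import logging

logger = logging.getLogger('demo.extra_collision')  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.addHandler(logging.NullHandler())
logger.propagate = False


def try_extra(extra):
    """Return the error text if logging rejects `extra`, else None."""
    try:
        logger.info('probe', extra=extra)
    except KeyError as exc:
        return exc.args[0]
    return None


# Probe reserved names, real LogRecord attributes, and free names.
errors = {
    key: try_extra({key: 'x'})
    for key in ('message', 'asctime', 'name', 'level', 'time', 'user_id')
}
for key, error in errors.items():
    print(key, '->', error)
```

The check happens when the record is created, so it only fires for calls whose level is enabled; a disabled DEBUG call with a bad key passes silently.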
Pattern: capture extra keys dynamically in the Formatter:
# Standard LogRecord attribute set: known NOT to be user-injected.
# 'message' and 'asctime' are included because another Formatter may have set them on the record.
_STANDARD_ATTRS = {
    'name', 'msg', 'args', 'created', 'filename', 'funcName',
    'levelname', 'levelno', 'lineno', 'module', 'msecs', 'message', 'asctime',
    'pathname', 'process', 'processName', 'relativeCreated',
    'stack_info', 'thread', 'threadName', 'taskName', 'exc_info', 'exc_text',
}
class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload: dict[str, object] = {
            'time': datetime.fromtimestamp(record.created).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
        }
        # Capture all extra= keys dynamically
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        return json.dumps(payload, default=str)  # default=str fallback for datetime / Decimal
Cross-link M07 lesson 04 (dataclass introspection): a custom Formatter inspects a LogRecord much like dataclass fields. record.__dict__ is the instance's attribute dict (LogRecord is a regular Python class), and its keys include the injected attributes. The pattern mirrors dataclasses.fields(obj) introspection: the same idea of reading structured fields from an object.
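As an alternative to a hand-maintained attribute set, the standard names can be derived by introspecting a record that carries no extra data. This is a sketch of that idea, not the lesson's recipe; STANDARD_ATTRS and extract_extra are names chosen for this example.

```python
import logging

# Attributes present on a record with no extra= data, for the running Python version.
# 'message' and 'asctime' are added because Formatter.format() may set them later.
STANDARD_ATTRS = frozenset(logging.makeLogRecord({}).__dict__) | {'message', 'asctime'}


def extract_extra(record: logging.LogRecord) -> dict[str, object]:
    """Return only the attributes that were injected on top of the standard set."""
    return {k: v for k, v in record.__dict__.items() if k not in STANDARD_ATTRS}


record = logging.makeLogRecord(
    {'msg': 'request received', 'user_id': 42, 'request_id': 'abc-123'}
)
extras = extract_extra(record)
print(extras)
```

The trade-off: the derived set follows whatever attributes the interpreter's LogRecord defines (for example taskName on newer versions), at the cost of being less explicit than a literal set in the source.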
LogRecord: a list of useful attributes
When your Formatter renders a LogRecord, which fields should it read? The standard set (a subset; the full list is at docs.python.org/3/library/logging.html#logrecord-attributes):
| Attribute | Type | Description |
|---|---|---|
| record.name | str | Logger name ('app.db.query') |
| record.levelname | str | 'INFO', 'WARNING', etc. |
| record.levelno | int | 10/20/30/40/50 |
| record.pathname | str | Full file path of the call site |
| record.filename | str | Basename, e.g. 'query.py' |
| record.module | str | Module name, e.g. 'query' |
| record.funcName | str | Name of the calling function |
| record.lineno | int | Line number of the call |
| record.created | float | Unix timestamp (time.time()) |
| record.msecs | float | Millisecond portion of the creation time |
| record.threadName | str | 'MainThread' |
| record.processName | str | 'MainProcess' |
| record.exc_info | tuple or None | (type, value, tb) if exc_info=True |
| record.getMessage() | str (method) | Applies %-style formatting (lazy) |
A production-grade JSON formatter usually includes: time / level / logger / message / pathname (or module + lineno) / exc_info (if present) + all custom extra= fields.
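To see these attributes on a real record, one option is a handler that simply keeps records in memory. ListHandler and run_query below are hypothetical helpers for this demonstration only.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical test helper: keeps every emitted record in memory."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


logger = logging.getLogger('app.db.query')
logger.setLevel(logging.DEBUG)
logger.propagate = False
capture = ListHandler()
logger.addHandler(capture)


def run_query() -> None:
    logger.warning('slow query: %d ms', 1500)


run_query()
record = capture.records[0]
summary = {
    'name': record.name,
    'levelname': record.levelname,
    'levelno': record.levelno,
    'funcName': record.funcName,  # the function that made the logging call
    'message': record.getMessage(),
}
print(summary)
```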
Correlation IDs: request-scoped UUID propagation
In a distributed system a single user request touches several services / threads / async tasks. Without a correlation ID there is no way to say "these 47 log lines belong to one request".
Pattern: every incoming request gets a UUID that is propagated to every log call within the request lifecycle.
import uuid
import logging
logger = logging.getLogger('app.api')
def handle_request(req: dict) -> dict:
    request_id = str(uuid.uuid4())  # random 36-character UUID string
    logger.info('request received', extra={'request_id': request_id, 'path': req['path']})
    try:
        result = process_business_logic(req, request_id=request_id)
        logger.info('request ok', extra={'request_id': request_id, 'duration_ms': 42})
        return result
    except Exception:
        # logger.exception logs at ERROR level and attaches the traceback
        logger.exception('request failed', extra={'request_id': request_id})
        raise
def process_business_logic(req: dict, *, request_id: str) -> dict:
    logger.info('querying db', extra={'request_id': request_id, 'sql_size': 200})
    ...
In the JSON output the three events are tied together by one request_id:
{"time": "...", "level": "INFO", "message": "request received", "request_id": "abc-123", "path": "/users"}
{"time": "...", "level": "INFO", "message": "querying db", "request_id": "abc-123", "sql_size": 200}
{"time": "...", "level": "INFO", "message": "request ok", "request_id": "abc-123", "duration_ms": 42}
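Passing extra={'request_id': ...} on every call gets repetitive. One stdlib way to reduce it is logging.LoggerAdapter, sketched below with a plain text formatter so the effect is easy to read. The logger name and the handle_request signature are assumptions for this example.

```python
import io
import logging
import uuid

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter('%(request_id)s %(message)s'))

base_logger = logging.getLogger('app.api.adapter_demo')  # hypothetical logger name
base_logger.setLevel(logging.INFO)
base_logger.propagate = False
base_logger.addHandler(handler)


def handle_request(path: str) -> str:
    request_id = str(uuid.uuid4())
    # The adapter attaches the same extra dict to every call made through it.
    log = logging.LoggerAdapter(base_logger, {'request_id': request_id})
    log.info('request received: %s', path)
    log.info('request ok')
    return request_id


rid = handle_request('/users')
lines = stream.getvalue().splitlines()
print(lines)
```

One caveat worth knowing: by default the adapter's process() replaces any extra= passed on an individual call with the adapter's own dict, so per-call fields are lost unless process() is overridden (Python 3.13 added a merge_extra argument for this).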
Production stack:
- contextvars.ContextVar (PEP 567, Python 3.7+) propagates request_id through async tasks without manual passing.
- logging.LoggerAdapter or a custom Filter injects request_id automatically into every record.
- Web framework (FastAPI / Starlette / Django) middleware sets request_id from the incoming X-Request-ID header.
Reference: PEP 567 (contextvars).
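The first two items of the stack can be combined in a few lines. The following is a minimal sketch, assuming a ContextVar named request_id_var and a Filter named RequestIdFilter (both invented here); real middleware would set the variable from the X-Request-ID header instead of generating a UUID.

```python
import asyncio
import contextvars
import io
import logging
import uuid

# '-' is the placeholder used when no request is active (an assumption of this sketch).
request_id_var = contextvars.ContextVar('request_id', default='-')


class RequestIdFilter(logging.Filter):
    """Copy the current context's request_id onto every record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True  # never drop the record, only enrich it


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter('%(request_id)s %(message)s'))
handler.addFilter(RequestIdFilter())

logger = logging.getLogger('app.api.ctx_demo')  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)


async def query_db() -> None:
    logger.info('querying db')  # no request_id argument needed


async def handle_request(path: str) -> str:
    request_id = str(uuid.uuid4())
    request_id_var.set(request_id)  # visible only inside this task's context
    logger.info('request received %s', path)
    await asyncio.sleep(0)  # yield so the two requests interleave
    await query_db()
    return request_id


async def main() -> list:
    # gather() wraps each coroutine in a task, and each task gets its own context copy
    return await asyncio.gather(handle_request('/users'), handle_request('/orders'))


ids = asyncio.run(main())
lines = stream.getvalue().splitlines()
print('\n'.join(lines))
```

Even though the two requests interleave on one event loop, each log line carries the ID of the request that produced it, and the module-level context still sees the default value afterwards.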
Recipe: production-ready JSON logger
import json
import logging
import sys
from datetime import datetime
# 'message' and 'asctime' are included because another Formatter may have set them on the record.
_STANDARD_ATTRS = {
    'name', 'msg', 'args', 'created', 'filename', 'funcName',
    'levelname', 'levelno', 'lineno', 'module', 'msecs', 'message', 'asctime',
    'pathname', 'process', 'processName', 'relativeCreated',
    'stack_info', 'thread', 'threadName', 'taskName', 'exc_info', 'exc_text',
}
class JsonFormatter(logging.Formatter):
    """Render LogRecord as one-line JSON. Captures extra= attributes."""
    def format(self, record: logging.LogRecord) -> str:
        payload: dict[str, object] = {
            'time': datetime.fromtimestamp(record.created).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'lineno': record.lineno,
        }
        if record.exc_info:
            payload['exc_info'] = self.formatException(record.exc_info)
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        return json.dumps(payload, default=str)
def configure_json_logging(level: int = logging.INFO) -> None:
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(JsonFormatter())
    root = logging.getLogger()
    root.handlers.clear()  # similar to basicConfig(force=True), which also closes the old handlers
    root.addHandler(handler)
    root.setLevel(level)
# Use
configure_json_logging()
logger = logging.getLogger('app.api')
logger.info('app started', extra={'version': '1.2.3'})
try:
    1 / 0
except ZeroDivisionError:
    logger.exception('division failed', extra={'request_id': 'abc-123'})
Output:
{"time": "2026-04-29T12:00:00", "level": "INFO", "logger": "app.api", "message": "app started", "module": "main", "lineno": 30, "version": "1.2.3"}
{"time": "2026-04-29T12:00:01", "level": "ERROR", "logger": "app.api", "message": "division failed", "module": "main", "lineno": 35, "exc_info": "Traceback (most recent call last):\n...\nZeroDivisionError: division by zero", "request_id": "abc-123"}
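Two properties of this output are worth checking in a test: a multi-line traceback still yields exactly one JSON line per record, and default=str turns non-JSON types into strings. The sketch below uses a cut-down stand-in formatter (MiniJsonFormatter, with hypothetical extra keys amount and due) so that it stays self-contained.

```python
import io
import json
import logging
from datetime import date
from decimal import Decimal


class MiniJsonFormatter(logging.Formatter):
    """Cut-down stand-in for the recipe's JsonFormatter (same default=str idea)."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {'level': record.levelname, 'message': record.getMessage()}
        for key in ('amount', 'due'):  # hypothetical extra= keys for this demo
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        if record.exc_info:
            payload['exc_info'] = self.formatException(record.exc_info)
        return json.dumps(payload, default=str)


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(MiniJsonFormatter())
logger = logging.getLogger('app.recipe_check')  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)

logger.info('invoice created', extra={'amount': Decimal('19.99'), 'due': date(2026, 4, 29)})
try:
    1 / 0
except ZeroDivisionError:
    logger.exception('division failed')

# Each record must be exactly one line, and each line must parse as JSON.
parsed = [json.loads(line) for line in stream.getvalue().splitlines()]
print(parsed[0])
```

Note that Decimal and date come back as strings after the round trip; a consumer that needs numeric amounts has to convert them explicitly.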
Cross-course → ClickHouse system.text_log + system.query_log
ClickHouse 14/01 (system tables) exposes system.text_log (structured logs of the ClickHouse server itself) and system.query_log (every executed query). Each table is a structured, columnar parallel to the JSON formatter idea.
SELECT level, query_id, message
FROM system.text_log
WHERE event_time > now() - INTERVAL 5 MINUTE
AND level IN ('Error', 'Warning')
ORDER BY event_time DESC;
Bridge insight: the Python application emits JSONL → ClickHouse ingests it via INSERT INTO ... FORMAT JSONEachRow (carried over from M09 lesson 03) → text_log-style structured queries. Same architectural shape: structured fields queryable as columns. Production pipeline pattern: app emits JSON logs → Vector / Fluentd / Filebeat collector → a text_log-style ClickHouse table for application logs (system.text_log itself holds only the server's own logs) → Grafana/Metabase queries.
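As a rough illustration of the step between "app emits JSONL" and "table with columns", the sketch below reshapes log lines into rows with a fixed set of columns plus one catch-all column. The column layout (time, level, logger, message, attrs) is a hypothetical schema chosen for this example, and no ClickHouse client call is shown; the output is only the newline-delimited JSON text that a JSONEachRow insert would consume.

```python
import json

FIXED_COLUMNS = ('time', 'level', 'logger', 'message')  # hypothetical table columns


def to_row(log_line: str) -> dict:
    """Split one JSON log line into fixed columns plus a JSON-encoded 'attrs' column."""
    record = json.loads(log_line)
    row = {col: str(record.pop(col, '')) for col in FIXED_COLUMNS}
    row['attrs'] = json.dumps(record, sort_keys=True)  # everything that came from extra=
    return row


def to_json_each_row(log_lines: list) -> str:
    """Render rows as one JSON object per line."""
    return '\n'.join(json.dumps(to_row(line)) for line in log_lines)


sample = [
    '{"time": "2026-04-29T12:00:00", "level": "ERROR", "logger": "app.api", '
    '"message": "request failed", "user_id": 42, "request_id": "abc-123"}',
]
rows = [to_row(line) for line in sample]
print(to_json_each_row(sample))
```

Keeping frequently filtered fields as real columns and the rest in one JSON string is only one possible design; in practice the collector (Vector, Fluentd, Filebeat) often performs this reshaping instead of application code.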
Cross-course → Spark metrics-prometheus
Spark 08/03 (Metrics + Prometheus) configures a Spark MetricSink → Prometheus scrape endpoint. Prometheus expects each metric as a name plus key-value labels, the same shape as the extra={} fields in our JSON formatter:
spark_executor_jvm_memory_used{instance="...", executor="0"} 1234567
vs Python equivalent JSON log:
{"time": "...", "level": "INFO", "metric": "jvm_memory_used", "instance": "...", "executor": "0", "value": 1234567}
Bridge insight: structured key/value emission (Prometheus metrics, Loki labels, ClickHouse JSON columns) has the same conceptual shape across logs / metrics / Spark MetricSinks. Lesson 04 goes deeper into the 3 pillars (logs/metrics/traces) and the unified backend story.
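The "same shape" claim can be shown mechanically: a JSON log record that carries metric and value fields maps onto a Prometheus text-format line by turning the remaining fields into labels. The function name to_prometheus_line, the reserved-key set, and the sample host name are assumptions of this sketch, and label values are assumed to contain no quotes or backslashes (real exposition output would need escaping).

```python
import json

RESERVED = {'time', 'level', 'metric', 'value'}  # fields that do not become labels


def to_prometheus_line(log_line: str, prefix: str = 'spark_executor_') -> str:
    """Render one JSON log record as a 'name{labels} value' line."""
    record = json.loads(log_line)
    labels = ','.join(
        f'{key}="{value}"'
        for key, value in sorted(record.items())
        if key not in RESERVED
    )
    return f"{prefix}{record['metric']}{{{labels}}} {record['value']}"


sample = (
    '{"time": "2026-04-29T12:00:00", "level": "INFO", "metric": "jvm_memory_used", '
    '"instance": "host-1", "executor": "0", "value": 1234567}'
)
line = to_prometheus_line(sample)
print(line)
```

This is a conceptual bridge rather than a recommended pipeline: metrics are normally exposed through a metrics client or sink, not reconstructed from log lines.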
Kafka consumer: JSON logs as an event stream
Debezium → Prometheus: production connector metrics
What's in the next lesson
Lesson 03: logging.config.dictConfig + JSON-config recipe + RotatingFileHandler on a real disk (Run-on-Your-Machine #1). It hosts code-challenge py-m11-03-code-1 (Pattern 1: JSON dictConfig parse via stdlib json.loads).
Pragmatic-DEEP principle: we do not deep-dive into structlog performance internals or OTLP Protobuf framing. Stdlib logging + a custom JSON Formatter + extra={} + correlation IDs cover most production observability needs. structlog / loguru (lesson 06) are incremental ergonomic improvements, not a fundamentally different model.