Квоты и аудит-логирование
Аутентификация и авторизация защищают кластер от несанкционированного доступа. Но что если авторизованный клиент генерирует такой поток данных, что голодает все остальные сервисы? Или compliance требует знать, кто и когда пытался получить доступ к sensitive топикам? Для этого существуют квоты и аудит-логирование.
Зачем квоты: проблема noisy neighbor
В multi-tenant Kafka-кластере несколько команд и сервисов используют общую инфраструктуру. Один агрессивный producer может:
- Использовать 90% I/O bandwidth брокера.
- Заполнить буферы сети.
- Увеличить latency для всех остальных клиентов.
Квоты — это rate limiting на уровне брокера. Брокер принудительно замедляет клиента, превышающего лимит, добавляя задержку к ответам.
Типы квот
Kafka поддерживает три типа квот:
Scope квот: три уровня приоритета
Квоты могут быть применены на разных уровнях специфичности. Kafka использует самое специфичное правило:
Настройка квот через kafka-configs.sh
Квоты динамические: устанавливаются без перезапуска брокера и вступают в силу немедленно.
# Producer: 1 MB/s для пользователя alice
kafka-configs.sh --bootstrap-server localhost:9092 \
--command-config admin.properties \
--alter \
--add-config 'producer_byte_rate=1048576' \
--entity-type users --entity-name alice
# Consumer: 2 MB/s для client-id consumer-app
kafka-configs.sh --bootstrap-server localhost:9092 \
--command-config admin.properties \
--alter \
--add-config 'consumer_byte_rate=2097152' \
--entity-type clients --entity-name consumer-app
# Request percentage: 25% для пользователя bob
kafka-configs.sh --bootstrap-server localhost:9092 \
--command-config admin.properties \
--alter \
--add-config 'request_percentage=25' \
--entity-type users --entity-name bob
# Default: 5 MB/s produce для всех пользователей без специфичной квоты
kafka-configs.sh --bootstrap-server localhost:9092 \
--command-config admin.properties \
--alter \
--add-config 'producer_byte_rate=5242880' \
--entity-type users --entity-default
# Default: 10 MB/s consume для всех пользователей без специфичной квоты
kafka-configs.sh --bootstrap-server localhost:9092 \
--command-config admin.properties \
--alter \
--add-config 'consumer_byte_rate=10485760' \
--entity-type users --entity-default
# Проверить текущие квоты для пользователя alice
kafka-configs.sh --bootstrap-server localhost:9092 \
--describe --entity-type users --entity-name alice
# Удалить квоту
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter \
--delete-config 'producer_byte_rate' \
--entity-type users --entity-name alice
Квоты в Kafka динамические — изменяются через kafka-configs.sh без перезапуска брокера. Изменение вступает в силу немедленно. Это позволяет реагировать на инциденты в реальном времени: обнаружил noisy neighbor, установил квоту, проблема решена.
Механизм throttling: как брокер замедляет клиента
Когда клиент превышает квоту, брокер не обрывает соединение — он добавляет задержку к ответам:
Producer (50 MB/s)
Producer отправляет ProduceRequest. Скорость 50 MB/s, квота установлена 10 MB/s — превышение в 5 раз.Broker (quota check)
Брокер обрабатывает запрос, записывает данные. Вычисляет превышение квоты: клиент потребил X байт за последнее окно. Рассчитывает ThrottleTimeMs — время задержки, которое нивелирует превышение.Producer паузирует 450ms
Kafka client получает ThrottleTimeMs в response header. Автоматически (встроенная логика в Kafka client library) паузирует отправку на 450ms. Следующий ProduceRequest отправляется только через 450ms. Эффективная скорость снижается до квоты.JMX метрики для мониторинга throttling:
# Среднее время throttle для Produce запросов (должно быть близко к 0 в норме)
kafka.server:type=Request,name=ThrottleTimeMs,request=Produce
Attributes: Mean, 95thPercentile, 99thPercentile
# Quota violations count
kafka.server:type=ClientQuotaManager,name=ProduceThrottleTime,user=alice
Request percentage quota = 25 означает, что клиент может использовать не более 25% от одного I/O-потока. При num.io.threads=8 это 25% от 1 потока, не от суммарной ёмкости. Для реального ограничения пропускной способности используйте producer_byte_rate и consumer_byte_rate.
Connection и session квоты
Помимо byte rate и request quotas, Kafka поддерживает ограничения на уровне соединений:
# Максимальное количество соединений с одного IP
max.connections.per.ip=100
# Исключения для доверенных IP (через запятую)
max.connections.per.ip.overrides=10.0.1.0/24:200,10.0.2.100:500
# Максимальное общее количество соединений на брокер
max.connections=1000
# Rate limiter для новых соединений (соединений в секунду)
# Предотвращает connection storm DoS
max.connection.creation.rate=100
max.connection.creation.rate особенно важен: при массовом перезапуске клиентов (Kubernetes rolling restart) сотни соединений устанавливаются одновременно. Без этого ограничения — brief DoS на брокере.
Аудит-логирование
Authorizer логирует каждое решение о разрешении или отказе. Это создаёт полный audit trail: кто, когда, что пытался сделать и с каким результатом.
Конфигурация Log4j2 для authorizer log
<!-- log4j2.xml: отдельный appender для authorizer -->
<Appenders>
<RollingFile name="authorizerAppender"
fileName="/var/log/kafka/kafka-authorizer.log"
filePattern="/var/log/kafka/kafka-authorizer-%d{yyyy-MM-dd}.log.gz">
<PatternLayout pattern="%d{ISO8601} %p %c{1.} [%t] %m%n" />
<Policies>
<TimeBasedTriggeringPolicy interval="1" />
<SizeBasedTriggeringPolicy size="100MB" />
</Policies>
<DefaultRolloverStrategy max="30" />
</RollingFile>
</Appenders>
<Loggers>
<Logger name="kafka.authorizer.logger" level="INFO" additivity="false">
<AppenderRef ref="authorizerAppender" />
</Logger>
</Loggers>
В server.properties (Kafka 4.0 Log4j2 approach):
# Log4j2 config файл
log4j2.configuration.file=/etc/kafka/log4j2.xml
Формат записей audit log
Каждое авторизационное решение логируется в структурированном формате:
[2026-04-16 10:23:45,123] INFO Principal = User:producer-app is Allowed Operation = Write from host = 10.0.1.15 on resource = Topic:LITERAL:orders for request = PRODUCE from connection 127.0.0.1:9092-10.0.1.15:54321-0 (kafka.authorizer.logger)
[2026-04-16 10:24:01,456] INFO Principal = User:unknown-app is Denied Operation = Read from host = 10.0.2.99 on resource = Topic:LITERAL:payments for request = FETCH from connection 127.0.0.1:9092-10.0.2.99:33127-1 (kafka.authorizer.logger)
Ключевые поля каждой записи:
- Principal: аутентифицированный пользователь (
User:producer-app,User:ANONYMOUS) - Allowed/Denied: результат авторизации
- Operation: тип операции (Read, Write, Describe, etc.)
- host: IP-адрес клиента
- resource: тип ресурса + pattern type + имя ресурса
- request: тип Kafka API запроса (PRODUCE, FETCH, JOIN_GROUP, etc.)
Экспорт audit log в SIEM
Для compliance (PCI DSS, HIPAA, SOC 2) аудит-лог должен экспортироваться в централизованную систему. Стандартный подход — log forwarder:
# Filebeat / Fluentd конфигурация для сбора authorizer log
# и отправки в Elasticsearch/Splunk
inputs:
- type: log
paths:
- /var/log/kafka/kafka-authorizer.log
fields:
kafka_cluster: production
log_type: kafka_authorizer
# Для алертинга: фильтр DENIED операций
# Splunk: index=kafka source=kafka-authorizer "is Denied"
# Elasticsearch: {"query": {"match": {"message": "is Denied"}}}
Стратегии алертинга:
- Аномальное количество DENIED от одного principal за 5 минут → возможная брутфорс атака.
- DENIED от
User:ANONYMOUS(неаутентифицированный клиент) → попытка доступа без credentials. - ALLOWED доступ к sensitive топикам (payments, pii-data) в нерабочее время → insider threat.
Ключевые выводы
- Квоты защищают multi-tenant кластер от noisy neighbor. Три типа:
producer_byte_rate,consumer_byte_rate,request_percentage. - Scope priority: User+ClientID > User > ClientID > Default. Всегда устанавливайте разумный default.
- Throttling: брокер добавляет
ThrottleTimeMsк ответу. Kafka client автоматически паузирует. Нет обрыва соединения. - Квоты динамические — через
kafka-configs.shбез перезапуска. - Аудит-лог:
kafka.authorizer.loggerв Log4j2 →/var/log/kafka/kafka-authorizer.log. Структурированный формат с Principal, Operation, Resource, host, результат. - Для compliance: экспорт в SIEM, алертинг на DENIED операции и аномальный доступ.