Что такое batch processing
Batch processing — это обработка большого блока данных за один проход. Классические задачи Data Engineer:
- Загрузить миллион строк из CSV/Parquet.
- Преобразовать каждую строку (parse дат, нормализация полей).
- Сделать дедупликацию.
- Сгруппировать по ключу и агрегировать (count, sum, avg).
- Записать результат в БД или файл.
Batch отличается от streaming тем, что мы видим все данные сразу и можем выбирать любую структуру под задачу. В streaming данные текут — нужны другие подходы.
В batch правильный выбор структуры данных — главный фактор скорости. Один и тот же ETL с правильной структурой работает 10 секунд, с неправильной — час. Это не «оптимизация по чуть-чуть» — это разница между приемлемой обработкой и серверным таймаутом.
Базовое правило: одна задача — одна структура
Каждая операция в ETL имеет «свою» структуру:
- Линейный проход —
list. Просто перебираем, преобразуем. - Дедупликация —
set. Уникальные значения за O(1) на проверку. - Агрегация по ключу —
dict. Например,dict[user_id] += amount. - Top-K —
heapq(min-heap размера K). - Sorted iteration —
sorted()илиSortedList.
Часто в одном ETL встречается несколько операций. Каждая работает со «своей» структурой. Это нормально и правильно.
Каждая операция оптимальна для своей структуры. Выбор — критичное решение.
Memory considerations
При обработке миллионов строк память становится главным ограничением. Несколько важных фактов:
Размер записи. Один dict в Python (с парой полей) занимает 200-400 байт. Миллион таких — 200-400 МБ. Если обрабатываем 100 миллионов — 20-40 ГБ, что обычно невыполнимо.
import sys
row = {"user_id": 12345, "event": "click", "timestamp": 1700000000}
print(sys.getsizeof(row)) # ~240 байт на dict сам по себе
# + значения: int (28 байт), str (50 байт) — итого 350+ байт на «запись»
Tuple vs dict. Tuple легче примерно в 3 раза:
row_tuple = (12345, "click", 1700000000)
print(sys.getsizeof(row_tuple)) # ~80 байт
На миллионе записей экономия 200-300 МБ. Это много.
namedtuple, dataclass(slots=True). Структурированный доступ с экономией памяти:
from dataclasses import dataclass
@dataclass(slots=True)
class Event:
user_id: int
event: str
timestamp: int
e = Event(12345, "click", 1700000000)
# почти как tuple по памяти + читаемые поля
Без slots=True dataclass имеет свой __dict__ и весит как обычный класс (400+ байт). С slots=True — компактный layout, около 100 байт.
Генераторы vs списки
Главная техника экономии памяти — не материализовать всё в список, а использовать генератор:
# плохо: грузим весь файл в память
with open("events.csv") as f:
rows = f.readlines() # 1 ГБ данных = 1 ГБ RAM
for row in rows:
process(row)
# хорошо: ленивая итерация
with open("events.csv") as f:
for row in f: # читает по строке, в RAM 1 строка
process(row)
Тот же приём с генератор-выражениями:
# плохо: материализованный список
filtered = [parse(row) for row in source if row.valid]
total = sum(r.amount for r in filtered)
# хорошо: цепочка генераторов
filtered = (parse(row) for row in source if row.valid)
total = sum(r.amount for r in filtered)
Разница в памяти: первый вариант хранит весь filtered в RAM. Второй — обрабатывает по одному элементу. На миллионах записей разница в гигабайтах.
Правило: в ETL заменяйте list comprehensions на генератор-выражения везде, где результат используется ровно один раз. Это автоматически экономит память без потери читабельности.
Chunked processing
Если файл огромен (50 ГБ), даже генератор может быть медленным из-за overhead на отдельную обработку каждой строки. Решение — обрабатывать чанками:
def chunks(iterator, size):
"""Делим итератор на блоки фиксированного размера."""
chunk = []
for item in iterator:
chunk.append(item)
if len(chunk) >= size:
yield chunk
chunk = []
if chunk:
yield chunk
# применение
with open("huge.csv") as f:
for chunk in chunks(f, 10_000):
# обрабатываем 10000 строк за раз
process_batch(chunk)
Это золотая середина: одиночные строки — медленно (много overhead), весь файл целиком — много памяти. Чанк 1000-10000 — обычно оптимально.
В pandas есть pd.read_csv(..., chunksize=10000). В polars/duckdb — встроенные potoki. Принцип тот же.
Дедупликация на больших данных
Простая дедупликация:
seen = set()
result = []
for record in records:
key = record.event_id
if key not in seen:
seen.add(key)
result.append(record)
Это O(n) по времени, O(n) по памяти (на seen). На миллиарде уникальных event_id это десятки ГБ только на set.
Альтернативы:
- Sorted file dedup. Отсортировать файл (например, через
sortUnix утилиту, O(n log n) с временной памятью на диске), потом убрать соседние дубликаты за O(n) и O(1) RAM. - External hash. Использовать БД (SQLite, RocksDB) как persistent set.
- Bloom filter. Вероятностная структура, O(m) bits с false positives. См. следующий урок.
Выбор зависит от объёма данных и допустимости приближения.
Агрегация по ключу
Классическая ETL-задача: «сколько событий у каждого user_id».
from collections import Counter
count_by_user = Counter()
for event in events:
count_by_user[event.user_id] += 1
# top-10 пользователей
print(count_by_user.most_common(10))
Counter — это специализированный dict[hashable, int]. Каждый инкремент O(1). На миллионе записей с 100 тысяч уникальных user_id это работает за миллисекунды.
Альтернатива через defaultdict:
from collections import defaultdict
count = defaultdict(int)
for event in events:
count[event.user_id] += 1
Эквивалентно. Counter добавляет удобные методы (most_common, elements, +/- между Counters).
Сравнение производительности
Один и тот же ETL: 10 миллионов строк, нужно посчитать уникальных пользователей по конкретному событию.
# вариант 1: list + in (наивный)
def slow_unique(events, target_event):
users = []
for e in events:
if e.event == target_event and e.user_id not in users:
users.append(e.user_id)
return len(users)
# вариант 2: set
def fast_unique(events, target_event):
users = set()
for e in events:
if e.event == target_event:
users.add(e.user_id)
return len(users)
Вариант 1 — O(n * m), где m — количество уникальных. На 10 млн событий и 100 тыс уникальных = 10^12 операций — часы. Вариант 2 — O(n). На тех же данных — секунды.
Разница не в 10x, не в 100x — в миллион раз. Это разница между «работает» и «не работает».
Попробуй сам
Скрипт, генерирующий «события» и считающий уникальных пользователей разными способами:
import random
import time
n = 1_000_000
events = [(random.randint(1, 50_000), "click") for _ in range(n)]
# через set
t0 = time.time()
users = set()
for uid, _ in events:
users.add(uid)
t1 = time.time()
print(f"set: {len(users)} unique in {t1-t0:.3f} s")
# через dict (как Counter)
t0 = time.time()
counts = {}
for uid, _ in events:
counts[uid] = counts.get(uid, 0) + 1
t1 = time.time()
print(f"dict: {len(counts)} unique in {t1-t0:.3f} s")
# через list (медленно)
t0 = time.time()
users = []
for uid, _ in events[:50_000]: # на 50k чтобы не ждать час
if uid not in users:
users.append(uid)
t1 = time.time()
print(f"list (50k only): {len(users)} unique in {t1-t0:.3f} s")
Запустите, увидите разницу в 1000-10000x между set и list.
Протокол итераторов в Python: как работает for-loop под капотом __slots__ и dataclass: компактные объекты вместо dict