Learning Platform
Глоссарий Troubleshooting
Урок 19.01 · 25 мин
Начальный
batch-processingetlmemorylistsetdictdata-engineering

Что такое batch processing

Batch processing — это обработка большого блока данных за один проход. Классические задачи Data Engineer:

  • Загрузить миллион строк из CSV/Parquet.
  • Преобразовать каждую строку (parse дат, нормализация полей).
  • Сделать дедупликацию.
  • Сгруппировать по ключу и агрегировать (count, sum, avg).
  • Записать результат в БД или файл.

Batch отличается от streaming тем, что мы видим все данные сразу и можем выбирать любую структуру под задачу. В streaming данные текут — нужны другие подходы.

В batch правильный выбор структуры данных — главный фактор скорости. Один и тот же ETL с правильной структурой работает 10 секунд, с неправильной — час. Это не «оптимизация по чуть-чуть» — это разница между приемлемой обработкой и серверным таймаутом.

Базовое правило: одна задача — одна структура

Каждая операция в ETL имеет «свою» структуру:

  • Линейный проходlist. Просто перебираем, преобразуем.
  • Дедупликацияset. Уникальные значения за O(1) на проверку.
  • Агрегация по ключуdict. Например, dict[user_id] += amount.
  • Top-Kheapq (min-heap размера K).
  • Sorted iterationsorted() или SortedList.

Часто в одном ETL встречается несколько операций. Каждая работает со «своей» структурой. Это нормально и правильно.

Структуры для типичных операций ETL

Каждая операция оптимальна для своей структуры. Выбор — критичное решение.

операцияитерацияперебрать миллион записей, преобразовать каждую
структураlist / generatorгенератор для lazy iteration без хранения всех
операцияdedupубрать дубликаты event_id
структураsetO(1) на проверку наличия
операцияcount per keyсколько событий у каждого user_id
структураCounter (dict)инкремент за O(1)
операцияlookup для joinuser_id -> user_name
структураdictO(1) поиск
операцияtop-K по value100 самых активных пользователей
структураheapq.nlargestmin-heap размера K, O(n log K)

Memory considerations

При обработке миллионов строк память становится главным ограничением. Несколько важных фактов:

Размер записи. Один dict в Python (с парой полей) занимает 200-400 байт. Миллион таких — 200-400 МБ. Если обрабатываем 100 миллионов — 20-40 ГБ, что обычно невыполнимо.

import sys

row = {"user_id": 12345, "event": "click", "timestamp": 1700000000}
print(sys.getsizeof(row))   # ~240 байт на dict сам по себе
# + значения: int (28 байт), str (50 байт) — итого 350+ байт на «запись»

Tuple vs dict. Tuple легче примерно в 3 раза:

row_tuple = (12345, "click", 1700000000)
print(sys.getsizeof(row_tuple))   # ~80 байт

На миллионе записей экономия 200-300 МБ. Это много.

namedtuple, dataclass(slots=True). Структурированный доступ с экономией памяти:

from dataclasses import dataclass

@dataclass(slots=True)
class Event:
    user_id: int
    event: str
    timestamp: int

e = Event(12345, "click", 1700000000)
# почти как tuple по памяти + читаемые поля

Без slots=True dataclass имеет свой __dict__ и весит как обычный класс (400+ байт). С slots=True — компактный layout, около 100 байт.

Генераторы vs списки

Главная техника экономии памяти — не материализовать всё в список, а использовать генератор:

# плохо: грузим весь файл в память
with open("events.csv") as f:
    rows = f.readlines()        # 1 ГБ данных = 1 ГБ RAM
    for row in rows:
        process(row)

# хорошо: ленивая итерация
with open("events.csv") as f:
    for row in f:               # читает по строке, в RAM 1 строка
        process(row)

Тот же приём с генератор-выражениями:

# плохо: материализованный список
filtered = [parse(row) for row in source if row.valid]
total = sum(r.amount for r in filtered)

# хорошо: цепочка генераторов
filtered = (parse(row) for row in source if row.valid)
total = sum(r.amount for r in filtered)

Разница в памяти: первый вариант хранит весь filtered в RAM. Второй — обрабатывает по одному элементу. На миллионах записей разница в гигабайтах.

TIP

Правило: в ETL заменяйте list comprehensions на генератор-выражения везде, где результат используется ровно один раз. Это автоматически экономит память без потери читабельности.

Chunked processing

Если файл огромен (50 ГБ), даже генератор может быть медленным из-за overhead на отдельную обработку каждой строки. Решение — обрабатывать чанками:

def chunks(iterator, size):
    """Делим итератор на блоки фиксированного размера."""
    chunk = []
    for item in iterator:
        chunk.append(item)
        if len(chunk) >= size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

# применение
with open("huge.csv") as f:
    for chunk in chunks(f, 10_000):
        # обрабатываем 10000 строк за раз
        process_batch(chunk)

Это золотая середина: одиночные строки — медленно (много overhead), весь файл целиком — много памяти. Чанк 1000-10000 — обычно оптимально.

В pandas есть pd.read_csv(..., chunksize=10000). В polars/duckdb — встроенные potoki. Принцип тот же.

Дедупликация на больших данных

Простая дедупликация:

seen = set()
result = []
for record in records:
    key = record.event_id
    if key not in seen:
        seen.add(key)
        result.append(record)

Это O(n) по времени, O(n) по памяти (на seen). На миллиарде уникальных event_id это десятки ГБ только на set.

Альтернативы:

  1. Sorted file dedup. Отсортировать файл (например, через sort Unix утилиту, O(n log n) с временной памятью на диске), потом убрать соседние дубликаты за O(n) и O(1) RAM.
  2. External hash. Использовать БД (SQLite, RocksDB) как persistent set.
  3. Bloom filter. Вероятностная структура, O(m) bits с false positives. См. следующий урок.

Выбор зависит от объёма данных и допустимости приближения.

Агрегация по ключу

Классическая ETL-задача: «сколько событий у каждого user_id».

from collections import Counter

count_by_user = Counter()
for event in events:
    count_by_user[event.user_id] += 1

# top-10 пользователей
print(count_by_user.most_common(10))

Counter — это специализированный dict[hashable, int]. Каждый инкремент O(1). На миллионе записей с 100 тысяч уникальных user_id это работает за миллисекунды.

Альтернатива через defaultdict:

from collections import defaultdict

count = defaultdict(int)
for event in events:
    count[event.user_id] += 1

Эквивалентно. Counter добавляет удобные методы (most_common, elements, +/- между Counters).

Сравнение производительности

Один и тот же ETL: 10 миллионов строк, нужно посчитать уникальных пользователей по конкретному событию.

# вариант 1: list + in (наивный)
def slow_unique(events, target_event):
    users = []
    for e in events:
        if e.event == target_event and e.user_id not in users:
            users.append(e.user_id)
    return len(users)

# вариант 2: set
def fast_unique(events, target_event):
    users = set()
    for e in events:
        if e.event == target_event:
            users.add(e.user_id)
    return len(users)

Вариант 1 — O(n * m), где m — количество уникальных. На 10 млн событий и 100 тыс уникальных = 10^12 операций — часы. Вариант 2 — O(n). На тех же данных — секунды.

Разница не в 10x, не в 100x — в миллион раз. Это разница между «работает» и «не работает».

Попробуй сам

Скрипт, генерирующий «события» и считающий уникальных пользователей разными способами:

import random
import time

n = 1_000_000
events = [(random.randint(1, 50_000), "click") for _ in range(n)]

# через set
t0 = time.time()
users = set()
for uid, _ in events:
    users.add(uid)
t1 = time.time()
print(f"set: {len(users)} unique in {t1-t0:.3f} s")

# через dict (как Counter)
t0 = time.time()
counts = {}
for uid, _ in events:
    counts[uid] = counts.get(uid, 0) + 1
t1 = time.time()
print(f"dict: {len(counts)} unique in {t1-t0:.3f} s")

# через list (медленно)
t0 = time.time()
users = []
for uid, _ in events[:50_000]:    # на 50k чтобы не ждать час
    if uid not in users:
        users.append(uid)
t1 = time.time()
print(f"list (50k only): {len(users)} unique in {t1-t0:.3f} s")

Запустите, увидите разницу в 1000-10000x между set и list.

Протокол итераторов в Python: как работает for-loop под капотом __slots__ и dataclass: компактные объекты вместо dict
Проверка знанийKnowledge check
У вас 100 миллионов строк лога. Нужно посчитать уникальных user_id и сделать top-100 по числу событий на пользователя. Сколько памяти и какие структуры вы выберете для каждой операции? Как избежать загрузки 100 миллионов dict-объектов в RAM?
ОтветAnswer
Не нужно держать 100 миллионов dict в RAM — это 30-40 ГБ. Правильный подход: 1) Потоковая итерация через генератор: open(file) с построчным чтением, обработка по одной записи. Парсим в tuple (60-80 байт), не в dict. 2) Counter (специализированный dict) для агрегации: count[user_id] += 1. На уникальных user_id (предположим 10 миллионов) Counter займёт ~700 МБ — приемлемо. 3) Top-100 через heapq.nlargest(100, count.items(), key=lambda x: x[1]) — O(n log K), не нужно сортировать весь counter. Альтернатива при огромном числе уникальных — chunked aggregation: разбить файл на 100 частей, по каждой Counter, потом объединить counters через c1 + c2 (поэлементное сложение). Это паттерн MapReduce.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. В чём разница между list comprehension и generator expression при обработке миллиона записей?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5