С чего начинается DE-боль
Junior получает свой первый таск: «вот файл events.csv на 12 ГБ, посчитай, сколько строк со страной RU». Junior пишет первое, что приходит в голову:
import csv
with open("events.csv") as f:
rows = list(csv.DictReader(f))
ru = [r for r in rows if r["country"] == "RU"]
print(len(ru))
Через минуту скрипт убивает ядро
str, у каждого свой заголовок и счётчик ссылок). Машина с 16 ГБ RAM такого не переживёт.
Решение этой проблемы — генераторы. Они позволяют обрабатывать данные по одной строке, не держа в памяти весь файл. Это, пожалуй, главный инструмент Data Engineer в Python: 70% production-кода в ETL — это цепочки генераторов.
Что такое генератор
В обычной функции return отдаёт значение и завершает выполнение. Генератор использует ключевое слово
yielddef counter():
print("start")
yield 1
print("between 1 and 2")
yield 2
print("between 2 and 3")
yield 3
print("end")
gen = counter() # ничего не напечатано — функция ещё не запускалась
print(next(gen)) # "start", потом 1
print(next(gen)) # "between 1 and 2", потом 2
print(next(gen)) # "between 2 and 3", потом 3
print(next(gen)) # "end", потом StopIteration
Обратите внимание на две вещи. Первая: counter() не выполняет тело функции. Она возвращает объект-генератор, который только при next() начинает шагать. Вторая: между вызовами next() функция реально стоит на паузе — её локальные переменные и место в коде сохранены.
Список считает всё сразу и держит в памяти. Генератор считает по одному и помнит только текущую позицию.
Важное свойство: генератор — одноразовый. После того как он выдал StopIteration, второй раз пройтись по нему нельзя — нужно создать новый. Это часто ловит джунов: «почему мой for row in rows работает только один раз?»
yield против return
Чтобы поставить точку — простая таблица отличий.
return | yield |
|---|---|
| Завершает функцию | Приостанавливает функцию |
| Один раз за вызов | Сколько угодно раз |
| Функция отдаёт значение | Функция отдаёт генератор, из которого можно получать значения |
| Локальные переменные забываются | Локальные переменные сохраняются между yield |
Когда в функции встречается хотя бы один yield, Python считает её генератор-функцией — её вызов уже не вычисляет тело, а только создаёт объект-генератор.
Как читать большой CSV
Теперь к нашей DE-задаче. Перепишем подсчёт RU-строк через генератор:
import csv
from collections.abc import Iterator
def read_csv(path: str) -> Iterator[dict[str, str]]:
with open(path, encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
yield row
def only_country(rows: Iterator[dict[str, str]], country: str) -> Iterator[dict[str, str]]:
for row in rows:
if row["country"] == country:
yield row
count = sum(1 for _ in only_country(read_csv("events.csv"), "RU"))
print(count)
Что здесь происходит. read_csv — генератор, который выдаёт по одному словарю на строку. only_country — генератор, который читает входной генератор и пропускает только нужные. Финальный sum(1 for _ in ...) гоняет всю цепочку, увеличивая счётчик на каждый прошедший фильтр элемент.
В памяти в каждый момент времени — одна строка CSV. Файл хоть на 12 ГБ, хоть на терабайт — RAM-потребление одинаковое.
Iterator[T] из collections.abc — современный type hint для генераторов. Можно писать и Generator[T, None, None], если хочется быть точным, но Iterator[T] — общепринято и читабельнее.
Generator expression
Если генератор-функция нужна одноразово, есть синтаксис покороче — generator expression:
# generator expression — выражение в круглых скобках
ru_rows = (row for row in read_csv("events.csv") if row["country"] == "RU")
# для сравнения — list comprehension в квадратных скобках, грузит всё в память
ru_list = [row for row in read_csv("events.csv") if row["country"] == "RU"]
Разница в скобках — но смысловая пропасть. Первое — ленивый итератор, второе — материализованный список.
Generator expression можно передавать в функции напрямую, без лишних круглых скобок:
total = sum(int(row["amount"]) for row in read_csv("orders.csv"))
Это идиоматично — пишется как scan, читается как scan, работает как scan.
Пайплайны через композицию
Главная сила генераторов — их можно складывать в цепочки, и каждая цепочка не создаёт промежуточных списков.
Каждый уровень — отдельный генератор. Данные текут по одной строке слева направо, ничего не материализуется.
Каждый блок — отдельная функция-генератор. Все вместе они описывают поток обработки, но пока никто не позвал sum, ни одна строка не прочитана. Это называется
Дополнительный плюс: каждую функцию-кирпич можно протестировать в изоляции, дав ей небольшой список вместо файла.
itertools — стандартная аптечка
В stdlib есть модуль itertools, в котором лежат самые полезные операции над итераторами. DE-минимум, который надо знать наизусть:
import itertools
# islice — взять кусок [start:stop:step] от любого итератора
first_10 = list(itertools.islice(read_csv("events.csv"), 10))
# chain — склеить несколько итераторов в один
all_rows = itertools.chain(
read_csv("events_2025.csv"),
read_csv("events_2026.csv"),
)
# takewhile — брать, пока выполняется условие; потом остановиться
recent = itertools.takewhile(
lambda r: r["date"] >= "2026-01-01",
sorted_rows,
)
# accumulate — нарастающий итог (cumulative sum по умолчанию)
running_total = list(itertools.accumulate([10, 20, 30, 40])) # [10, 30, 60, 100]
# groupby — группирует подряд идущие одинаковые элементы
# ВАЖНО: данные должны быть отсортированы по ключу группировки
for country, group in itertools.groupby(sorted_rows, key=lambda r: r["country"]):
print(country, sum(1 for _ in group))
Главное про groupby: он группирует только подряд идущие одинаковые элементы. Если данные не отсортированы — получите столько групп, сколько переключений ключа в потоке. Это полезно, если данные уже отсортированы (типичный случай — выгрузка из БД с ORDER BY), и опасно, если нет.
islice особенно полезен в дебаге: можно посмотреть первые 5 строк потока, не запуская всю обработку:
for row in itertools.islice(pipeline, 5):
print(row)
Передача состояния через send
В yield можно не только отдавать, но и принимать значения. Это редкая в DE возможность, но знать о ней полезно — иногда встретится в legacy-коде.
def echo():
while True:
received = yield
print(f"got: {received}")
gen = echo()
next(gen) # первый next запускает генератор до первого yield
gen.send("hello") # печатает "got: hello"
gen.send("world") # печатает "got: world"
Это превращает генератор в корутину — объект, в который снаружи закидывают данные, а он что-то с ними делает. Современный async/await вырос ровно из этой идеи, но для DE-задач 99% времени достаточно классических «однонаправленных» генераторов с yield value. Закрывать тему не будем; если встретите send в чужом коде — теперь знаете, что это.
Идиоматичные паттерны
Несколько штук, которые в DE-коде вы будете видеть постоянно.
Batched-итератор. Часто нужно вставлять в БД пачками по N штук, а не по одной — bulk-insert на порядки быстрее. В Python 3.12+ для этого есть itertools.batched:
import itertools
def insert_batches(rows, batch_size=1000):
for batch in itertools.batched(rows, batch_size):
# batch — кортеж из batch_size элементов (последний может быть короче)
db.bulk_insert(list(batch))
Yield from. Если вы внутри генератора хотите «выпустить наружу» всё содержимое другого итератора — пишите yield from:
def all_events(paths):
for path in paths:
yield from read_csv(path)
Это эквивалентно for row in read_csv(path): yield row, но короче и быстрее.
Закрытие ресурсов. Если в генератор-функции есть with open(...), файл закроется правильно даже если внешний код перестанет потреблять генератор раньше времени. Python вызовет close() на генераторе, что породит GeneratorExit внутри, и with корректно закроет файл. Это безопаснее, чем самому передавать открытый файл.
Граничные случаи и подвохи
Есть три момента, на которых обжигаются почти все.
Генератор одноразовый. После полного обхода он пустой. Если попытаться итерировать второй раз, цикл просто ничего не сделает.
gen = (x*2 for x in range(3))
list(gen) # [0, 2, 4]
list(gen) # [] — генератор уже исчерпан
Решение: если хотите проходить дважды — либо материализуйте в список (list(gen)), либо оборачивайте создание генератора в функцию.
Нет len(). У генератора нельзя спросить длину, не пройдя его полностью. len(gen) бросит TypeError. Если нужен размер — это сигнал, что обрабатываете в памяти, и стоит подумать, точно ли streaming не подходит.
Закрытые ресурсы при ленивом потреблении. Если генератор открыл файл и сразу же вернулся наружу, а потребитель «забыл» его дочитать — файл подвиснет до сборки мусора. Лучше явно закрывать (gen.close()) или потреблять до конца.
Что мы получили
yieldприостанавливает функцию, отдаёт значение, и потом возобновляет — это фундамент ленивых вычислений в Python.- Генератор-функция и generator expression — две формы одного механизма.
- Цепочки генераторов — стандартный способ описать ETL-pipeline: O(1) по памяти, легко тестируется по кускам.
itertools— стандартная аптечка:islice,chain,takewhile,accumulate,groupby,batched.- DE-применение №1 — стриминговое чтение больших файлов: 12 ГБ CSV проходит через лаптоп так же легко, как 12 КБ.
В следующем уроке — контекст-менеджеры: они нам уже встречались (with open(...)), но мы посмотрим, как они устроены под капотом, и научимся писать свои.