Learning Platform
Урок 05.01 · 22 мин
Начальный
GeneratorsyielditertoolsStreamingLazy evaluation
Генераторы: стек-фреймы и корутины изнутри Streaming vs batch: фундаментальные концепции

С чего начинается DE-боль

Junior получает свой первый таск: «вот файл events.csv на 12 ГБ, посчитай, сколько строк со страной RU». Junior пишет первое, что приходит в голову:

import csv

with open("events.csv") as f:
    rows = list(csv.DictReader(f))

ru = [r for r in rows if r["country"] == "RU"]
print(len(ru))

Через минуту скрипт убивает ядро

OOM killer’ом
: 12 ГБ текста в памяти превращаются в ~30 ГБ Python-объектов (каждая строка — словарь, каждое значение — str, у каждого свой заголовок и счётчик ссылок). Машина с 16 ГБ RAM такого не переживёт.

Решение этой проблемы — генераторы. Они позволяют обрабатывать данные по одной строке, не держа в памяти весь файл. Это, пожалуй, главный инструмент Data Engineer в Python: 70% production-кода в ETL — это цепочки генераторов.

Что такое генератор

В обычной функции return отдаёт значение и завершает выполнение. Генератор использует ключевое слово

yield
— оно отдаёт значение наружу, но функцию не завершает, а ставит на паузу. Когда у генератора снова попросят следующее значение, выполнение возобновится с того же места.

def counter():
    print("start")
    yield 1
    print("between 1 and 2")
    yield 2
    print("between 2 and 3")
    yield 3
    print("end")

gen = counter()       # ничего не напечатано — функция ещё не запускалась
print(next(gen))      # "start", потом 1
print(next(gen))      # "between 1 and 2", потом 2
print(next(gen))      # "between 2 and 3", потом 3
print(next(gen))      # "end", потом StopIteration

Обратите внимание на две вещи. Первая: counter() не выполняет тело функции. Она возвращает объект-генератор, который только при next() начинает шагать. Вторая: между вызовами next() функция реально стоит на паузе — её локальные переменные и место в коде сохранены.

Список vs генератор: где живёт данные

Список считает всё сразу и держит в памяти. Генератор считает по одному и помнит только текущую позицию.

list[1, 2, 3, 4, 5, ...]
памятьO(n) — все элементыЕсли в списке миллион чисел, миллион int-объектов лежит в куче
итерацияпо уже готовому массиву
плюсыиндексация, len, повторный обход
минусыне подходит для огромных файлов
generatorна паузе после очередного yield
памятьO(1) — только текущий фреймСколько бы значений ни приходило, в памяти всегда лежит один фрейм функции
итерациявычисляется по требованию
плюсыstreaming, ленивость, композиция
минусыодноразовый, нет len, нет индексации

Важное свойство: генератор — одноразовый. После того как он выдал StopIteration, второй раз пройтись по нему нельзя — нужно создать новый. Это часто ловит джунов: «почему мой for row in rows работает только один раз?»

yield против return

Чтобы поставить точку — простая таблица отличий.

returnyield
Завершает функциюПриостанавливает функцию
Один раз за вызовСколько угодно раз
Функция отдаёт значениеФункция отдаёт генератор, из которого можно получать значения
Локальные переменные забываютсяЛокальные переменные сохраняются между yield

Когда в функции встречается хотя бы один yield, Python считает её генератор-функцией — её вызов уже не вычисляет тело, а только создаёт объект-генератор.

Как читать большой CSV

Теперь к нашей DE-задаче. Перепишем подсчёт RU-строк через генератор:

import csv
from collections.abc import Iterator

def read_csv(path: str) -> Iterator[dict[str, str]]:
    with open(path, encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield row

def only_country(rows: Iterator[dict[str, str]], country: str) -> Iterator[dict[str, str]]:
    for row in rows:
        if row["country"] == country:
            yield row

count = sum(1 for _ in only_country(read_csv("events.csv"), "RU"))
print(count)

Что здесь происходит. read_csv — генератор, который выдаёт по одному словарю на строку. only_country — генератор, который читает входной генератор и пропускает только нужные. Финальный sum(1 for _ in ...) гоняет всю цепочку, увеличивая счётчик на каждый прошедший фильтр элемент.

В памяти в каждый момент времени — одна строка CSV. Файл хоть на 12 ГБ, хоть на терабайт — RAM-потребление одинаковое.

TIP

Iterator[T] из collections.abc — современный type hint для генераторов. Можно писать и Generator[T, None, None], если хочется быть точным, но Iterator[T] — общепринято и читабельнее.

Generator expression

Если генератор-функция нужна одноразово, есть синтаксис покороче — generator expression:

# generator expression — выражение в круглых скобках
ru_rows = (row for row in read_csv("events.csv") if row["country"] == "RU")

# для сравнения — list comprehension в квадратных скобках, грузит всё в память
ru_list = [row for row in read_csv("events.csv") if row["country"] == "RU"]

Разница в скобках — но смысловая пропасть. Первое — ленивый итератор, второе — материализованный список.

Generator expression можно передавать в функции напрямую, без лишних круглых скобок:

total = sum(int(row["amount"]) for row in read_csv("orders.csv"))

Это идиоматично — пишется как scan, читается как scan, работает как scan.

Пайплайны через композицию

Главная сила генераторов — их можно складывать в цепочки, и каждая цепочка не создаёт промежуточных списков.

Pipeline над потоком строк CSV

Каждый уровень — отдельный генератор. Данные текут по одной строке слева направо, ничего не материализуется.

fileevents.csv (12 GB)
read_csvrows : Iterator[dict]Парсит по одной строке
only_countryru : Iterator[dict]Фильтр country == RU
extract_amountamounts : Iterator[int]Берёт поле amount
sumитог: числоФинальный потребитель — собирает поток в одно значение

Каждый блок — отдельная функция-генератор. Все вместе они описывают поток обработки, но пока никто не позвал sum, ни одна строка не прочитана. Это называется

lazy evaluation
, и это ровно та модель, на которой работают Spark, Polars и pandas-streaming.

Дополнительный плюс: каждую функцию-кирпич можно протестировать в изоляции, дав ей небольшой список вместо файла.

itertools — стандартная аптечка

В stdlib есть модуль itertools, в котором лежат самые полезные операции над итераторами. DE-минимум, который надо знать наизусть:

import itertools

# islice — взять кусок [start:stop:step] от любого итератора
first_10 = list(itertools.islice(read_csv("events.csv"), 10))

# chain — склеить несколько итераторов в один
all_rows = itertools.chain(
    read_csv("events_2025.csv"),
    read_csv("events_2026.csv"),
)

# takewhile — брать, пока выполняется условие; потом остановиться
recent = itertools.takewhile(
    lambda r: r["date"] >= "2026-01-01",
    sorted_rows,
)

# accumulate — нарастающий итог (cumulative sum по умолчанию)
running_total = list(itertools.accumulate([10, 20, 30, 40]))  # [10, 30, 60, 100]

# groupby — группирует подряд идущие одинаковые элементы
# ВАЖНО: данные должны быть отсортированы по ключу группировки
for country, group in itertools.groupby(sorted_rows, key=lambda r: r["country"]):
    print(country, sum(1 for _ in group))

Главное про groupby: он группирует только подряд идущие одинаковые элементы. Если данные не отсортированы — получите столько групп, сколько переключений ключа в потоке. Это полезно, если данные уже отсортированы (типичный случай — выгрузка из БД с ORDER BY), и опасно, если нет.

islice особенно полезен в дебаге: можно посмотреть первые 5 строк потока, не запуская всю обработку:

for row in itertools.islice(pipeline, 5):
    print(row)

Передача состояния через send

В yield можно не только отдавать, но и принимать значения. Это редкая в DE возможность, но знать о ней полезно — иногда встретится в legacy-коде.

def echo():
    while True:
        received = yield
        print(f"got: {received}")

gen = echo()
next(gen)            # первый next запускает генератор до первого yield
gen.send("hello")    # печатает "got: hello"
gen.send("world")    # печатает "got: world"

Это превращает генератор в корутину — объект, в который снаружи закидывают данные, а он что-то с ними делает. Современный async/await вырос ровно из этой идеи, но для DE-задач 99% времени достаточно классических «однонаправленных» генераторов с yield value. Закрывать тему не будем; если встретите send в чужом коде — теперь знаете, что это.

Идиоматичные паттерны

Несколько штук, которые в DE-коде вы будете видеть постоянно.

Batched-итератор. Часто нужно вставлять в БД пачками по N штук, а не по одной — bulk-insert на порядки быстрее. В Python 3.12+ для этого есть itertools.batched:

import itertools

def insert_batches(rows, batch_size=1000):
    for batch in itertools.batched(rows, batch_size):
        # batch — кортеж из batch_size элементов (последний может быть короче)
        db.bulk_insert(list(batch))

Yield from. Если вы внутри генератора хотите «выпустить наружу» всё содержимое другого итератора — пишите yield from:

def all_events(paths):
    for path in paths:
        yield from read_csv(path)

Это эквивалентно for row in read_csv(path): yield row, но короче и быстрее.

Закрытие ресурсов. Если в генератор-функции есть with open(...), файл закроется правильно даже если внешний код перестанет потреблять генератор раньше времени. Python вызовет close() на генераторе, что породит GeneratorExit внутри, и with корректно закроет файл. Это безопаснее, чем самому передавать открытый файл.

Граничные случаи и подвохи

Есть три момента, на которых обжигаются почти все.

Генератор одноразовый. После полного обхода он пустой. Если попытаться итерировать второй раз, цикл просто ничего не сделает.

gen = (x*2 for x in range(3))
list(gen)   # [0, 2, 4]
list(gen)   # []  — генератор уже исчерпан

Решение: если хотите проходить дважды — либо материализуйте в список (list(gen)), либо оборачивайте создание генератора в функцию.

Нет len(). У генератора нельзя спросить длину, не пройдя его полностью. len(gen) бросит TypeError. Если нужен размер — это сигнал, что обрабатываете в памяти, и стоит подумать, точно ли streaming не подходит.

Закрытые ресурсы при ленивом потреблении. Если генератор открыл файл и сразу же вернулся наружу, а потребитель «забыл» его дочитать — файл подвиснет до сборки мусора. Лучше явно закрывать (gen.close()) или потреблять до конца.

Что мы получили

  • yield приостанавливает функцию, отдаёт значение, и потом возобновляет — это фундамент ленивых вычислений в Python.
  • Генератор-функция и generator expression — две формы одного механизма.
  • Цепочки генераторов — стандартный способ описать ETL-pipeline: O(1) по памяти, легко тестируется по кускам.
  • itertools — стандартная аптечка: islice, chain, takewhile, accumulate, groupby, batched.
  • DE-применение №1 — стриминговое чтение больших файлов: 12 ГБ CSV проходит через лаптоп так же легко, как 12 КБ.

В следующем уроке — контекст-менеджеры: они нам уже встречались (with open(...)), но мы посмотрим, как они устроены под капотом, и научимся писать свои.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6