Learning Platform
Глоссарий Troubleshooting
Урок 07.04 · 30 мин
Начальный
asyncasyncioconcurrencyhttpxevent-loop

httpx.AsyncClient: 100 параллельных запросов за время одного

Junior data engineer часто упирается в одну стену: API отвечает 200ms, тебе надо 1000 запросов, sync код займёт 200 секунд. Это не процессорная задача — большая часть времени программа просто ждёт ответа от сервера. Идеальный случай для async/await.

В этом уроке разберём httpx.AsyncClient, концепцию event loop в asyncio, asyncio.gather для параллельных запросов, как ограничивать concurrency через Semaphore, и когда async реально нужен (а когда хватает обычного ThreadPoolExecutor). С benchmark-цифрами.


Async — для чего

Концепция: пока один запрос ждёт ответ от сервера (I/O wait), CPU простаивает. Можно отправить ещё 99 запросов и ждать их одновременно. Это и есть concurrency — не параллелизм (один поток), а кооперативное переключение между задачами.

Sync 100 запросов по 200ms:
[req1 wait 200ms][req2 wait 200ms]...[req100 wait 200ms]
= 20 секунд

Async 100 запросов по 200ms (concurrent):
[req1 wait 200ms]
[req2 wait 200ms]
...всё параллельно...
[req100 wait 200ms]
= 200ms (плюс overhead)

100x ускорение в идеальном случае. На практике — сервер может ограничить, есть OS-лимиты, есть overhead event loop, но 30-50x реалистично.

Sync vs Async: чем заняты CPU и сеть
Sync: один поток шлёт запрос, ждёт, получает ответ, шлёт следующий
sendCPU 1ms на сериализацию
wait200ms wait на сеть
recvCPU 1ms на парсинг
x100Повторяется 100 раз: 100x202ms = 20.2s
Async: пока один запрос ждёт, отправляем следующий. Event loop переключается между coroutines
send x100100 send почти одновременно
wait (concurrent)Все 100 ждут параллельно
recv x100Ответы приходят, парсятся в порядке прибытия

Event loop за 60 секунд

Asyncio построен на единственном потоке с event loop, который знает все запущенные coroutines и переключается между ними когда они “паркуются” на await.

async def fetch(url):
    print(f"Start: {url}")
    await asyncio.sleep(1)  # симулируем сетевой запрос
    print(f"Done: {url}")
    return url

import asyncio

async def main():
    # Запустим три coroutine параллельно
    results = await asyncio.gather(
        fetch("url1"),
        fetch("url2"),
        fetch("url3"),
    )
    print(results)

asyncio.run(main())
# Start: url1
# Start: url2
# Start: url3
# (1 секунда ожидания)
# Done: url1
# Done: url2
# Done: url3
# ['url1', 'url2', 'url3']

Что произошло:

  1. gather создал 3 task-а из coroutine-ов и зарегистрировал в event loop.
  2. Каждый task начал выполняться: print -> дошёл до await asyncio.sleep(1) -> “запарковался”.
  3. Event loop увидел: все три парканы, ждут таймера. Подождал 1 секунду.
  4. Все три проснулись одновременно, продолжили выполнение.

Ключевое: await — это место, где coroutine может уступить управление другим. Если в coroutine нет await — она работает до конца как обычная функция.

# Эта coroutine не уступает управление -- блокирует event loop
async def bad():
    time.sleep(1)  # blocking call! НЕ уступает
    return 42

# Эта -- правильная
async def good():
    await asyncio.sleep(1)  # уступает loop-у
    return 42
WARNING

Главное правило asyncio: никогда не вызывай blocking-функции (time.sleep, requests.get, file I/O без aiofiles) внутри async-кода. Они блокируют весь event loop, и все остальные coroutines стоят. Используй async-аналоги: asyncio.sleep, httpx.AsyncClient, aiofiles.


httpx.AsyncClient

Аналог httpx.Client, но методы — coroutines. Использование через async with:

import asyncio
import httpx

async def fetch_user(client, user_id):
    r = await client.get(f"/users/{user_id}")
    r.raise_for_status()
    return r.json()

async def main():
    async with httpx.AsyncClient(
        base_url="https://jsonplaceholder.typicode.com",
        timeout=30.0,
    ) as client:
        # Параллельно дёрнем 10 user-ов
        tasks = [fetch_user(client, i) for i in range(1, 11)]
        users = await asyncio.gather(*tasks)
        print(f"Got {len(users)} users")
        for u in users:
            print(f"  {u['id']}: {u['name']}")

asyncio.run(main())
# Got 10 users
#   1: Leanne Graham
#   2: Ervin Howell
#   ...

Что вижу:

  • async def — определение coroutine.
  • await client.get(...) — отправляем запрос, паркуемся пока ответа нет, освобождаем event loop для других coroutines.
  • asyncio.gather(*tasks) — запускаем все task-и одновременно, ждём завершения всех, возвращает список результатов в порядке task-ов.
  • async with httpx.AsyncClient() — context manager для ресурсов (тот же connection pool, но aware про async).

Benchmark: sync vs async на 100 запросах

import asyncio
import time
import httpx
import requests

URL = "https://jsonplaceholder.typicode.com/users/1"
N = 100

# 1. Sync через requests без Session
def sync_no_session():
    start = time.time()
    for _ in range(N):
        requests.get(URL)
    return time.time() - start

# 2. Sync через requests + Session (HTTP keep-alive)
def sync_with_session():
    start = time.time()
    with requests.Session() as s:
        for _ in range(N):
            s.get(URL)
    return time.time() - start

# 3. Sync через httpx + Client (то же самое, другая библиотека)
def sync_httpx():
    start = time.time()
    with httpx.Client() as c:
        for _ in range(N):
            c.get(URL)
    return time.time() - start

# 4. Async через httpx.AsyncClient (concurrent)
async def async_httpx():
    start = time.time()
    async with httpx.AsyncClient() as c:
        await asyncio.gather(*(c.get(URL) for _ in range(N)))
    return time.time() - start

# 5. ThreadPool через requests + Session
from concurrent.futures import ThreadPoolExecutor

def threadpool_requests():
    start = time.time()
    with requests.Session() as s, ThreadPoolExecutor(max_workers=20) as pool:
        list(pool.map(lambda _: s.get(URL), range(N)))
    return time.time() - start

print(f"Sync no session:    {sync_no_session():.2f}s")
print(f"Sync with session:  {sync_with_session():.2f}s")
print(f"Sync httpx:         {sync_httpx():.2f}s")
print(f"ThreadPool 20w:     {threadpool_requests():.2f}s")
print(f"Async httpx:        {asyncio.run(async_httpx()):.2f}s")

Типичный output (зависит от сети, сервера, машины):

Sync no session:    9.83s
Sync with session:  4.21s     # x2.3 от keep-alive
Sync httpx:         4.30s     # ~то же
ThreadPool 20w:     0.58s     # x17 от concurrency
Async httpx:        0.42s     # x23 от async

Выводы:

  1. Session/Client даёт 2-3x просто за счёт HTTP keep-alive.
  2. Concurrency (threads или async) даёт 15-25x дополнительно.
  3. Async чуть быстрее ThreadPool на одинаковых задачах (нет thread-overhead, но overhead event loop).
  4. Async сильно масштабируется лучше — 1000 одновременных coroutines дешевле 1000 потоков.

requests и httpx: параллельные запросы в ETL

Limits: ограничение concurrent connections

Проблема: 1000 параллельных запросов отправят 1000 одновременных TCP-соединений. Сервер скажет 429 Too Many Requests или просто закроет коннекшены. И ты сам можешь упереться в OS-лимит на open file descriptors.

Решение — httpx.Limits:

limits = httpx.Limits(
    max_connections=20,           # макс concurrent connections (всего)
    max_keepalive_connections=10, # макс idle connections в pool
    keepalive_expiry=5.0,         # сколько держать idle connection
)

async with httpx.AsyncClient(limits=limits) as client:
    await asyncio.gather(*(client.get(URL) for _ in range(1000)))

Это говорит: “одновременно не больше 20 запросов в полёте, остальные ждут в очереди”. Сервер счастлив, ты не упираешься в лимиты.

Альтернатива — Semaphore:

async def fetch_with_limit(client, url, semaphore):
    async with semaphore:
        return await client.get(url)

async def main():
    semaphore = asyncio.Semaphore(20)
    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_limit(client, URL, semaphore) for _ in range(1000)]
        await asyncio.gather(*tasks)

Семафор более гибкий — можно иметь несколько разных лимитов для разных API.


Когда async реально нужен — а когда нет

Async выигрывает когда:

  • Много (10+) параллельных I/O-операций.
  • Каждая операция короткая (миллисекунды-секунды), но много.
  • Узкое место — сетевая latency, не CPU.

Sync (с Session) хватает когда:

  • Запросы последовательные (например, цепочка зависимостей: r1 нужен для r2).
  • Малое количество запросов (до 10).
  • Простой скрипт где не хочется усложнять.

ThreadPool — золотая середина:

  • Не нужно переписывать код в async.
  • Хорошо работает до 50-100 потоков (потом thread overhead начинает мешать).
  • Сосуществует со sync кодом (не “красит” весь стек async).

CPU-bound задачи (парсинг гигабайтов JSON, числовые расчёты):

  • Async не помогает — CPU не умеет multitasking, GIL мешает.
  • Используй multiprocessing или Rust-библиотеки (orjson, polars).
TIP

Не переходи на async ради 5-процентного ускорения. Async усложняет код, требует дисциплины (никаких blocking-call!), плохо стыкуется с обычными библиотеками. Используй когда выгода реально 10x+.


Async + Semaphore + retry: production pattern

import asyncio
import httpx

async def fetch_one(client, url, semaphore):
    """Один запрос с ограничением concurrency и retry."""
    async with semaphore:
        for attempt in range(3):
            try:
                r = await client.get(url, timeout=10.0)
                r.raise_for_status()
                return r.json()
            except httpx.HTTPStatusError as e:
                if e.response.status_code in (502, 503, 504):
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
                    continue
                raise
            except (httpx.TimeoutException, httpx.NetworkError):
                await asyncio.sleep(2 ** attempt)
        raise RuntimeError(f"Failed after 3 retries: {url}")


async def fetch_many(urls, max_concurrent=20):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient(http2=True) as client:
        results = await asyncio.gather(
            *(fetch_one(client, u, semaphore) for u in urls),
            return_exceptions=True,  # не падаем на первой ошибке
        )
    return results


# Использование
urls = [f"https://jsonplaceholder.typicode.com/users/{i}" for i in range(1, 11)]
results = asyncio.run(fetch_many(urls, max_concurrent=5))
for r in results:
    if isinstance(r, Exception):
        print(f"Failed: {r}")
    else:
        print(f"Got user: {r['name']}")

Это рабочий шаблон который копипаститься в большинство async-pipeline-ов: semaphore для лимита, try/except с exponential backoff, return_exceptions=True чтобы одна ошибка не крошила всё.


Попробуй сам

import asyncio
import httpx
import time

async def main():
    URL = "https://jsonplaceholder.typicode.com/users"
    N = 50

    # Async с лимитом 10
    start = time.time()
    semaphore = asyncio.Semaphore(10)

    async def fetch(client):
        async with semaphore:
            r = await client.get(URL)
            return r.status_code

    async with httpx.AsyncClient() as client:
        tasks = [fetch(client) for _ in range(N)]
        results = await asyncio.gather(*tasks)

    print(f"Async 50 reqs (limit 10): {time.time() - start:.2f}s")
    print(f"All 200: {all(s == 200 for s in results)}")

asyncio.run(main())

Эксперимент: меняй max_concurrent от 1 до 100, замеряй время. Найди sweet spot для конкретного API.


DE-контекст

  1. Bulk fetch из API: “достать данные о всех 10 000 пользователей” — типичный async-кейс. С semaphore=20 и retry.
  2. API aggregator: на одной странице — данные из 5 разных API (user info, settings, recent activity, …). asyncio.gather параллелит, сокращает latency.
  3. FastAPI backend: если твой сервис живёт в asyncio event loop — все API-вызовы наружу должны быть тоже async, иначе блокируешь loop.
  4. Airflow + async: Airflow tasks по умолчанию sync. Можно использовать async внутри task через asyncio.run(...), но не делай долго-живущий event loop в Airflow — DAG-runner этого не поймёт.
  5. Streaming большого количества webhooks: если на тебя сыпется 100 webhook-ов в секунду — async backend обрабатывает их в одном потоке, не нужен thread per request.

Killer takeaway

asyncio + httpx.AsyncClient — концепция “одновременной работы в одном потоке через cooperative scheduling”. await — место где coroutine может уступить event loop. asyncio.gather — параллельный запуск coroutine-ов. На 100 параллельных запросах async даёт 20-50x ускорение vs sync, и хорошо масштабируется до тысяч одновременных операций (vs ThreadPool, который страдает после 100 потоков). Главные грабли: blocking calls (time.sleep, requests.get) внутри async — блокируют весь loop. Используй httpx.Limits или asyncio.Semaphore чтобы не положить сервер тысячами одновременных соединений. Sync хватает для последовательных запросов, ThreadPool — для умеренной concurrency, async — для большой concurrency и FastAPI-style backend-ов.

Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Что такое await и event loop в asyncio?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5