httpx.AsyncClient: 100 параллельных запросов за время одного
Junior data engineer часто упирается в одну стену: API отвечает 200ms, тебе надо 1000 запросов, sync код займёт 200 секунд. Это не процессорная задача — большая часть времени программа просто ждёт ответа от сервера. Идеальный случай для async/await.
В этом уроке разберём httpx.AsyncClient, концепцию event loop в asyncio, asyncio.gather для параллельных запросов, как ограничивать concurrency через Semaphore, и когда async реально нужен (а когда хватает обычного ThreadPoolExecutor). С benchmark-цифрами.
Async — для чего
Концепция: пока один запрос ждёт ответ от сервера (I/O wait), CPU простаивает. Можно отправить ещё 99 запросов и ждать их одновременно. Это и есть concurrency — не параллелизм (один поток), а кооперативное переключение между задачами.
Sync 100 запросов по 200ms:
[req1 wait 200ms][req2 wait 200ms]...[req100 wait 200ms]
= 20 секунд
Async 100 запросов по 200ms (concurrent):
[req1 wait 200ms]
[req2 wait 200ms]
...всё параллельно...
[req100 wait 200ms]
= 200ms (плюс overhead)
100x ускорение в идеальном случае. На практике — сервер может ограничить, есть OS-лимиты, есть overhead event loop, но 30-50x реалистично.
Event loop за 60 секунд
Asyncio построен на единственном потоке с event loop, который знает все запущенные coroutines и переключается между ними когда они “паркуются” на await.
async def fetch(url):
print(f"Start: {url}")
await asyncio.sleep(1) # симулируем сетевой запрос
print(f"Done: {url}")
return url
import asyncio
async def main():
# Запустим три coroutine параллельно
results = await asyncio.gather(
fetch("url1"),
fetch("url2"),
fetch("url3"),
)
print(results)
asyncio.run(main())
# Start: url1
# Start: url2
# Start: url3
# (1 секунда ожидания)
# Done: url1
# Done: url2
# Done: url3
# ['url1', 'url2', 'url3']
Что произошло:
gatherсоздал 3 task-а из coroutine-ов и зарегистрировал в event loop.- Каждый task начал выполняться: print -> дошёл до
await asyncio.sleep(1)-> “запарковался”. - Event loop увидел: все три парканы, ждут таймера. Подождал 1 секунду.
- Все три проснулись одновременно, продолжили выполнение.
Ключевое: await — это место, где coroutine может уступить управление другим. Если в coroutine нет await — она работает до конца как обычная функция.
# Эта coroutine не уступает управление -- блокирует event loop
async def bad():
time.sleep(1) # blocking call! НЕ уступает
return 42
# Эта -- правильная
async def good():
await asyncio.sleep(1) # уступает loop-у
return 42
Главное правило asyncio: никогда не вызывай blocking-функции (time.sleep, requests.get, file I/O без aiofiles) внутри async-кода. Они блокируют весь event loop, и все остальные coroutines стоят. Используй async-аналоги: asyncio.sleep, httpx.AsyncClient, aiofiles.
httpx.AsyncClient
Аналог httpx.Client, но методы — coroutines. Использование через async with:
import asyncio
import httpx
async def fetch_user(client, user_id):
r = await client.get(f"/users/{user_id}")
r.raise_for_status()
return r.json()
async def main():
async with httpx.AsyncClient(
base_url="https://jsonplaceholder.typicode.com",
timeout=30.0,
) as client:
# Параллельно дёрнем 10 user-ов
tasks = [fetch_user(client, i) for i in range(1, 11)]
users = await asyncio.gather(*tasks)
print(f"Got {len(users)} users")
for u in users:
print(f" {u['id']}: {u['name']}")
asyncio.run(main())
# Got 10 users
# 1: Leanne Graham
# 2: Ervin Howell
# ...
Что вижу:
async def— определение coroutine.await client.get(...)— отправляем запрос, паркуемся пока ответа нет, освобождаем event loop для других coroutines.asyncio.gather(*tasks)— запускаем все task-и одновременно, ждём завершения всех, возвращает список результатов в порядке task-ов.async with httpx.AsyncClient()— context manager для ресурсов (тот же connection pool, но aware про async).
Benchmark: sync vs async на 100 запросах
import asyncio
import time
import httpx
import requests
URL = "https://jsonplaceholder.typicode.com/users/1"
N = 100
# 1. Sync через requests без Session
def sync_no_session():
start = time.time()
for _ in range(N):
requests.get(URL)
return time.time() - start
# 2. Sync через requests + Session (HTTP keep-alive)
def sync_with_session():
start = time.time()
with requests.Session() as s:
for _ in range(N):
s.get(URL)
return time.time() - start
# 3. Sync через httpx + Client (то же самое, другая библиотека)
def sync_httpx():
start = time.time()
with httpx.Client() as c:
for _ in range(N):
c.get(URL)
return time.time() - start
# 4. Async через httpx.AsyncClient (concurrent)
async def async_httpx():
start = time.time()
async with httpx.AsyncClient() as c:
await asyncio.gather(*(c.get(URL) for _ in range(N)))
return time.time() - start
# 5. ThreadPool через requests + Session
from concurrent.futures import ThreadPoolExecutor
def threadpool_requests():
start = time.time()
with requests.Session() as s, ThreadPoolExecutor(max_workers=20) as pool:
list(pool.map(lambda _: s.get(URL), range(N)))
return time.time() - start
print(f"Sync no session: {sync_no_session():.2f}s")
print(f"Sync with session: {sync_with_session():.2f}s")
print(f"Sync httpx: {sync_httpx():.2f}s")
print(f"ThreadPool 20w: {threadpool_requests():.2f}s")
print(f"Async httpx: {asyncio.run(async_httpx()):.2f}s")
Типичный output (зависит от сети, сервера, машины):
Sync no session: 9.83s
Sync with session: 4.21s # x2.3 от keep-alive
Sync httpx: 4.30s # ~то же
ThreadPool 20w: 0.58s # x17 от concurrency
Async httpx: 0.42s # x23 от async
Выводы:
- Session/Client даёт 2-3x просто за счёт HTTP keep-alive.
- Concurrency (threads или async) даёт 15-25x дополнительно.
- Async чуть быстрее ThreadPool на одинаковых задачах (нет thread-overhead, но overhead event loop).
- Async сильно масштабируется лучше — 1000 одновременных coroutines дешевле 1000 потоков.
requests и httpx: параллельные запросы в ETL
Limits: ограничение concurrent connections
Проблема: 1000 параллельных запросов отправят 1000 одновременных TCP-соединений. Сервер скажет 429 Too Many Requests или просто закроет коннекшены. И ты сам можешь упереться в OS-лимит на open file descriptors.
Решение — httpx.Limits:
limits = httpx.Limits(
max_connections=20, # макс concurrent connections (всего)
max_keepalive_connections=10, # макс idle connections в pool
keepalive_expiry=5.0, # сколько держать idle connection
)
async with httpx.AsyncClient(limits=limits) as client:
await asyncio.gather(*(client.get(URL) for _ in range(1000)))
Это говорит: “одновременно не больше 20 запросов в полёте, остальные ждут в очереди”. Сервер счастлив, ты не упираешься в лимиты.
Альтернатива — Semaphore:
async def fetch_with_limit(client, url, semaphore):
async with semaphore:
return await client.get(url)
async def main():
semaphore = asyncio.Semaphore(20)
async with httpx.AsyncClient() as client:
tasks = [fetch_with_limit(client, URL, semaphore) for _ in range(1000)]
await asyncio.gather(*tasks)
Семафор более гибкий — можно иметь несколько разных лимитов для разных API.
Когда async реально нужен — а когда нет
Async выигрывает когда:
- Много (10+) параллельных I/O-операций.
- Каждая операция короткая (миллисекунды-секунды), но много.
- Узкое место — сетевая latency, не CPU.
Sync (с Session) хватает когда:
- Запросы последовательные (например, цепочка зависимостей: r1 нужен для r2).
- Малое количество запросов (до 10).
- Простой скрипт где не хочется усложнять.
ThreadPool — золотая середина:
- Не нужно переписывать код в async.
- Хорошо работает до 50-100 потоков (потом thread overhead начинает мешать).
- Сосуществует со sync кодом (не “красит” весь стек async).
CPU-bound задачи (парсинг гигабайтов JSON, числовые расчёты):
- Async не помогает — CPU не умеет multitasking, GIL мешает.
- Используй
multiprocessingили Rust-библиотеки (orjson, polars).
Не переходи на async ради 5-процентного ускорения. Async усложняет код, требует дисциплины (никаких blocking-call!), плохо стыкуется с обычными библиотеками. Используй когда выгода реально 10x+.
Async + Semaphore + retry: production pattern
import asyncio
import httpx
async def fetch_one(client, url, semaphore):
"""Один запрос с ограничением concurrency и retry."""
async with semaphore:
for attempt in range(3):
try:
r = await client.get(url, timeout=10.0)
r.raise_for_status()
return r.json()
except httpx.HTTPStatusError as e:
if e.response.status_code in (502, 503, 504):
await asyncio.sleep(2 ** attempt) # exponential backoff
continue
raise
except (httpx.TimeoutException, httpx.NetworkError):
await asyncio.sleep(2 ** attempt)
raise RuntimeError(f"Failed after 3 retries: {url}")
async def fetch_many(urls, max_concurrent=20):
semaphore = asyncio.Semaphore(max_concurrent)
async with httpx.AsyncClient(http2=True) as client:
results = await asyncio.gather(
*(fetch_one(client, u, semaphore) for u in urls),
return_exceptions=True, # не падаем на первой ошибке
)
return results
# Использование
urls = [f"https://jsonplaceholder.typicode.com/users/{i}" for i in range(1, 11)]
results = asyncio.run(fetch_many(urls, max_concurrent=5))
for r in results:
if isinstance(r, Exception):
print(f"Failed: {r}")
else:
print(f"Got user: {r['name']}")
Это рабочий шаблон который копипаститься в большинство async-pipeline-ов: semaphore для лимита, try/except с exponential backoff, return_exceptions=True чтобы одна ошибка не крошила всё.
Попробуй сам
import asyncio
import httpx
import time
async def main():
URL = "https://jsonplaceholder.typicode.com/users"
N = 50
# Async с лимитом 10
start = time.time()
semaphore = asyncio.Semaphore(10)
async def fetch(client):
async with semaphore:
r = await client.get(URL)
return r.status_code
async with httpx.AsyncClient() as client:
tasks = [fetch(client) for _ in range(N)]
results = await asyncio.gather(*tasks)
print(f"Async 50 reqs (limit 10): {time.time() - start:.2f}s")
print(f"All 200: {all(s == 200 for s in results)}")
asyncio.run(main())
Эксперимент: меняй max_concurrent от 1 до 100, замеряй время. Найди sweet spot для конкретного API.
DE-контекст
- Bulk fetch из API: “достать данные о всех 10 000 пользователей” — типичный async-кейс. С semaphore=20 и retry.
- API aggregator: на одной странице — данные из 5 разных API (user info, settings, recent activity, …).
asyncio.gatherпараллелит, сокращает latency. - FastAPI backend: если твой сервис живёт в asyncio event loop — все API-вызовы наружу должны быть тоже async, иначе блокируешь loop.
- Airflow + async: Airflow tasks по умолчанию sync. Можно использовать async внутри task через
asyncio.run(...), но не делай долго-живущий event loop в Airflow — DAG-runner этого не поймёт. - Streaming большого количества webhooks: если на тебя сыпется 100 webhook-ов в секунду — async backend обрабатывает их в одном потоке, не нужен thread per request.
Killer takeaway
asyncio + httpx.AsyncClient — концепция “одновременной работы в одном потоке через cooperative scheduling”. await — место где coroutine может уступить event loop. asyncio.gather — параллельный запуск coroutine-ов. На 100 параллельных запросах async даёт 20-50x ускорение vs sync, и хорошо масштабируется до тысяч одновременных операций (vs ThreadPool, который страдает после 100 потоков). Главные грабли: blocking calls (time.sleep, requests.get) внутри async — блокируют весь loop. Используй httpx.Limits или asyncio.Semaphore чтобы не положить сервер тысячами одновременных соединений. Sync хватает для последовательных запросов, ThreadPool — для умеренной concurrency, async — для большой concurrency и FastAPI-style backend-ов.