Справочник ключевых терминов курса REST API & Data Formats для Junior Data Engineer.
Прикладной протокол клиент-серверной модели поверх TCP (или QUIC в HTTP/3). Stateless: каждый запрос самодостаточен, сервер по умолчанию не помнит предыдущие -- состояние между запросами держат cookies, токены, query-параметры. Текущие версии: HTTP/1.1 (1997, текстовый, до сих пор основной для server-to-server), HTTP/2 (2015, бинарный, multiplexing поверх одного TCP-соединения), HTTP/3 (2022, поверх QUIC/UDP, спасает от head-of-line blocking). Для DE-задач это главный транспорт между ETL-скриптом и сторонним API: REST, webhooks, скачивание CSV, выгрузка в S3. По спецификации регистрозависимы только методы и значения, заголовки case-insensitive.
# HTTP/1.1 диалог как plain text
GET /api/v1/users/42 HTTP/1.1
Host: api.example.com
Accept: application/json
User-Agent: my-etl/1.0
# Ответ:
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 47
{"id":42,"name":"Lev","email":"[email protected]"}
# Проверить, какая версия используется:
curl -v --http2 https://api.example.com/health 2>&1 | grep '< HTTP'HTTP, обёрнутый в TLS-туннель. Кратко: клиент устанавливает TCP-соединение, делает TLS handshake (обмен сертификатами, согласование cipher suite, выработка симметричного ключа), затем общение шифруется. Защищает от пассивного прослушивания, MITM-атак и tampering на промежуточных узлах. Default-порт 443 (vs 80 для plain HTTP). Современные API почти всегда HTTPS-only -- plain HTTP редиректится 301 на https://. Для DE: при работе с corporate proxy / self-signed certs может потребоваться custom CA bundle через `verify=/path/to/ca.pem` в requests/httpx. Stripping HTTPS никогда не делать в production даже для тестов -- токены утекут.
import requests
import ssl
# Базовый запрос -- verify=True по умолчанию
r = requests.get('https://api.example.com/v1/data')
# С corporate CA bundle
r = requests.get('https://internal.corp/api', verify='/etc/ssl/corp-ca.pem')
# Никогда в prod, только локально:
r = requests.get('https://localhost:8443', verify=False)
# Посмотреть TLS-версию и cipher:
# openssl s_client -connect api.example.com:443 -tls1_3Криптографический протокол, обеспечивающий конфиденциальность, целостность и аутентификацию между клиентом и сервером. Версии: TLS 1.2 (2008, до сих пор широко поддерживается), TLS 1.3 (2018, упрощённый handshake -- 1-RTT, обязательное forward secrecy, отброшены устаревшие cipher suites). TLS 1.0/1.1 deprecated с 2020 (PCI DSS, IETF RFC 8996). Handshake в TLS 1.3: ClientHello (cipher suites + key share) -> ServerHello (выбор + сертификат) -> Finished -- и можно слать данные. Сертификат подписан CA из доверенного root store ОС/браузера. Self-signed cert в production -- красный флаг, кроме случаев internal mTLS.
# Проверить, какая версия TLS у API:
openssl s_client -connect api.example.com:443 -tls1_3 < /dev/null
# 'Protocol : TLSv1.3' -- ок
# Принудительно TLS 1.2 в Python:
import ssl
import requests
from requests.adapters import HTTPAdapter
from urllib3.poolmanager import PoolManager
class TLS12Adapter(HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
ctx = ssl.create_default_context()
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
kwargs['ssl_context'] = ctx
return super().init_poolmanager(*args, **kwargs)
s = requests.Session()
s.mount('https://', TLS12Adapter())Конкретный подвид URI, указывающий не просто идентификатор ресурса, но и способ его получения (схему). Структура: `scheme://[userinfo@]host[:port]/path[?query][#fragment]`. Пример: `https://api.example.com:443/v1/users/42?fields=name,email#section`. Path case-sensitive на большинстве серверов (кроме Windows-IIS), query -- порядок параметров обычно не важен, но повторение ключа (`?tag=a&tag=b`) обрабатывается по-разному в Express, Flask, FastAPI. Fragment (`#section`) НЕ передаётся на сервер -- только для клиента. Reserved-символы (`/?#&=+`) внутри значений нужно percent-encode (`%20` для пробела, `%2F` для слэша). Длина URL формально не ограничена RFC, но практически серверы режут на 4-8 KB.
from urllib.parse import urlencode, urlparse, parse_qs
# Безопасное составление
params = {'q': 'data engineer', 'limit': 50, 'tags': ['etl', 'sql']}
qs = urlencode(params, doseq=True)
print(qs)
# q=data+engineer&limit=50&tags=etl&tags=sql
# Парсинг
u = urlparse('https://api.example.com:8080/v1/users?id=42#top')
print(u.scheme, u.netloc, u.path, u.query, u.fragment)
# https api.example.com:8080 /v1/users id=42 topНадмножество URL и URN. URI = URL (locator, как достать) + URN (name, просто имя без локации, типа `urn:isbn:0-486-27557-4`). На практике в HTTP-мире разница академическая -- все URI это URL. RFC 3986 определяет общий синтаксис: `scheme:[//authority]path[?query][#fragment]`. Для REST API ресурс идентифицируется именно URI: `/users/42` -- это URI ресурса user-42 относительно базового URL сервиса. Один ресурс может иметь несколько URI (alias, версионирование `/v1/users/42` и `/v2/users/42`), но canonical обычно один -- про него говорит `Link: rel=canonical`.
# URL -- это URI с указанием схемы доступа
https://api.example.com/v1/users/42
# URN -- тоже URI, но без локации
urn:isbn:978-0-13-468599-1
urn:uuid:550e8400-e29b-41d4-a716-446655440000
# Relative URI -- без scheme и authority
/v1/users/42
../files/report.csvГлагол, описывающий действие над ресурсом. Стандартные: GET (читать, safe + idempotent), POST (создать или произвольное действие, не idempotent), PUT (заменить целиком, idempotent), PATCH (частичное обновление, обычно НЕ idempotent если делается через JSON Merge Patch с `+1` операциями), DELETE (удалить, idempotent -- повторный DELETE даёт 404 или 204), HEAD (как GET без body, для проверки существования), OPTIONS (узнать доступные методы и preflight CORS). Регистрозависим: только UPPERCASE по RFC. Safe = не меняет состояние сервера. Idempotent = повторный одинаковый запрос даёт тот же эффект. Эти свойства определяют, можно ли retry при network error без риска duplicate-side-effects.
# Чтение коллекции
GET /api/v1/users
# Создание (сервер генерирует id)
POST /api/v1/users
Content-Type: application/json
{"name":"Lev","email":"[email protected]"}
# Полная замена
PUT /api/v1/users/42
{"name":"Lev N","email":"[email protected]","role":"de"}
# Частичное
PATCH /api/v1/users/42
Content-Type: application/merge-patch+json
{"role":"senior-de"}
# Удаление
DELETE /api/v1/users/42Трёхзначный код в начале response, классифицирующий результат. Классы: 1xx informational (100 Continue, 101 Switching Protocols -- редки в REST), 2xx success (200 OK, 201 Created, 202 Accepted, 204 No Content, 206 Partial Content), 3xx redirection (301 Moved Permanently, 302 Found, 304 Not Modified, 307/308 -- сохраняют method), 4xx client error (400 Bad Request, 401 Unauthorized -- нет/невалидные креды, 403 Forbidden -- креды есть, прав нет, 404 Not Found, 405 Method Not Allowed, 409 Conflict, 410 Gone, 422 Unprocessable Entity, 429 Too Many Requests), 5xx server error (500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable, 504 Gateway Timeout). Реакция клиента: 4xx -- чинить запрос, не retry; 5xx и сетевые -- retry с backoff.
import requests
r = requests.get('https://api.example.com/v1/users/42')
print(r.status_code) # 200
r.raise_for_status() # бросит HTTPError для 4xx/5xx
# Различение классов:
if 200 <= r.status_code < 300:
pass # success
elif r.status_code == 429:
sleep(int(r.headers.get('Retry-After', 1)))
elif 500 <= r.status_code < 600:
pass # retry с backoffМетаданные запроса/ответа в формате `Name: value`, разделённые `\r\n`. Имена case-insensitive (`Content-Type` == `content-type`), значения case-sensitive обычно. Заголовки бывают request-only (Host, Accept, Authorization, User-Agent), response-only (Server, Set-Cookie, Location), общие (Content-Type, Cache-Control, Date). Custom-заголовки исторически с префиксом `X-` (RFC 6648 это deprecated, теперь рекомендуют без префикса), на практике до сих пор `X-Request-ID`, `X-Forwarded-For`. Множественные значения одного заголовка передаются через запятую (`Accept-Language: ru, en;q=0.9`). HTTP/2 ужимает заголовки через HPACK -- экономит трафик при повторяющихся headers.
# Запрос со всеми частыми заголовками
import requests
headers = {
'Authorization': 'Bearer eyJhbGc...',
'Accept': 'application/json',
'Content-Type': 'application/json',
'User-Agent': 'my-etl/1.0 (+https://example.com)',
'X-Request-ID': '550e8400-e29b-41d4-a716-446655440000',
'If-None-Match': '"abc123"',
}
r = requests.post('https://api.example.com/v1/events', headers=headers, json={'event':'click'})
# Заголовки ответа
print(r.headers['Content-Type']) # 'application/json'
print(r.headers.get('X-RateLimit-Remaining'))Содержимое запроса или ответа после headers, отделённое пустой строкой `\r\n\r\n`. Размер задаётся `Content-Length: <bytes>` или приходит чанками с `Transfer-Encoding: chunked`. Формат описывается `Content-Type` (например, `application/json; charset=utf-8`, `text/csv`, `multipart/form-data; boundary=...`). GET по RFC может иметь body, но многие промежуточные узлы (CDN, proxy) его выкинут -- поэтому фильтры в GET передают через query string, а не body. POST/PUT/PATCH -- body основной носитель данных. Стримить большой body можно chunk-by-chunk через `requests.post(url, data=generator())` или `httpx.stream`.
import requests
# JSON body -- requests сам сериализует и проставит Content-Type
r = requests.post(url, json={'name': 'Lev'})
# Raw bytes
with open('file.csv', 'rb') as f:
r = requests.post(url, data=f, headers={'Content-Type': 'text/csv'})
# multipart upload
files = {'data': ('report.csv', open('report.csv', 'rb'), 'text/csv')}
r = requests.post(url, files=files)
# Стрим ответа большого файла
with requests.get(url, stream=True) as r:
for chunk in r.iter_content(chunk_size=64*1024):
out.write(chunk)Сообщение от клиента к серверу. Состоит из request line (`GET /path HTTP/1.1`), headers, пустой строки и опционального body. В HTTP/2 и /3 -- это бинарный фрейм с такими же логическими полями. Request-target -- обычно absolute path с query (`/api/v1/users?id=42`), реже absolute URL (для proxy). Тело может быть в любом формате, описанном Content-Type. С точки зрения DE: в каждом ETL ваши клиенты отправляют тысячи requests в чужие API -- важно правильно ставить headers (Authorization, Accept), обрабатывать ответы (status, body, retry), логировать request_id для трассировки.
# Сырой HTTP/1.1 request:
POST /api/v1/events HTTP/1.1
Host: api.example.com
Authorization: Bearer eyJhbGc...
Content-Type: application/json
Content-Length: 38
User-Agent: my-etl/1.0
{"event":"login","user_id":42}
# То же в Python:
import requests
r = requests.post(
'https://api.example.com/api/v1/events',
json={'event': 'login', 'user_id': 42},
headers={'Authorization': 'Bearer eyJhbGc...'},
)Сообщение от сервера к клиенту в ответ на request. Состоит из status line (`HTTP/1.1 200 OK`), headers, пустой строки и опционального body. Body может отсутствовать (204 No Content, 304 Not Modified, ответ на HEAD), быть фиксированной длины (Content-Length), стриминговый (Transfer-Encoding: chunked) или server-sent events stream (text/event-stream). Заголовки ответа задают тип контента (Content-Type), кэшируемость (Cache-Control, ETag, Last-Modified), редиректы (Location), куки (Set-Cookie), CORS-разрешения (Access-Control-Allow-*).
import requests
r = requests.get('https://api.example.com/v1/users/42')
print(r.status_code) # 200
print(r.reason) # 'OK'
print(r.headers['Content-Type']) # 'application/json'
print(r.headers.get('ETag')) # '"abc123"'
print(r.elapsed.total_seconds()) # 0.234
print(r.url) # final URL после redirects
print(r.history) # [Response 301, Response 302]
print(r.json()) # parsed body
print(r.content[:100]) # raw bytesМеханизм, позволяющий браузеру делать запросы к API на другом origin (схема + хост + порт), если сервер явно это разрешил через response-заголовки `Access-Control-Allow-Origin` и связанные. Ограничение действует ТОЛЬКО в браузере -- server-to-server (Python, Go, curl) на CORS не смотрят, поэтому ETL-скрипты CORS не касается. Browser отправляет `Origin: https://app.example.com` в каждом cross-origin запросе; если в ответе нет соответствующего ACAO -- браузер блокирует доступ к response (хотя сам запрос на сервер ушёл). Для не-простых запросов (custom headers, methods кроме GET/POST/HEAD, JSON body не application/x-www-form-urlencoded) браузер сначала шлёт preflight OPTIONS.
# Сервер должен ответить:
Access-Control-Allow-Origin: https://app.example.com
Access-Control-Allow-Methods: GET, POST, PUT, DELETE
Access-Control-Allow-Headers: Content-Type, Authorization
Access-Control-Allow-Credentials: true
Access-Control-Max-Age: 3600
# В FastAPI:
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=['https://app.example.com'],
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*'],
)
# В Python-скрипте CORS не работает -- это браузерный механизм.Базовое security-правило браузера: страница с одного origin (схема+хост+порт) не может читать данные с другого origin без явного разрешения. Без SOP вкладка с `evil.com` могла бы делать XHR на `bank.com` с вашими cookies и читать ответ. CORS -- это именно набор exceptions из SOP, которые сервер может включить. SOP применяется только к браузерному JS -- `<img src>`, `<script src>`, `<link>` исторически кросс-доменны (поэтому возможны CDN). Origins различаются по любому из трёх компонентов: `https://a.com` и `https://a.com:8443` -- разные origin; `http://a.com` и `https://a.com` -- разные.
// JS на странице https://app.example.com
fetch('https://api.example.com/data') // cross-origin (другой host)
.then(r => r.json())
// Browser: блокирует, если у сервера нет CORS-headers
fetch('https://app.example.com/data') // same-origin
.then(r => r.json()) // ОК всегда
fetch('http://app.example.com/data') // cross-origin (другая схема!)
.then(r => r.json()) // Блок без CORSOPTIONS-запрос, который браузер автоматически делает перед основным cross-origin запросом, если запрос «не-простой»: метод не GET/POST/HEAD, или есть custom headers (типа Authorization, X-API-Key), или Content-Type не входит в `application/x-www-form-urlencoded`/`multipart/form-data`/`text/plain`. Сервер должен ответить нужными `Access-Control-Allow-*` headers -- браузер кэширует ответ согласно `Access-Control-Max-Age` (обычно 5-10 минут) и только потом отправляет реальный запрос. Если preflight провалился (нет ACAO, неверные методы) -- основной запрос НЕ отправится, в DevTools Network видно только OPTIONS с failed CORS error.
# Браузер автоматически (вы не пишете этот код):
OPTIONS /api/v1/users HTTP/1.1
Host: api.example.com
Origin: https://app.example.com
Access-Control-Request-Method: PUT
Access-Control-Request-Headers: authorization, content-type
# Сервер должен ответить:
HTTP/1.1 204 No Content
Access-Control-Allow-Origin: https://app.example.com
Access-Control-Allow-Methods: GET, POST, PUT, DELETE
Access-Control-Allow-Headers: Authorization, Content-Type
Access-Control-Max-Age: 600
# Только потом браузер шлёт реальный PUT.Главный заголовок управления HTTP-кэшем (вытеснил устаревший `Expires` и `Pragma`). Значения: `public` (можно кэшировать всем), `private` (только конечный клиент, не CDN), `no-cache` (можно сохранить, но перед использованием validate с сервером через ETag/If-Modified-Since), `no-store` (не сохранять никуда -- для секретов), `max-age=N` (свежесть N секунд), `s-maxage=N` (max-age для shared caches типа CDN), `must-revalidate` (после истечения обязательно validate), `stale-while-revalidate=N` (можно отдавать stale, пока в фоне обновляется), `immutable` (не валидировать, файл точно не изменился -- для версионированных URL вроде /static/main.abc123.js).
# Долгий кэш для версионированной статики:
Cache-Control: public, max-age=31536000, immutable
# Динамический контент с быстрой инвалидацией:
Cache-Control: private, max-age=60, stale-while-revalidate=300
# API-ответ, который нельзя кэшировать:
Cache-Control: no-store
# Можно сохранить, но обязательно валидировать перед использованием:
Cache-Control: no-cache
# В FastAPI:
from fastapi import Response
@app.get('/data')
def data(response: Response):
response.headers['Cache-Control'] = 'public, max-age=300'
return {'data': '...'}Опаковый идентификатор версии ресурса, который сервер кладёт в ответ. Клиент может использовать его для conditional GET -- `If-None-Match: "<etag>"`. Если ресурс не изменился, сервер ответит `304 Not Modified` без body -- экономит трафик и время парсинга. Strong ETag (`"abc123"`) -- точное совпадение байт-в-байт; weak ETag (`W/"abc123"`) -- семантически эквивалентный (например, тот же контент, но с другими whitespace в JSON). Также используется в optimistic locking: клиент шлёт PUT с `If-Match: "<etag>"`, сервер выдаёт 412 Precondition Failed, если кто-то успел изменить ресурс между чтением и записью.
import requests
# Первый запрос -- получаем ETag
r = requests.get('https://api.example.com/v1/users/42')
etag = r.headers['ETag'] # '"abc123"'
# Повторный с conditional GET
r2 = requests.get(
'https://api.example.com/v1/users/42',
headers={'If-None-Match': etag}
)
if r2.status_code == 304:
print('Не изменилось, используем кэш') # body пустой
else:
print('Обновилось:', r2.json())
# Optimistic locking при апдейте:
requests.put(url, json=new_data, headers={'If-Match': etag})
# 412 Precondition Failed -> кто-то опередил, retry с новым GETЗапрос, который сервер выполняет только если выполнено условие, заданное в заголовке. `If-None-Match: "<etag>"` -- сделать только если версия отличается; `If-Match: "<etag>"` -- только если совпадает (для safe write); `If-Modified-Since: <date>` / `If-Unmodified-Since: <date>` -- то же на основе времени модификации. Сервер отвечает 304 Not Modified (для GET, когда нечего возвращать) или 412 Precondition Failed (для PUT/DELETE, когда условие не выполнилось). Это экономит трафик и предотвращает lost-update проблему. Для DE особенно важно при инкрементальной выгрузке: брать только обновлённое.
# Скачивание только при изменении:
import requests
last_etag = load_from_state('etag')
last_modified = load_from_state('last_modified')
headers = {}
if last_etag:
headers['If-None-Match'] = last_etag
if last_modified:
headers['If-Modified-Since'] = last_modified
r = requests.get('https://api.example.com/v1/products', headers=headers)
if r.status_code == 304:
print('Без изменений, выходим')
return
save_state('etag', r.headers.get('ETag'))
save_state('last_modified', r.headers.get('Last-Modified'))
process(r.json())Маленький кусочек данных, который сервер посылает в ответе через `Set-Cookie`, а клиент возвращает в каждом следующем запросе через `Cookie:` тому же домену. Атрибуты: `Domain` (для какого домена), `Path` (с какого пути), `Expires`/`Max-Age` (TTL), `Secure` (только по HTTPS), `HttpOnly` (недоступна из JS -- защита от XSS), `SameSite` (`Strict`/`Lax`/`None` -- защита от CSRF). Для DE-задач cookies встречаются в session-based API (login -> сохранить cookie в Session -> делать запросы) и в headless web-scraping. `requests.Session()` автоматически хранит и возвращает cookies между запросами.
import requests
s = requests.Session()
# Login -- сервер вернёт Set-Cookie, Session запомнит
s.post('https://example.com/login', data={'user': 'lev', 'pass': '***'})
# Все следующие запросы автоматически шлют сохранённые cookies
r = s.get('https://example.com/dashboard')
print(s.cookies.get_dict())
# {'session_id': 'abc123', 'csrftoken': 'xyz'}
# Ручное добавление
cookies = {'sessionid': 'abc'}
requests.get(url, cookies=cookies)Распределённая сеть кэширующих серверов в разных географиях (PoP -- points of presence), забирающая ответ из origin-сервера и раздающая клиентам с ближайшего узла. Уменьшает latency, разгружает origin, спасает от DDoS. Главные провайдеры: CloudFront, Cloudflare, Fastly, Akamai. Для DE важно: (1) если CDN кэширует API -- добавляйте `Cache-Control: private` или `no-store` для пользовательских данных; (2) при инвалидации (purge) учитывайте, что в разных PoP кэш может прожить дольше до propagation; (3) `Vary: Accept-Encoding, Authorization` важно -- иначе CDN отдаст один кэшированный ответ всем юзерам с разными токенами.
# Правильные заголовки для API через CDN:
Cache-Control: public, max-age=300
Vary: Accept-Encoding, Authorization, Accept
# Чтобы CDN НЕ кэшировал персональные данные:
Cache-Control: private, no-store
# Проверить, отвечает ли CDN или origin:
curl -v https://api.example.com/data 2>&1 | grep -i 'x-cache\|cf-cache\|age:'
# X-Cache: HIT (или MISS)
# Age: 142 (сколько секунд лежит в кэше)
# CF-Cache-Status: HITТекстовый формат сериализации данных, основанный на подмножестве синтаксиса JavaScript (RFC 8259). Типы: object (`{...}`), array (`[...]`), string (UTF-8, в двойных кавычках), number (без NaN/Infinity по спеке), boolean (`true`/`false`), null. Главный формат для REST API. Не поддерживает комментарии, trailing comma, неэкранированные символы управления. `Content-Type: application/json; charset=utf-8`. Альтернативы вроде JSON5 / Hjson решают эти ограничения, но не приняты в API. Размер избыточный по сравнению с бинарными форматами (Avro/Protobuf), но human-readable, легко дебажится. Парсится во все популярные языки нативно -- `json.loads`/`json.dumps` в Python.
import json
from datetime import datetime, date
from decimal import Decimal
data = {
'id': 42,
'name': 'Lev',
'tags': ['de', 'python'],
'active': True,
'salary': None,
}
raw = json.dumps(data, ensure_ascii=False, indent=2)
parsed = json.loads(raw)
# Кастомный сериализатор для datetime / Decimal
def default(o):
if isinstance(o, (datetime, date)):
return o.isoformat()
if isinstance(o, Decimal):
return str(o)
raise TypeError(f'{type(o)} not serializable')
json.dumps({'when': datetime.utcnow(), 'amount': Decimal('1.05')}, default=default)Формат, где каждая строка -- отдельный валидный JSON-объект, разделённый `\n`. Часто называют NDJSON (Newline Delimited JSON) -- практически синонимы. Удобен для стриминга больших датасетов: можно читать и обрабатывать построчно, не загружая весь файл в память. Часто используется в логах (Filebeat, Loki), Kafka payloads, BigQuery / Snowflake bulk-load, OpenAI API streaming responses. `Content-Type: application/x-ndjson` или `application/jsonl`. Главное правило: одна строка = один валидный JSON; никаких pretty-print'ов с переносом ключей на разные строки.
# users.jsonl:
# {"id":1,"name":"Lev"}
# {"id":2,"name":"Anya"}
# {"id":3,"name":"Max"}
import json
# Запись построчно (memory O(1))
with open('users.jsonl', 'w') as f:
for user in stream_users():
f.write(json.dumps(user, ensure_ascii=False) + '\n')
# Чтение построчно
with open('users.jsonl') as f:
for line in f:
user = json.loads(line)
process(user)
# Стриминг по HTTP
import requests
with requests.get(url, stream=True) as r:
for line in r.iter_lines():
if line:
yield json.loads(line)Альтернативное название JSONL, фактически тот же формат: одна строка = один JSON-объект, разделитель `\n`. NDJSON чуть строже -- требует именно `\n` (LF), без `\r\n` (CRLF) на Windows; JSONL более расслаблен. На практике в API/data tools названия взаимозаменяемы. MIME: `application/x-ndjson`. Важно для DE: streaming bulk-API (Elasticsearch _bulk endpoint требует NDJSON, BigQuery streaming insert принимает NDJSON), и при стриминге response через `iter_lines()` в requests/httpx -- это идеальный transport для chunked данных без full-buffer в памяти.
# Elasticsearch _bulk endpoint требует именно NDJSON:
# {"index": {"_index": "users"}}
# {"id":1, "name":"Lev"}
# {"index": {"_index": "users"}}
# {"id":2, "name":"Anya"}
import json
import requests
body = ''
for user in users:
body += json.dumps({'index': {'_index': 'users'}}) + '\n'
body += json.dumps(user) + '\n'
requests.post(
'http://es:9200/_bulk',
data=body,
headers={'Content-Type': 'application/x-ndjson'}
)Текстовый табличный формат: первая строка -- header, дальше значения через запятую (или точку с запятой в DE-европейской локали). Описан в RFC 4180, но на практике каждый продукт пишет свой dialect: разделитель (`,` `;` `\t`), кавычки (` "..."`), escape (`""` или `\"`), encoding (UTF-8, UTF-8 BOM, cp1251, cp1252), line terminator (`\n` vs `\r\n`). Особо коварен BOM (Byte Order Mark `\ufeff`) в начале -- Excel его пишет, и `pandas.read_csv` без `encoding='utf-8-sig'` положит его в имя первой колонки. Многострочные значения возможны -- строка может физически занимать несколько lines, если внутри quoted-поля есть `\n`.
import csv, pandas as pd
# Безопасное чтение
with open('data.csv', encoding='utf-8-sig', newline='') as f:
reader = csv.DictReader(f, dialect='excel')
for row in reader:
print(row)
# pandas -- всегда явно указывайте encoding и dtype
df = pd.read_csv(
'data.csv',
encoding='utf-8-sig',
sep=',',
quotechar='"',
dtype={'user_id': str, 'amount': float},
parse_dates=['created_at'],
)
# Запись с правильным quoting
df.to_csv('out.csv', index=False, quoting=csv.QUOTE_NONNUMERIC, encoding='utf-8')Подвид CSV с разделителем-табом (`\t`) вместо запятой. Преимущество: значения почти никогда не содержат `\t` (в отличие от запятой), поэтому не нужны кавычки и escape -- формат проще. Стандарт IANA -- `text/tab-separated-values`. Распространён в bioinformatics (BLAST output), в выгрузках из старых СУБД, в логах. В Python читается тем же csv-модулем с `dialect='excel-tab'` или `delimiter='\t'`. Опасности: если в данных всё-таки есть `\t` (редко, но бывает в текстовых полях), некоторые tools тихо поломаются -- лучше предварительно сделать `re.search(r'\t', value)`.
import csv, pandas as pd
# Чтение
with open('data.tsv', encoding='utf-8') as f:
reader = csv.reader(f, delimiter='\t')
headers = next(reader)
for row in reader:
print(dict(zip(headers, row)))
# pandas
df = pd.read_csv('data.tsv', sep='\t', encoding='utf-8')
# Запись TSV
df.to_csv('out.tsv', sep='\t', index=False, encoding='utf-8')Текстовый формат сериализации, надмножество JSON. Поддерживает комментарии (`#`), якоря (`&name`) и алиасы (`*name`), multi-line строки, литералы (`|` сохраняет переносы, `>` сворачивает). Синтаксис чувствителен к отступам -- только пробелы (НЕ табы). Используется как формат конфигов: Docker Compose, Kubernetes manifests, GitHub Actions, Ansible playbooks. Главный capkan -- `yaml.load` без `Loader` исполняет произвольный код через `!!python/object/apply` теги (RCE). ВСЕГДА используйте `yaml.safe_load`. Boolean trap: `yes`/`no`/`on`/`off`/`y`/`n` парсятся как bool в YAML 1.1 (PyYAML default), но не в YAML 1.2 (ruamel.yaml). Версия Norway: `country: NO` -> False.
import yaml
# ОПАСНО (RCE):
# data = yaml.load(text) # <- НИКОГДА
# Безопасно:
data = yaml.safe_load(text)
# Boolean trap демо:
text = '''
countries:
- NO # <- Norway? Нет, это False!
- YES # <- True
- DE
'''
print(yaml.safe_load(text))
# {'countries': [False, True, 'DE']}
# Обход: всегда заворачивать в кавычки
text_safe = '''
countries:
- 'NO'
- 'YES'
- 'DE'
'''Текстовый формат разметки, надмножество HTML. Структура: вложенные теги с атрибутами и текстовым содержимым. До массового JSON был основным форматом веб-API (SOAP). Сейчас встречается в legacy-API, банковских интеграциях, RSS-feed, sitemap.xml, SVG, OOXML (docx/xlsx). В DE -- выгрузки от госорганов, EDI. Опасности парсинга: XXE (XML External Entity) injection, billion-laughs DoS, XML bomb. Стандартная библиотека `xml.etree.ElementTree` уязвима для некоторых атак -- для untrusted input используйте `defusedxml`. Парсинг намного дороже JSON по CPU. Namespaces (`xmlns:foo`) усложняют XPath-запросы.
# БЕЗОПАСНО -- defusedxml для untrusted input:
from defusedxml import ElementTree as ET
root = ET.fromstring(xml_text)
for user in root.findall('.//user'):
print(user.get('id'), user.find('name').text)
# Namespaces (часто в SOAP):
ns = {'soap': 'http://schemas.xmlsoap.org/soap/envelope/'}
body = root.find('soap:Body', ns)
# Запись
import xml.etree.ElementTree as XET
root = XET.Element('users')
u = XET.SubElement(root, 'user', id='1')
XET.SubElement(u, 'name').text = 'Lev'
XET.indent(root)
XET.tostring(root, encoding='utf-8', xml_declaration=True)Бинарный формат сериализации, schema-less как JSON, но компактнее (примерно в 1.5-2 раза) и быстрее парсится. Типы: nil, bool, int (vary-length), float, str, bin (raw bytes -- нет в JSON), array, map, ext (custom). Используется как replacement JSON в Redis, fluentd, ROS, Kafka payloads, IoT/edge. Не human-readable -- для дебага придётся декодировать. В Python: `msgpack` (C-extension, fast). Поддерживает `use_bin_type=True` (рекомендуется) для разделения str и bytes -- по умолчанию для совместимости со старыми версиями он же merges их. MIME: `application/msgpack` или `application/x-msgpack`.
import msgpack
data = {'id': 42, 'name': 'Lev', 'tags': ['de', 'python']}
# Сериализация
packed = msgpack.packb(data, use_bin_type=True)
print(len(packed)) # 38 байт
# Для сравнения JSON:
import json
print(len(json.dumps(data).encode())) # 56 байт
# Десериализация
unpacked = msgpack.unpackb(packed, raw=False)
print(unpacked)
# Стриминг (несколько записей в одном файле/стриме)
unpacker = msgpack.Unpacker(raw=False)
unpacker.feed(packed_chunk)
for obj in unpacker:
process(obj)Бинарный формат сериализации со строгой схемой. Schema хранится отдельно (JSON), данные -- компактные бинари. Главная фича -- schema evolution: новая версия схемы может читать данные старой и наоборот, если соблюсти rules (default values для новых полей, не удалять без default, etc.). Это делает Avro де-факто стандартом для Kafka payloads (через Confluent Schema Registry, который выдаёт schema_id за байтовый префикс). Также используется в Hadoop/Spark/Hive. Файл `.avro` -- schema + блоки данных + опциональная компрессия (snappy, deflate). Альтернатива -- Parquet для analytical workloads (column-oriented), Avro лучше для row-oriented streaming.
import fastavro
schema = {
'type': 'record',
'name': 'User',
'fields': [
{'name': 'id', 'type': 'long'},
{'name': 'name', 'type': 'string'},
{'name': 'email', 'type': ['null', 'string'], 'default': None},
],
}
records = [
{'id': 1, 'name': 'Lev', 'email': '[email protected]'},
{'id': 2, 'name': 'Anya', 'email': None},
]
with open('users.avro', 'wb') as f:
fastavro.writer(f, schema, records, codec='snappy')
with open('users.avro', 'rb') as f:
for r in fastavro.reader(f):
print(r)Бинарный column-oriented формат для analytical workloads. Хранит данные по столбцам (а не по строкам, как CSV/Avro), что даёт огромный выигрыш при agg-запросах (`SELECT SUM(amount) FROM ...`) -- читается только нужная колонка с диска. Поддерживает schema evolution, nested types (через Dremel encoding), per-column compression (snappy default, gzip, zstd, brotli) и encoding (dictionary, RLE, delta). Файл организован в row groups (обычно 128MB) -> column chunks -> pages. Footer хранит metadata (schema, statistics: min/max per column для predicate pushdown). De-facto стандарт data lake (S3 + Athena/Presto/Spark/DuckDB). Главный конкурент -- ORC.
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
# pandas -> Parquet
df = pd.DataFrame({'id': [1,2,3], 'name': ['a','b','c'], 'amount': [10.5, 20.0, 30.7]})
df.to_parquet('data.parquet', engine='pyarrow', compression='snappy')
# pyarrow напрямую (точнее по dtypes)
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_table(table, 'data.parquet', compression='zstd')
# Чтение только нужных колонок (column pruning)
table = pq.read_table('data.parquet', columns=['id', 'amount'])
# Партиционированная запись
pq.write_to_dataset(table, root_path='lake/users',
partition_cols=['country', 'year'])
# DuckDB query поверх Parquet
import duckdb
duckdb.sql("SELECT SUM(amount) FROM 'lake/users/**/*.parquet'").show()Альтернатива Parquet, тоже column-oriented, родом из Hortonworks/Hive. Похожая структура: stripes (как row groups в Parquet) -> column streams -> data + index. Лучше Parquet интегрирован с Hive (legacy-кластерах часто видно `STORED AS ORC`), исторически был быстрее на bloom-filter pushdown. На практике для нового data lake обычно выбирают Parquet -- он шире поддержан (Spark, DuckDB, Athena, Snowflake, ClickHouse). ORC встречается в наследии Hortonworks/Cloudera. В Python через `pyorc` или `pyarrow.orc` (только чтение в pyarrow до недавних версий).
import pyarrow as pa
import pyarrow.orc as orc
import pandas as pd
df = pd.DataFrame({'id':[1,2,3], 'name':['a','b','c']})
table = pa.Table.from_pandas(df, preserve_index=False)
# Запись ORC
with open('data.orc', 'wb') as f:
orc.write_table(table, f, compression='snappy')
# Чтение
with open('data.orc', 'rb') as f:
table = orc.read_table(f)
df = table.to_pandas()Бинарный формат сериализации от Google со строгой schema, описанной в `.proto` файлах. Компилятор (`protoc`) генерирует код для целевого языка (Python, Go, Java, etc.). Очень компактен -- числа varint-encoded, поля идентифицируются tag-номерами (а не именами как в JSON). Schema evolution через резервирование номеров и `optional` поля. De facto на wire-уровне в gRPC, в google APIs (Maps, BigQuery internals). Версии: proto2 (legacy, обязательность через `required`/`optional`), proto3 (актуальная, все поля implicitly optional с default `0`/`""`/`false`). Не human-readable -- для отладки `protoc --decode` или text-format `MessageToString()`.
// user.proto
syntax = "proto3";
message User {
int64 id = 1;
string name = 2;
optional string email = 3;
repeated string tags = 4;
}
# Компиляция:
# protoc --python_out=. user.proto
import user_pb2
u = user_pb2.User(id=42, name='Lev', email='[email protected]', tags=['de','python'])
blob = u.SerializeToString() # bytes
print(len(blob)) # 23 байт vs JSON ~50
u2 = user_pb2.User()
u2.ParseFromString(blob)
print(u2.name) # 'Lev'Архитектурный стиль (НЕ протокол, НЕ стандарт), описанный Roy Fielding в диссертации 2000 года. Шесть constraints: client-server, stateless, cacheable, uniform interface, layered system, code on demand (опционально). Главное на практике: ресурсы идентифицируются URI (`/users/42`), действия выражаются HTTP-методами (GET/POST/PUT/PATCH/DELETE), состояние не хранится сервером между запросами, ответы кэшируемы. Большинство «REST API» в индустрии на самом деле RESTish -- не выполняют HATEOAS (Level 3 по Richardson Maturity Model). Для DE: важно понимать различие safe/idempotent методов для retry-логики, и почему PUT идемпотентен, а POST -- нет.
# RESTful URI и методы:
GET /api/v1/users # список
GET /api/v1/users/42 # один
POST /api/v1/users # создать (server-generated id)
PUT /api/v1/users/42 # заменить целиком
PATCH /api/v1/users/42 # частично
DELETE /api/v1/users/42 # удалить
# Под-ресурсы:
GET /api/v1/users/42/orders
POST /api/v1/users/42/orders
# НЕ-RESTful (но встречается):
POST /api/v1/getUserById
POST /api/v1/users/42/deleteЛюбая сущность, идентифицируемая URI: пользователь, заказ, файл, операция. Ресурс может быть concrete (`/users/42`) или коллекцией (`/users`). REST мыслит существительными, не глаголами -- `/users/42/activate` хуже, чем `POST /users/42/activations`. Под-ресурсы выражают композицию: `/users/42/orders` -- заказы юзера 42. Один и тот же ресурс может иметь несколько representation (см. Representation): `/users/42` отдаст JSON по умолчанию, но при `Accept: text/csv` может вернуть CSV. Изменение URI в новой версии -- breaking change (см. Versioning).
# Ресурс -- это абстракция; URI -- его идентификатор.
# Один ресурс -- несколько URI (alias)
/users/42
/users/by-email/[email protected] # тот же user, разный URI
# Канонический URI указывается через Link:
Link: <https://api.example.com/v1/users/42>; rel="canonical"
# Sub-resources для отношений:
/users/42 # сам user
/users/42/orders # коллекция orders этого user
/users/42/orders/108 # конкретный order
/users/42/avatar # singleton sub-resourceКонкретное представление ресурса в данный момент в данном формате. Один ресурс `/users/42` может иметь представления: JSON (`Accept: application/json`), XML (`Accept: application/xml`), CSV для bulk export (`Accept: text/csv`). Сервер выбирает формат через content negotiation на основе `Accept` header. Версия API тоже может быть variant -- `Accept: application/vnd.example.v2+json`. Representation несёт в себе state ресурса (отсюда «Representational State Transfer»). При PUT клиент отправляет новую representation для замены текущей.
# Content negotiation:
GET /api/v1/users/42
Accept: application/json
# vs
GET /api/v1/users/42
Accept: text/csv
# vs version через Accept (vendor MIME):
GET /api/users/42
Accept: application/vnd.example.v2+json
# В Python:
import requests
r = requests.get(url, headers={'Accept': 'text/csv'})
print(r.headers['Content-Type']) # 'text/csv'Свойство операции: повторный одинаковый запрос даёт тот же эффект на сервере, что и одиночный. Идемпотентны: GET, HEAD, OPTIONS, PUT, DELETE. НЕ идемпотентен: POST (каждый POST /orders создаёт новый order). Важно для retry: если сетевой error на идемпотентном запросе -- можно безопасно retry без побочных эффектов. Для POST'ов делают idempotency через `Idempotency-Key` header (Stripe, GitHub, Square): клиент генерирует UUID, шлёт в header; сервер хранит мапу key->result, при повторном запросе возвращает закэшированный ответ вместо создания дубликата. TTL key обычно 24 часа.
import requests
import uuid
# Идемпотентный POST через Idempotency-Key:
order_id = str(uuid.uuid4())
for attempt in range(3):
try:
r = requests.post(
'https://api.example.com/v1/payments',
headers={'Idempotency-Key': order_id},
json={'amount': 100, 'currency': 'USD'},
timeout=10,
)
r.raise_for_status()
break
except (requests.Timeout, requests.ConnectionError):
# Безопасно retry -- сервер вернёт тот же payment_id
continueСвойство HTTP-метода: его выполнение не должно изменять состояние сервера. Safe = read-only от точки зрения клиента (логирование/аналитика серверу разрешены). Safe-методы: GET, HEAD, OPTIONS, TRACE. НЕ safe: POST, PUT, PATCH, DELETE. Safety важна, потому что: (1) поисковики, prefetchers, link checkers свободно ходят по safe URL -- если за GET спрятано удаление (`GET /users/42/delete`), Googlebot вам всё снесёт; (2) safe можно кэшировать без риска stale-side-effects; (3) логично чередовать safe-методы в health-check'ах. Каждый safe также idempotent (но не наоборот: PUT idempotent, но не safe).
# Safe (никаких изменений):
GET /api/v1/users/42
HEAD /api/v1/users/42 # как GET, но без body
OPTIONS /api/v1/users/42 # доступные методы
# НЕ safe -- изменяют состояние:
POST /api/v1/users # создаёт нового
PUT /api/v1/users/42 # заменяет
DELETE /api/v1/users/42 # удаляет
# АНТИПАТТЕРН -- не safe под видом safe:
GET /api/v1/users/42/delete # <- Googlebot снесёт всёПринцип REST: каждый запрос самодостаточен -- содержит всё, что нужно серверу для его обработки (auth, params, body). Сервер НЕ хранит client session между запросами. Это позволяет: (1) горизонтально масштабироваться (любой инстанс обработает любой запрос); (2) кэшировать ответы; (3) легко балансировать нагрузку (без sticky sessions). На практике state часто хранят: (a) на стороне клиента в JWT (stateless auth) или cookie с зашифрованной сессией; (b) во внешнем shared store (Redis, Memcached) -- это не нарушает REST, потому что server processes остаются stateless. Stateful WebSocket-соединения REST не считаются.
# Stateless: каждый запрос несёт auth
GET /api/v1/users/42
Authorization: Bearer eyJhbGc...
# Сервер обработал, забыл -- следующий запрос:
GET /api/v1/users/42
Authorization: Bearer eyJhbGc... # <- снова, обязательно
# Антипаттерн (stateful):
POST /api/v1/login -> сервер запомнил сессию для этого socket
GET /api/v1/users/42 -> сервер проверяет socket session
# Не масштабируется без sticky LBПринцип Fielding-REST уровня 3 (Richardson Maturity Model): ответы API содержат ссылки на следующие возможные действия, чтобы клиент не хардкодил URL. Формат: HAL (`_links`), JSON:API (`links`), Siren, Collection+JSON. На практике мало кто соблюдает -- большинство REST API называются «REST», но фактически Level 2 (resources + methods, без hypermedia). Для DE важно знать: GitHub API -- частично HATEOAS (`Link: <next_url>; rel="next"` для пагинации), Stripe -- нет, AWS -- нет. HATEOAS облегчает API evolution (URL меняются, клиент следует ссылкам), но усложняет codegen и client-side caching.
// HAL example:
{
"id": 42,
"name": "Lev",
"_links": {
"self": {"href": "/users/42"},
"orders": {"href": "/users/42/orders"},
"avatar": {"href": "/users/42/avatar"}
}
}
// GitHub pagination через Link header -- это тоже HATEOAS:
Link: <https://api.github.com/repos?page=2>; rel="next",
<https://api.github.com/repos?page=10>; rel="last"Стратегии управления breaking changes в API. Главные подходы: (1) URL path -- `/v1/users`, `/v2/users` (Twitter, GitHub) -- самый простой, видно сразу; (2) Custom header -- `X-API-Version: 2` или Accept-MIME -- `Accept: application/vnd.example.v2+json` (GitHub альтернатива); (3) Query param -- `/users?version=2` (используется реже); (4) Date-based -- `?api-version=2023-10-30` (Stripe, Azure). Breaking changes требуют новой версии: удаление полей, изменение типа, обязательный новый параметр. Non-breaking (добавление optional полей, новые endpoints) -- без bump. Поддерживайте старую версию минимум 6-12 месяцев и пишите migration guide.
# URL path versioning (наиболее частый):
GET /api/v1/users/42
GET /api/v2/users/42
# Header versioning:
GET /api/users/42
Accept: application/vnd.example.v2+json
# Date versioning (Stripe-style):
GET /v1/charges
Stripe-Version: 2024-04-10
# В Python httpx -- можно зашить в base_url:
import httpx
client = httpx.Client(
base_url='https://api.example.com/v2',
headers={'Accept': 'application/json'}
)Самая популярная HTTP-библиотека Python (Kenneth Reitz, 2011). Синхронная, без HTTP/2, без async. Простой API: `requests.get(url)`, `requests.post(url, json=...)`. Под капотом -- urllib3. Поддерживает session pooling (через `Session()`), file uploads, streaming, redirects, cookies, proxies. Текущая версия 2.34 (май 2026). Для DE -- рабочая лошадка для простых одноразовых интеграций. Лимитации: нет HTTP/2, нет async (блокирует event loop), нет встроенных retry (нужен `urllib3.util.Retry`). Альтернатива при этих требованиях -- httpx.
import requests
# Простой GET
r = requests.get('https://api.example.com/v1/users/42', timeout=10)
r.raise_for_status()
data = r.json()
# POST с JSON
r = requests.post(
'https://api.example.com/v1/users',
json={'name': 'Lev', 'email': '[email protected]'},
headers={'Authorization': 'Bearer eyJhbGc...'},
timeout=10,
)
# Streaming большого файла
with requests.get(url, stream=True, timeout=30) as r:
for chunk in r.iter_content(64*1024):
f.write(chunk)Современная HTTP-библиотека Python с API почти как у requests, но с поддержкой HTTP/2, sync + async, type hints, transports plugin-architecture. Текущая версия 0.28 (май 2026). Главные фичи: `httpx.Client()` (sync) и `httpx.AsyncClient()` (async) -- один и тот же API; HTTP/2 через `httpx.Client(http2=True)` (требует extras `pip install httpx[http2]`); precise timeout per phase (connect/read/write/pool); встроенные mounts/transports (можно отдельно настроить retry / mock для разных URL). Используется FastAPI internally, рекомендован для нового кода.
import httpx
import asyncio
# Sync API -- как requests
with httpx.Client(timeout=10.0) as client:
r = client.get('https://api.example.com/v1/users/42')
r.raise_for_status()
data = r.json()
# Async -- параллельные запросы
async def fetch_users(ids):
async with httpx.AsyncClient(http2=True, timeout=10.0) as client:
tasks = [client.get(f'/users/{i}') for i in ids]
responses = await asyncio.gather(*tasks)
return [r.json() for r in responses]
# Запуск:
# users = asyncio.run(fetch_users([1,2,3,4,5]))Объект, хранящий состояние между запросами: cookies, default-headers, auth, connection pool. В requests -- `requests.Session()`, в httpx -- `httpx.Client()`/`httpx.AsyncClient()`. Главное преимущество -- connection pooling: TCP-соединение и TLS handshake переиспользуются для нескольких запросов к тому же хосту, ускорение в 2-10 раз. Также удобно ставить общие headers (Authorization, User-Agent) один раз. Анти-паттерн -- создавать `requests.get(...)` в цикле: каждый раз новое соединение, медленно. Session нужно явно закрывать (`s.close()`) или использовать `with` -- иначе ресурсы текут.
import requests
# Антипаттерн: новое соединение на каждый запрос
for user_id in user_ids:
requests.get(f'https://api.example.com/users/{user_id}')
# Хорошо: переиспользование через Session
with requests.Session() as s:
s.headers.update({'Authorization': 'Bearer eyJhbGc...'})
s.headers.update({'User-Agent': 'my-etl/1.0'})
for user_id in user_ids:
r = s.get(f'https://api.example.com/users/{user_id}')
process(r.json())В requests -- объект, отвечающий за то, КАК выполняется HTTP-запрос. Default -- `HTTPAdapter` поверх urllib3. Можно mount свой adapter для конкретного prefix URL: например, разные retry-policies для разных хостов, custom TLS-context, mock-adapter для тестов. В httpx эту роль выполняет `Transport`. Типичные кастомизации: настроить `pool_connections`, `pool_maxsize` для high-concurrency; добавить `Retry` с backoff_factor; mount `HTTPSAdapter` с принудительным TLS 1.3. Полезно когда работаете с кучей одновременных хостов и нужно тонко крутить connection pool.
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
retry = Retry(
total=3,
backoff_factor=0.5, # 0.5, 1.0, 2.0 сек
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=['HEAD', 'GET', 'PUT', 'DELETE', 'OPTIONS'],
)
adapter = HTTPAdapter(
max_retries=retry,
pool_connections=20,
pool_maxsize=20,
)
s = requests.Session()
s.mount('https://api.example.com', adapter)
s.mount('http://api.example.com', adapter)
r = s.get('https://api.example.com/v1/users/42')Максимальное время ожидания, после которого запрос прерывается с TimeoutException. В requests `timeout=N` (сек) -- это read timeout (между bytes), без него запрос может висеть вечно. Лучше `timeout=(connect, read)` -- `(3.05, 27)` -- отдельно на установку соединения и на чтение каждой порции. ВАЖНО: `timeout` в requests это НЕ total timeout -- это per-IO операция; на фрагментированном ответе общее время может быть гораздо больше. В httpx -- сложнее: `httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=2.0)` либо просто `timeout=10.0`. Без timeout -- антипаттерн в production: socket-висяки забивают connection pool, ETL встаёт колом.
import requests
import httpx
# requests:
r = requests.get(url, timeout=10) # 10 сек на read
r = requests.get(url, timeout=(3.05, 27)) # connect=3.05, read=27
# Никогда так:
r = requests.get(url) # <- может висеть вечно
# httpx -- гранулярнее:
timeout = httpx.Timeout(
connect=5.0, # установка TCP
read=10.0, # чтение ответа
write=5.0, # запись body
pool=2.0, # ожидание свободного слота в pool
)
with httpx.Client(timeout=timeout) as c:
c.get(url)Повторное выполнение запроса после неудачи. Безопасно retry'ить только idempotent методы (GET, HEAD, PUT, DELETE) или POST с `Idempotency-Key`. Условия retry: сетевые ошибки (ConnectionError, Timeout), 5xx (особенно 502/503/504), 429 Too Many Requests (с учётом `Retry-After`). НЕ retry'ить 4xx (кроме 408, 425, 429) -- это ошибки клиента, повторное выполнение не поможет. Стратегия: exponential backoff с jitter -- `delay = base * 2^attempt + random()`. Ограничение по числу попыток (3-5) и общему времени (`max_elapsed_time`). В Python: `urllib3.util.Retry`, `tenacity`, `backoff` (libraries).
from tenacity import retry, stop_after_attempt, wait_exponential_jitter, retry_if_exception_type
import requests
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential_jitter(initial=1, max=30, jitter=2),
retry=retry_if_exception_type((requests.ConnectionError, requests.Timeout)),
reraise=True,
)
def fetch(url):
r = requests.get(url, timeout=10)
if r.status_code == 429:
raise requests.ConnectionError(f'rate limited, retry-after={r.headers.get("Retry-After")}')
if 500 <= r.status_code < 600:
raise requests.ConnectionError(f'5xx: {r.status_code}')
r.raise_for_status()
return r.json()Низкоуровневая HTTP-библиотека, на которой построены requests и pip. Управляет connection pooling, retry, SSL-context, proxies. Прямое использование urllib3 -- редкость, обычно через requests/httpx, но полезно знать классы (`PoolManager`, `Retry`, `HTTPSConnectionPool`) для тонкой настройки adapter'ов. urllib3 v2.x требует OpenSSL 1.1.1+ -- на старых системах (CentOS 7) могут быть ограничения. Версии 1.x и 2.x несовместимы по API в нескольких местах (Retry params), но requests 2.34 совместим с обеими. В urllib3 же реализован connection-level keep-alive, response chunking, retry-policy.
import urllib3
# Минимальный пример (без requests)
http = urllib3.PoolManager(
num_pools=10,
maxsize=20,
cert_reqs='CERT_REQUIRED',
)
r = http.request('GET', 'https://api.example.com/v1/users/42',
timeout=urllib3.Timeout(connect=5, read=10),
retries=urllib3.Retry(3, backoff_factor=0.5))
print(r.status)
print(r.data.decode('utf-8'))
# Disable warnings (не для prod):
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)Стандартная библиотека Python для асинхронного I/O через event loop и coroutines (`async def`/`await`). Позволяет одновременно выполнять тысячи I/O-операций (HTTP-запросы, чтение файлов) без потоков. Для HTTP-клиента -- httpx.AsyncClient или aiohttp. Главное правило: `await`-абельные операции делегируются в event loop, синхронный код блокирует loop (поэтому `requests.get()` внутри async-функции -- антипаттерн). Запуск: `asyncio.run(coro())`. Параллелизм через `asyncio.gather()` или `asyncio.TaskGroup` (Python 3.11+). Для DE -- массивные fan-out на API: тысячи продуктов через `/products/{id}` концентрируется в секунды, а не в часы.
import asyncio
import httpx
async def fetch_one(client, url):
r = await client.get(url, timeout=10)
return r.json()
async def fetch_all(urls):
async with httpx.AsyncClient(http2=True) as client:
# TaskGroup -- Python 3.11+, гарантированный cleanup
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_one(client, u)) for u in urls]
return [t.result() for t in tasks]
# Запуск
results = asyncio.run(fetch_all([
'https://api.example.com/users/1',
'https://api.example.com/users/2',
'https://api.example.com/users/3',
]))Машиночитаемый формат описания REST API в YAML/JSON. Текущая версия -- 3.1 (с обзором 3.2 -- draft на май 2026). 3.0 был мажорным апгрейдом из Swagger 2.0 (2017), 3.1 принёс полную совместимость с JSON Schema 2020-12 (раньше был свой суперсет). Описывает: paths, methods, parameters, request/response schemas, security schemes, servers, components. Используется для: (1) auto-generation документации (Swagger UI, Redoc); (2) генерации server-stubs и client-SDK (openapi-generator, openapi-python-client, fastapi `--openapi-json`); (3) contract-testing; (4) mocking сервера до его реализации. FastAPI генерирует OpenAPI автоматически из type-hints + Pydantic.
openapi: 3.1.0
info:
title: Users API
version: 1.0.0
servers:
- url: https://api.example.com/v1
paths:
/users/{id}:
get:
operationId: getUser
parameters:
- name: id
in: path
required: true
schema:
type: integer
responses:
'200':
description: User
content:
application/json:
schema:
$ref: '#/components/schemas/User'
components:
schemas:
User:
type: object
required: [id, name]
properties:
id: {type: integer}
name: {type: string}
email: {type: string, format: email, nullable: true}Исторически -- формат описания API (Swagger 1.x, 2.0), который в 2015 был передан в OpenAPI Initiative и переименован в OpenAPI Specification (с версии 3.0). Сейчас «Swagger» -- это семейство инструментов SmartBear: Swagger UI (interactive HTML-документация), Swagger Editor (online редактор спек), Swagger Codegen (генератор клиентов/серверов). На практике слова «Swagger» и «OpenAPI» часто используют как синонимы, особенно при описании UI: «открыть Swagger» = «открыть Swagger UI на /docs». В API-сообществе уточняют: «спека» -- OpenAPI, «UI» -- Swagger UI или Redoc.
# В FastAPI Swagger UI автоматически на /docs:
from fastapi import FastAPI
app = FastAPI(title='My API')
@app.get('/users/{id}')
def get_user(id: int):
return {'id': id, 'name': 'Lev'}
# Открыть http://localhost:8000/docs (Swagger UI)
# или http://localhost:8000/redoc (Redoc)
# Спека на http://localhost:8000/openapi.json
# Standalone Swagger UI:
# docker run -p 8080:8080 -v $(pwd)/openapi.yaml:/api/swagger.yaml \
# -e SWAGGER_JSON=/api/swagger.yaml swaggerapi/swagger-uiОписание структуры данных в OpenAPI: типы (object, array, string, integer, number, boolean), required-поля, форматы (date-time, email, uuid, byte, binary), constraints (minLength, maxLength, pattern, minimum, maximum, enum). С OpenAPI 3.1 -- full-compat с JSON Schema 2020-12, можно использовать `oneOf`/`anyOf`/`allOf`/`not`, `if`/`then`/`else`, `unevaluatedProperties`. Schema валидируется на стороне сервера (FastAPI автоматически через Pydantic) и клиента (openapi-python-client генерирует Pydantic-модели). Schema reuse через `$ref: '#/components/schemas/User'`.
components:
schemas:
User:
type: object
required: [id, name, email]
properties:
id: {type: integer, minimum: 1}
name:
type: string
minLength: 1
maxLength: 100
email:
type: string
format: email
tags:
type: array
items: {type: string}
maxItems: 10
role:
type: string
enum: [admin, user, guest]
created_at:
type: string
format: date-time
additionalProperties: false # запретить лишние поляСпособ переиспользования определений в OpenAPI/JSON Schema через ссылку: `$ref: '#/components/schemas/User'` -- относительная (внутри текущего документа), `$ref: 'common.yaml#/User'` -- на другой файл, `$ref: 'https://schemas.example.com/v1/user.json'` -- на удалённый. Resolver разворачивает ref в рантайме. Pro: DRY, schemas описываются один раз. Cons: круговые ссылки иногда ломают codegen, расщепление на много файлов усложняет навигацию. С OpenAPI 3.1 в `$ref` нельзя смешивать другие keys (был грех в 3.0) -- для override используйте `allOf: [{$ref: ...}, {description: ...}]`.
paths:
/users/{id}:
get:
responses:
'200':
content:
application/json:
schema:
$ref: '#/components/schemas/User'
components:
schemas:
User:
type: object
properties:
id: {type: integer}
name: {type: string}
manager:
$ref: '#/components/schemas/User' # рекурсивная ссылка
# Override через allOf:
PartialUser:
allOf:
- $ref: '#/components/schemas/User'
- required: []
description: 'User для PATCH -- все поля optional'Поле в OpenAPI schema, явно указывающее, какой подтип `oneOf` использовать на основе значения дискриминатора. Без него tools должны guessить тип через trial-and-error валидацию по каждой schema из oneOf -- медленно и иногда неоднозначно. С `discriminator: {propertyName: type}` парсер сразу знает: видим `"type": "admin"` -> используем `Admin` schema. Часто используется в polymorphic responses: один endpoint может вернуть Order, Refund, Subscription -- discriminator `event_type` указывает what to parse. Codegen-tools генерируют tagged unions для языков с algebraic types.
components:
schemas:
Pet:
oneOf:
- $ref: '#/components/schemas/Dog'
- $ref: '#/components/schemas/Cat'
discriminator:
propertyName: pet_type
mapping:
dog: '#/components/schemas/Dog'
cat: '#/components/schemas/Cat'
Dog:
type: object
properties:
pet_type: {type: string, enum: [dog]}
bark: {type: string}
Cat:
type: object
properties:
pet_type: {type: string, enum: [cat]}
purr: {type: string}
# В Pydantic:
# from typing import Literal, Annotated, Union
# class Dog(BaseModel): pet_type: Literal['dog']; bark: str
# class Cat(BaseModel): pet_type: Literal['cat']; purr: str
# Pet = Annotated[Union[Dog, Cat], Field(discriminator='pet_type')]Автоматическая генерация client SDK или server stubs из OpenAPI спеки. Главные инструменты: `openapi-generator` (Java-based, поддерживает 50+ языков), `openapi-python-client` (Python-only, Pydantic-models, type-safe), `swagger-codegen` (предшественник openapi-generator). Pro: вы пишете спеку один раз, получаете типизированный клиент во всех языках; меньше boilerplate; компилятор ловит mismatch. Cons: сгенерированный код часто неидиоматичен (особенно для Python из Java-templates), сложно править вручную (перезатрётся при regen), иногда генерит уродливые имена методов. Альтернатива -- вручную писать клиент с Pydantic-моделями.
# Установка
pip install openapi-python-client
# Генерация из локальной спеки
openapi-python-client generate --path openapi.yaml
# Из URL
openapi-python-client generate --url https://api.example.com/openapi.json
# Сгенерируется my_api_client/ с моделями и методами
from my_api_client import Client
from my_api_client.api.users import get_user
from my_api_client.models import User
client = Client(base_url='https://api.example.com/v1')
user: User = get_user.sync(client=client, id=42)
print(user.name)
# Альтернатива -- openapi-generator (Java):
# brew install openapi-generator
# openapi-generator generate -i openapi.yaml -g python -o ./clientПростейшая HTTP-аутентификация (RFC 7617): клиент передаёт `Authorization: Basic <base64(login:password)>`. Base64 -- это НЕ шифрование, login:password легко декодируется любым `base64 -d` -- защита только от случайного просмотра. Поэтому Basic Auth допустим ТОЛЬКО поверх HTTPS. Используется в простых API и CI-системах (artifactory), basic-protected nginx-зонах, base-auth proxy. Ограничения: токен передаётся с каждым запросом (если утёк -- сразу отзыв пароля), нет встроенного expiry, нет scopes. В REST API всё чаще заменён на API key или Bearer token.
import requests
from requests.auth import HTTPBasicAuth
import base64
# Через requests
r = requests.get('https://api.example.com/v1/data',
auth=HTTPBasicAuth('user', 'pass'))
# Эквивалентно:
r = requests.get('https://api.example.com/v1/data',
auth=('user', 'pass'))
# Вручную (что под капотом):
creds = base64.b64encode(b'user:pass').decode()
r = requests.get(url, headers={'Authorization': f'Basic {creds}'})
# В curl:
# curl -u user:pass https://api.example.com/v1/data
# curl -H 'Authorization: Basic dXNlcjpwYXNz' ...Длинная случайная строка, идентифицирующая клиента (не пользователя). Передаётся в header (`X-API-Key`, `Authorization`), query (`?api_key=...`) или body. Передавать в query -- антипаттерн (попадает в логи nginx, Referer headers, browser history). API key обычно привязан к проекту/приложению, имеет scopes (read-only, admin), rate-limits, можно отозвать через UI. Для DE: главный auth-механизм при работе с публичными API (OpenAI, Stripe test mode, погодные сервисы). Хранить в env-переменной (`os.environ['API_KEY']`), не коммитить, ротировать при подозрении на утечку.
import os
import requests
# В header (recommended):
api_key = os.environ['EXAMPLE_API_KEY']
headers = {'X-API-Key': api_key}
r = requests.get('https://api.example.com/v1/data', headers=headers)
# Или Authorization:
headers = {'Authorization': f'ApiKey {api_key}'}
# В query (АНТИПАТТЕРН -- попадёт в логи):
# r = requests.get(f'https://api.example.com/v1/data?api_key={api_key}')
# Detect-secrets / gitleaks ловят коммиты с API keys:
# pre-commit-hook предотвратит push api_key='sk_live_***'Авторизация через `Authorization: Bearer <token>` (RFC 6750). «Bearer» означает «предъявитель» -- кто бы ни предъявил токен, тот и авторизован (без дополнительной проверки личности). Поэтому токен надо передавать только по HTTPS, хранить безопасно (env, secret manager, не localStorage если можно), коротко жить (минуты-часы) и иметь refresh механизм. Главная форма Bearer -- JWT (см. JWT), но также opaque-токены (просто рандомная строка, сервер хранит mapping в БД/Redis). OAuth2 и OpenID Connect используют именно Bearer как способ передачи access-токена.
import requests
import os
token = os.environ['API_TOKEN']
headers = {'Authorization': f'Bearer {token}'}
r = requests.get('https://api.example.com/v1/me', headers=headers)
# Через Session (применится ко всем запросам):
s = requests.Session()
s.headers['Authorization'] = f'Bearer {token}'
# В httpx есть встроенный auth:
import httpx
client = httpx.Client(
base_url='https://api.example.com/v1',
headers={'Authorization': f'Bearer {token}'},
)Стандарт self-contained токенов (RFC 7519): три base64url-encoded части `header.payload.signature`, разделённые точками. Header: `{"alg": "HS256", "typ": "JWT"}`. Payload: claims вроде `sub` (subject/user_id), `exp` (expiry epoch), `iat` (issued at), `iss` (issuer), `aud` (audience), плюс custom (`role`, `tenant_id`). Signature: HMAC (`HS256`) или RSA/ECDSA (`RS256`/`ES256`) от `header.payload`. Главные грабли: (1) `alg=none` -- старая уязвимость, библиотека верит без подписи, ОБЯЗАТЕЛЬНО whitelist algorithms; (2) payload НЕ шифруется -- не класть туда секретов; (3) JWT нельзя revoke без блэклиста -- короткие expiry (5-15 мин) + refresh-токены.
import jwt # PyJWT
import time
# Подпись
payload = {
'sub': '42',
'role': 'de',
'iat': int(time.time()),
'exp': int(time.time()) + 900, # 15 минут
}
token = jwt.encode(payload, 'secret-key', algorithm='HS256')
print(token)
# eyJhbGciOiJIUzI1NiI...
# Верификация (ОБЯЗАТЕЛЬНО передавайте algorithms!):
try:
decoded = jwt.decode(
token,
'secret-key',
algorithms=['HS256'], # <- whitelist
options={'require': ['exp', 'iat']},
)
print(decoded['sub'])
except jwt.ExpiredSignatureError:
print('Token expired')
except jwt.InvalidTokenError as e:
print(f'Invalid: {e}')Framework для делегированной авторизации (RFC 6749): user разрешает application'у действовать от его имени без передачи пароля. Не аутентификация (для этого OpenID Connect -- слой поверх OAuth2). Ключевые роли: Resource Owner (юзер), Client (приложение), Authorization Server (выдаёт токены), Resource Server (API, проверяет токены). Flows: Authorization Code + PKCE (для веб/SPA/mobile, рекомендованный), Client Credentials (server-to-server, без юзера), Device Code (TVs, CLI), Implicit (deprecated), Resource Owner Password (deprecated). Результат -- access_token (короткий) + refresh_token (долгий). Реализации -- Auth0, Okta, Google, GitHub.
# Authorization Code Flow с PKCE (web app)
# 1. Сгенерировать code_verifier и code_challenge
import secrets, hashlib, base64
verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=').decode()
challenge = base64.urlsafe_b64encode(
hashlib.sha256(verifier.encode()).digest()
).rstrip(b'=').decode()
# 2. Редирект юзера на authorize endpoint:
# https://auth.example.com/authorize?
# response_type=code&
# client_id=...&redirect_uri=...&
# scope=read:data&state=...&
# code_challenge=...&code_challenge_method=S256
# 3. После redirect с code обменять на токены:
import requests
r = requests.post('https://auth.example.com/token', data={
'grant_type': 'authorization_code',
'code': received_code,
'redirect_uri': 'https://app.example.com/callback',
'client_id': '...',
'code_verifier': verifier, # <- PKCE
})
print(r.json())
# {'access_token': '...', 'refresh_token': '...', 'expires_in': 3600}Расширение OAuth2 (RFC 7636), защищающее Authorization Code от перехвата. Клиент генерирует случайный `code_verifier` (43-128 символов), хеширует SHA256 -> `code_challenge`, шлёт challenge в /authorize. После получения code, на /token шлёт уже verifier. Сервер сравнивает: SHA256(verifier) == challenge? Если кто-то перехватил код -- без verifier не сможет обменять. Изначально для mobile/native (где client_secret хранить негде), сейчас рекомендован для ВСЕХ flow -- даже web с client_secret. Алгоритм: `S256` (рекомендован) или `plain` (только для legacy). Без PKCE Authorization Code Flow считается deprecated в современных OAuth-подходах.
import secrets, hashlib, base64
# Генерация code_verifier (43-128 chars, [A-Z, a-z, 0-9, -, _, ., ~])
verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=').decode()
print(len(verifier)) # 43
# Code challenge (S256 -- рекомендован)
digest = hashlib.sha256(verifier.encode()).digest()
challenge = base64.urlsafe_b64encode(digest).rstrip(b'=').decode()
# В /authorize URL:
# code_challenge=<challenge>&code_challenge_method=S256
# В /token request:
# code_verifier=<verifier>
# Сервер проверит: base64url(sha256(verifier)) == challengeГлавный OAuth2 flow для интерактивных приложений (web, mobile). Шаги: (1) приложение редиректит юзера на /authorize с client_id, redirect_uri, scope, state, code_challenge (PKCE); (2) юзер логинится в IdP и одобряет scopes; (3) IdP редиректит обратно с `?code=...&state=...`; (4) клиент проверяет state (CSRF), обменивает code на /token endpoint -> получает access_token + refresh_token. Code короткоживущий (минуты, single-use). State обязательно проверять -- защита от CSRF. PKCE сейчас обязателен для всех вариантов. Альтернативы -- Implicit Flow (deprecated, токен прямо в URL) и Resource Owner Password (deprecated, app собирает пароль).
from urllib.parse import urlencode
import requests, secrets, hashlib, base64
# Step 1: redirect URL
verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=').decode()
challenge = base64.urlsafe_b64encode(
hashlib.sha256(verifier.encode()).digest()
).rstrip(b'=').decode()
state = secrets.token_urlsafe(32)
auth_url = 'https://auth.example.com/authorize?' + urlencode({
'response_type': 'code',
'client_id': 'my-app',
'redirect_uri': 'https://app.example.com/callback',
'scope': 'read:data write:data',
'state': state,
'code_challenge': challenge,
'code_challenge_method': 'S256',
})
# webbrowser.open(auth_url) или для server-side: redirect юзера
# Step 4: после callback с code
r = requests.post('https://auth.example.com/token', data={
'grant_type': 'authorization_code',
'code': received_code,
'redirect_uri': 'https://app.example.com/callback',
'client_id': 'my-app',
'code_verifier': verifier,
}).json()OAuth2 flow без участия user -- приложение аутентифицируется само (client_id + client_secret) и получает access_token. Используется для server-to-server интеграций (machine-to-machine, M2M): когда ваш ETL-скрипт обращается к API напрямую, без авторизации конкретного юзера. Простейший flow: POST /token с `grant_type=client_credentials`, client_id, client_secret, scope. Refresh-токен в этом flow обычно НЕ выдаётся (приложение легко получит новый по credentials). Главное -- secret хранить надёжно (env-vars, vault, secret manager), ротировать регулярно, scope давать минимально необходимый.
import requests
import os
# M2M аутентификация ETL-скрипта
r = requests.post(
'https://auth.example.com/oauth/token',
data={
'grant_type': 'client_credentials',
'client_id': os.environ['ETL_CLIENT_ID'],
'client_secret': os.environ['ETL_CLIENT_SECRET'],
'scope': 'read:datasets write:logs',
'audience': 'https://api.example.com', # часто требуют (Auth0)
},
timeout=10,
)
r.raise_for_status()
token_data = r.json()
access_token = token_data['access_token']
expires_in = token_data['expires_in'] # секунды
# Используем для всех API-вызовов до expiry
headers = {'Authorization': f'Bearer {access_token}'}
requests.get('https://api.example.com/v1/data', headers=headers)Параметр OAuth2, описывающий, какие именно permissions запрашивает приложение. Передаётся через space-separated список (`scope=read:data write:logs admin:users`). Сервер аутентификации показывает юзеру consent-экран с этими scopes («Приложение хочет получить: read your data, write to logs»). После approve -- токен ограничен этими scopes. Resource Server проверяет: в access-токене есть нужный scope? Нет -- 403 Forbidden. Convention для имён: `verb:resource` (`read:user`, `write:billing`), `<service>.<action>` (Google: `https://www.googleapis.com/auth/drive.file`). Принцип least privilege -- запрашивать минимум необходимого.
# Authorize URL с scopes:
# https://accounts.google.com/o/oauth2/v2/auth?
# client_id=...&redirect_uri=...&
# scope=https%3A//www.googleapis.com/auth/drive.file%20
# https%3A//www.googleapis.com/auth/userinfo.email&
# response_type=code
# В JWT access-токене:
import jwt
token = jwt.encode({
'sub': '42',
'scope': 'read:data write:logs', # <- разрешения
'exp': 1715000000,
}, 'secret', algorithm='HS256')
# В FastAPI можно объявить required scopes:
from fastapi.security import OAuth2AuthorizationCodeBearer, SecurityScopes
oauth2 = OAuth2AuthorizationCodeBearer(...)
@app.get('/admin/users')
def list_users(scopes: SecurityScopes = SecurityScopes(scopes=['admin:users'])):
...Долгоживущий токен (часы-дни-месяцы), используемый для получения нового access_token без повторного login. Access tokens специально короткие (5-15 минут) для минимизации damage при утечке; refresh -- длинный, но используется только в защищённой среде (server-to-server вызов /token, не на клиенте). Refresh передаётся через `grant_type=refresh_token` в /token endpoint вместе с client_id (и secret, если confidential client). Best practice: refresh-rotation -- при использовании старый refresh инвалидируется, выдаётся новый (защита от replay при утечке). Refresh должен храниться надёжно (HttpOnly cookie, secure storage, никак не localStorage).
import requests
import os, time
class TokenManager:
def __init__(self):
self.access = None
self.refresh = os.environ['INITIAL_REFRESH_TOKEN']
self.expires_at = 0
def get_access(self):
if time.time() >= self.expires_at - 30: # обновляем за 30s до expiry
self._refresh()
return self.access
def _refresh(self):
r = requests.post('https://auth.example.com/oauth/token', data={
'grant_type': 'refresh_token',
'refresh_token': self.refresh,
'client_id': os.environ['CLIENT_ID'],
'client_secret': os.environ['CLIENT_SECRET'],
}, timeout=10)
r.raise_for_status()
d = r.json()
self.access = d['access_token']
# Refresh-rotation: сервер мог вернуть НОВЫЙ refresh
self.refresh = d.get('refresh_token', self.refresh)
self.expires_at = time.time() + d['expires_in']Способ возврата большой коллекции порциями вместо одного гигантского ответа. Главные стратегии: (1) Offset/Limit -- `?offset=200&limit=50`, простая, но медленная на больших offset (СУБД сканирует все skipped) и страдает от дубликатов/пропусков при изменении данных во время обхода; (2) Cursor (Keyset) -- `?cursor=eyJpZCI6MTAwfQ&limit=50`, opaque-токен с указанием на последнюю запись, стабильна и быстра, но не может прыгнуть на «страницу 5»; (3) Page-based -- `?page=3&page_size=50`, частный случай offset; (4) Link-header (GitHub) -- `Link: <next_url>; rel="next"` с готовыми URL. Для DE -- обходить ВСЕГДА циклом до пустого ответа / null cursor, не доверять `total`.
import requests
# Cursor pagination (Stripe-style)
def list_all_users(api):
cursor = None
while True:
params = {'limit': 100}
if cursor:
params['starting_after'] = cursor
r = requests.get(f'{api}/v1/users', params=params, timeout=30)
r.raise_for_status()
data = r.json()
for u in data['data']:
yield u
if not data.get('has_more'):
break
cursor = data['data'][-1]['id']
# GitHub Link-header pagination:
import re
def parse_link(link_header):
return dict(re.findall(r'<([^>]+)>; rel="(\w+)"', link_header)[::-1])
url = 'https://api.github.com/orgs/python/repos?per_page=100'
while url:
r = requests.get(url, headers={'Accept': 'application/vnd.github+json'})
yield from r.json()
links = parse_link(r.headers.get('Link', ''))
url = links.get('next')Подход к пагинации: сервер возвращает opaque-токен (`next_cursor`), указывающий на «продолжать после этой записи». Внутри cursor обычно зашифрованный/encoded ID последней записи + sort-key (`{"id": 100, "created_at": "2026-05-01"}`). Преимущества: (1) O(log n) с индексом на (sort_key, id) -- нет full-scan для skip; (2) стабильно при concurrent inserts/deletes -- не пропускает и не дублирует; (3) сервер может менять формат cursor без breaking-API. Минусы: нельзя прыгнуть на «страницу 50» (только next/prev), нельзя посчитать total cheaply. Используется в Stripe, Twitter API v2, Firestore, Notion.
# Реализация cursor на стороне сервера (FastAPI):
from fastapi import FastAPI, Query
from base64 import urlsafe_b64encode, urlsafe_b64decode
import json
app = FastAPI()
@app.get('/users')
def list_users(cursor: str | None = None, limit: int = Query(50, le=100)):
where = ''
params = []
if cursor:
decoded = json.loads(urlsafe_b64decode(cursor))
where = 'WHERE (created_at, id) > (?, ?)'
params = [decoded['created_at'], decoded['id']]
rows = db.query(f'SELECT * FROM users {where} ORDER BY created_at, id LIMIT ?',
params + [limit + 1])
has_more = len(rows) > limit
rows = rows[:limit]
next_cursor = None
if has_more and rows:
last = rows[-1]
next_cursor = urlsafe_b64encode(json.dumps({
'created_at': last['created_at'].isoformat(),
'id': last['id'],
}).encode()).decode()
return {'data': rows, 'next_cursor': next_cursor, 'has_more': has_more}Пагинация через `?offset=200&limit=50` (или эквиваленты `?skip=200&take=50`, `?page=5&page_size=50`). Простая в имплементации (`SELECT ... LIMIT 50 OFFSET 200`), удобна для UI с цифрами страниц. Минусы: (1) на больших offset деградация -- СУБД должна просканировать (offset+limit) строк и выкинуть offset; (2) при concurrent insert/delete возникают дубликаты или пропуски (запись сдвинулась -- обход видит её на двух страницах или ни на одной); (3) если данные часто меняются -- offset бесполезен для бэкап-выгрузок. Подходит для: статичных каталогов, админок с фильтрами и постраничным UI. Для бэкапов/ETL -- переходите на cursor.
import requests
# Простой offset-обход (НЕ для меняющихся данных)
def list_all(api):
offset = 0
limit = 100
while True:
r = requests.get(f'{api}/products',
params={'offset': offset, 'limit': limit},
timeout=30)
r.raise_for_status()
items = r.json()['items']
if not items:
break
yield from items
if len(items) < limit:
break # последняя страница
offset += limit
# Опасно для concurrent inserts:
# t=0: GET ?offset=0&limit=10 -> id 1..10
# t=1: новый item id 0 вставлен в начало (по сортировке)
# t=2: GET ?offset=10&limit=10 -> id 10..19 <- id 10 дублируется!Ограничение количества запросов в единицу времени к API: на ключ, на IP, на endpoint. Защищает API от перегрузки и злоупотреблений. Сервер может вернуть текущий статус через headers: `X-RateLimit-Limit: 100`, `X-RateLimit-Remaining: 5`, `X-RateLimit-Reset: 1715000000` (epoch когда сбросится). При превышении -- `429 Too Many Requests` с `Retry-After: 30` (секунды) или конкретной датой. Стратегии: (1) token bucket (всплески допустимы); (2) leaky bucket (равномерно); (3) fixed window / sliding window. Клиент должен respect лимиты -- при 429 sleep на Retry-After, не biting фасадно. На стороне DE -- proactive rate-limit на client-side (asyncio.Semaphore, aiolimiter).
import requests
import time
# Полное respect rate-limit headers:
def request_with_rate_limit(url, **kwargs):
r = requests.get(url, **kwargs)
if r.status_code == 429:
retry_after = int(r.headers.get('Retry-After', 60))
time.sleep(retry_after)
return request_with_rate_limit(url, **kwargs)
# Proactive throttling -- тормозим заранее
remaining = int(r.headers.get('X-RateLimit-Remaining', 100))
if remaining < 5:
reset = int(r.headers.get('X-RateLimit-Reset', time.time() + 60))
sleep = max(0, reset - time.time()) / max(remaining, 1)
time.sleep(sleep)
return r
# Async client-side limiting:
# pip install aiolimiter
# from aiolimiter import AsyncLimiter
# limiter = AsyncLimiter(max_rate=10, time_period=1) # 10 req/sec
# async with limiter:
# await client.get(url)Status code, означающий «вы превысили rate-limit». В отличие от других 4xx -- это retry-able: причина не в неправильном запросе, а в превышении квоты. Сервер обычно возвращает `Retry-After: <seconds>` или `Retry-After: <HTTP-date>` -- клиент должен подождать столько и повторить. Иногда встречается `429` без Retry-After -- тогда применять exponential backoff с jitter. На больших нагрузках 429 -- нормальное явление: клиентский код должен корректно обрабатывать (sleep + retry), не падать с exception. Никогда не игнорируйте Retry-After -- продолжая бомбить API при 429, можно получить permanent ban или эскалацию до 403/прекращения сервиса.
import requests
import time
import random
def fetch_with_429_handling(url, max_retries=5):
for attempt in range(max_retries):
r = requests.get(url, timeout=10)
if r.status_code != 429:
return r
# Учитываем Retry-After:
retry_after = r.headers.get('Retry-After')
if retry_after:
try:
sleep = float(retry_after)
except ValueError:
# HTTP-date вместо секунд
from email.utils import parsedate_to_datetime
dt = parsedate_to_datetime(retry_after)
sleep = max(0, (dt.timestamp() - time.time()))
else:
# Exponential backoff с jitter
sleep = (2 ** attempt) + random.uniform(0, 1)
time.sleep(sleep)
raise RuntimeError(f'Failed after {max_retries} retries')Стратегия retry: задержка между попытками растёт экспоненциально -- `delay = base * 2^attempt`. Например, base=1s даёт паузы 1, 2, 4, 8, 16, 32 секунды. Цель -- не добивать упавший сервис своими retry'ями, дать ему время восстановиться. Без exponential backoff thundering herd: 1000 клиентов одновременно retry'ят -> сервер опять падает. Часто комбинируется с jitter (см. Jitter), чтобы клиенты не синхронизировались. Также важен cap (`max_delay=60s`) -- без него ждать 1024 секунды бессмысленно. Реализации: `tenacity.wait_exponential`, `backoff.expo`, `urllib3.Retry(backoff_factor=...)`.
import time
import random
def exponential_backoff(attempt, base=1.0, cap=60.0, jitter=True):
delay = min(cap, base * (2 ** attempt))
if jitter:
delay = random.uniform(0, delay) # full jitter (AWS recommended)
return delay
for attempt in range(6):
try:
result = fetch()
break
except (ConnectionError, Timeout):
if attempt == 5:
raise
sleep = exponential_backoff(attempt)
time.sleep(sleep)
# С tenacity:
from tenacity import retry, stop_after_attempt, wait_exponential_jitter
@retry(stop=stop_after_attempt(5),
wait=wait_exponential_jitter(initial=1, max=60, jitter=2))
def fetch():
...Случайное отклонение, добавляемое к delay в backoff. Без jitter тысяча клиентов с одинаковым backoff retry'ят в одно и то же время -- синхронизированный «thundering herd», сервер опять падает. С jitter каждый клиент берёт `delay = random.uniform(0, exp_delay)` (full jitter, рекомендация AWS) или `delay = exp_delay/2 + random.uniform(0, exp_delay/2)` (equal jitter) -- попытки распределяются равномерно. AWS architects опубликовали статью «Exponential Backoff and Jitter» (2015), где доказали, что full jitter оптимален для большинства случаев. В Python -- `random.uniform()` либо встроенные wait-strategies в tenacity/backoff.
import random
import time
# Без jitter -- все клиенты ждут одинаково
def no_jitter(attempt, base=1):
return base * (2 ** attempt)
# Full jitter (AWS recommendation)
def full_jitter(attempt, base=1, cap=60):
return random.uniform(0, min(cap, base * (2 ** attempt)))
# Equal jitter
def equal_jitter(attempt, base=1, cap=60):
delay = min(cap, base * (2 ** attempt))
return delay / 2 + random.uniform(0, delay / 2)
# Decorrelated jitter (для адаптивного)
def decorrelated_jitter(prev_delay, base=1, cap=60):
return min(cap, random.uniform(base, prev_delay * 3))
for a in range(5):
print(f'attempt {a}: no={no_jitter(a):.2f}s, full={full_jitter(a):.2f}s')Паттерн отказоустойчивости: если процент ошибок к downstream-сервису превысил threshold за окно времени, circuit «размыкается» -- последующие запросы немедленно падают с ошибкой, не доходя до сервиса. Через timeout (half-open) делается пробный запрос -- если успех, цепь снова замыкается. Защищает: (1) downstream от добивания при падении; (2) upstream от траты времени и ресурсов на заведомо проигрышные запросы; (3) от cascading failures между сервисами. Состояния: CLOSED (норм), OPEN (запросы блокируются), HALF_OPEN (пробуем). Реализации в Python: `pybreaker`, `circuitbreaker`, `purgatory`. Классическая статья -- Michael Nygard «Release It!».
from pybreaker import CircuitBreaker, CircuitBreakerError
import requests
# 5 подряд failures -> 60 сек OPEN
breaker = CircuitBreaker(
fail_max=5,
reset_timeout=60,
exclude=[requests.HTTPError], # 4xx не считаем как failures
)
@breaker
def call_downstream(user_id):
r = requests.get(f'https://api.example.com/users/{user_id}', timeout=5)
r.raise_for_status()
return r.json()
for uid in user_ids:
try:
data = call_downstream(uid)
except CircuitBreakerError:
# Цепь OPEN -- fallback к кэшу или dead-letter queue
data = cache.get(f'user:{uid}') or {'_unavailable': True}
except requests.HTTPError as e:
log(f'http error: {e}')Query-language для API, разработанный Facebook (2015), open-source. Один endpoint (`/graphql`), клиент описывает запросом ровно те поля, которые ему нужны -- нет over-fetching и under-fetching, типичных для REST. Сильная типизация через schema (`.graphql` или SDL). Операции: query (read), mutation (write), subscription (real-time через WebSocket). Pro: гибкость для frontend (один запрос вместо нескольких REST), self-documenting (introspection), strong typing. Cons: сложнее кэшировать на CDN-уровне (POST с разными body), N+1 queries на сервере без DataLoader, сложнее rate-limit (cost-analysis вместо req/sec). Для DE -- GitHub GraphQL API, Shopify Admin API. Клиент в Python: `gql`.
import requests
query = '''
query GetUser($id: ID!) {
user(id: $id) {
id
name
email
orders(first: 10) {
edges {
node { id total }
}
}
}
}
'''
r = requests.post(
'https://api.example.com/graphql',
json={'query': query, 'variables': {'id': '42'}},
headers={'Authorization': 'Bearer ...'},
)
data = r.json()['data']
print(data['user']['name'])
# С gql библиотекой:
# from gql import gql, Client
# from gql.transport.requests import RequestsHTTPTransport
# client = Client(transport=RequestsHTTPTransport(url=...))
# result = client.execute(gql(query), variable_values={'id': '42'})Высокопроизводительный RPC-фреймворк от Google поверх HTTP/2 + Protobuf. Контракт описывается в `.proto` (services + messages), `protoc` генерирует stubs для клиента и сервера. Поддерживает 4 вида RPC: unary (1 req -> 1 resp), server streaming (1 req -> N resp), client streaming (N req -> 1 resp), bidirectional streaming (N <-> N). Pro: компактный wire format (бинарный), низкая latency, streaming, strict contract, codegen для 10+ языков. Cons: НЕ работает в браузере без gRPC-Web proxy, плохо дебажится без специальных tools (`grpcurl`), статус кодов меньше чем в HTTP, сложнее проксировать через стандартные load balancers. Используется внутри microservice-mesh.
// helloworld.proto
syntax = "proto3";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
}
message HelloRequest { string name = 1; }
message HelloReply { string message = 1; }
# Компиляция:
# python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. helloworld.proto
import grpc
import helloworld_pb2, helloworld_pb2_grpc
with grpc.insecure_channel('localhost:50051') as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(helloworld_pb2.HelloRequest(name='Lev'),
timeout=10)
print(response.message)
# Server streaming пример:
# for resp in stub.ListUpdates(request):
# print(resp)Парадигма межпроцессного взаимодействия, моделирующая вызов удалённой функции как локальной. Клиент вызывает `service.method(args)`, под капотом -- сериализация args, сетевой transport, десериализация на сервере, вызов реализации, обратно. Версии RPC: XML-RPC (1998, текстовый XML), JSON-RPC (2005, текстовый JSON), gRPC (2015, бинарный), Apache Thrift, MessagePack-RPC. Отличие от REST: RPC мыслит глаголами (`createUser`, `sendEmail`), REST -- существительными и HTTP-методами. RPC обычно эффективнее по wire (бинарный, без overhead REST-headers), но менее удобен для веб-API (требует client SDK). Подходит для internal microservices, игр, IoT.
# JSON-RPC 2.0 пример (текстовый, легко руками)
import requests
request = {
'jsonrpc': '2.0',
'method': 'subtract',
'params': {'minuend': 42, 'subtrahend': 23},
'id': 1
}
r = requests.post('https://api.example.com/jsonrpc', json=request)
print(r.json())
# {'jsonrpc': '2.0', 'result': 19, 'id': 1}
# Batch:
batch = [
{'jsonrpc':'2.0', 'method':'sum', 'params':[1,2,3], 'id':1},
{'jsonrpc':'2.0', 'method':'subtract', 'params':[10,5], 'id':2},
]
requests.post(url, json=batch)Подход к разработке API: сначала пишется контракт (OpenAPI YAML, .proto, GraphQL SDL, Avro schema), потом генерируется код (server stubs + client SDK). Альтернатива -- code-first: сначала пишется код (FastAPI с type-hints, gRPC-implementation), из которого автоматически выводится спека. Schema-first плюсы: контракт согласован между командами заранее, можно параллелить frontend/backend разработку, проще mocking, спека -- single source of truth. Минусы: больше церемонии (PR на изменение спеки + регенерация), сложнее agile-итерации. Для DE-команды schema-first хорошо подходит при контракте с внешними партнёрами; внутри Python-команды code-first через FastAPI часто практичнее.
# Schema-first (OpenAPI):
# 1. Описать openapi.yaml
# 2. openapi-generator generate -i openapi.yaml -g python-fastapi -o ./server
# 3. Реализовать handler'ы по сгенерированным stubs
# Code-first (FastAPI):
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class User(BaseModel):
id: int
name: str
email: str | None = None
@app.get('/users/{id}', response_model=User)
def get_user(id: int) -> User:
return User(id=id, name='Lev')
# OpenAPI auto-generated на /openapi.json
# Можно экспортировать: app.openapi() -> dictМеханизм push-нотификаций: вместо периодического polling клиент регистрирует у сервера URL (`https://my-app.com/hooks/payment`), и сервер шлёт HTTP POST на этот URL при наступлении события. Преимущество над polling -- меньше latency и нагрузки. Структура webhook-payload зависит от провайдера: Stripe шлёт event-объект с типом и данными, GitHub -- с информацией о PR/push. Главные грабли: (1) проверка подписи (HMAC от body с secret) -- без неё любой может подделать event; (2) idempotency -- webhook может прийти несколько раз (sender retry'ит при non-2xx), храните processed_event_ids; (3) ответ должен быть быстрый (2-5 сек), иначе sender ретрайнет -- heavy work делайте в background.
from fastapi import FastAPI, Request, HTTPException
import hmac, hashlib, os
app = FastAPI()
WEBHOOK_SECRET = os.environ['STRIPE_WEBHOOK_SECRET']
@app.post('/hooks/stripe')
async def handle_stripe(req: Request):
body = await req.body()
signature = req.headers.get('Stripe-Signature', '')
# Проверка подписи (упрощённо; Stripe использует более сложный формат)
expected = hmac.new(WEBHOOK_SECRET.encode(), body, hashlib.sha256).hexdigest()
if not hmac.compare_digest(expected, signature.split('=')[-1]):
raise HTTPException(401, 'Invalid signature')
event = await req.json()
# Idempotency -- не обрабатываем повторно
if already_processed(event['id']):
return {'received': True}
mark_processed(event['id'])
# Background task -- отвечаем быстро
enqueue_processing(event)
return {'received': True}Протокол однонаправленного стриминга от сервера к клиенту поверх обычного HTTP. Сервер отвечает `Content-Type: text/event-stream` и держит соединение открытым, периодически отправляя events в формате `data: <message>\n\n` (опционально с `event:`, `id:`, `retry:`). Клиент в браузере -- `new EventSource(url)`, в Python -- `httpx-sse` или ручной `iter_lines()`. По сравнению с WebSocket: проще (стандартный HTTP, проходит через прокси без специальной настройки), но только server -> client, не bi-directional. Идеален для: stock-tickers, прогресс ETL-задач, OpenAI streaming responses. Главный capkan -- проксирующий nginx буферизует stream; нужно `X-Accel-Buffering: no` и `proxy_buffering off`.
# Сервер (FastAPI):
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def event_generator():
for i in range(10):
yield f'data: {{"value": {i}}}\n\n'
await asyncio.sleep(1)
@app.get('/events')
async def events():
return StreamingResponse(
event_generator(),
media_type='text/event-stream',
headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'},
)
# Клиент (Python):
import httpx
with httpx.Client(timeout=None) as c:
with c.stream('GET', 'https://api.example.com/events') as r:
for line in r.iter_lines():
if line.startswith('data: '):
print(line[6:])Полнодуплексный протокол поверх TCP, начинающийся как HTTP-Upgrade (`Connection: Upgrade`, `Upgrade: websocket`) и переходящий в постоянное двунаправленное соединение. Подходит для real-time чатов, multiplayer-игр, live-trading, collaborative tools. Сообщения -- текст или binary frames. URI: `ws://` или `wss://` (TLS). Минусы по сравнению с SSE: сложнее для stateless балансировки (sticky session), сложнее проксировать через CDN (нужен WebSocket-aware), нет встроенного reconnect -- нужно вручную. В Python: `websockets` (async, наиболее популярный), `websocket-client` (sync). Для DE -- pulling realtime market data, Slack RTM API, обновления pipelines.
# Клиент (websockets, async)
import asyncio
import websockets
import json
async def listen():
async with websockets.connect('wss://stream.example.com/v1/feed') as ws:
await ws.send(json.dumps({'subscribe': 'BTC-USD'}))
async for raw in ws:
msg = json.loads(raw)
print(msg)
asyncio.run(listen())
# С reconnect-логикой:
async def listen_robust():
while True:
try:
async with websockets.connect(url, ping_interval=20) as ws:
async for raw in ws:
process(json.loads(raw))
except (websockets.ConnectionClosed, OSError) as e:
print(f'Disconnected: {e}, reconnecting in 5s')
await asyncio.sleep(5)Полу-real-time техника: клиент шлёт обычный HTTP-запрос, сервер НЕ отвечает сразу -- держит соединение открытым до появления данных или таймаута (обычно 30-60 сек), потом отвечает. Клиент сразу делает следующий запрос. Эмулирует push поверх pull. Использовался в чатах, Telegram Bot API getUpdates, Slack RTM до WebSocket. Недостатки: каждый запрос -- full HTTP overhead (headers, TLS handshake если short-lived), нагрузка на сервер (много открытых соединений), latency не такая низкая как у WS/SSE. Сейчас в новых системах вытеснён SSE и WebSocket. Но всё ещё актуален: где WebSocket не проходит через corporate proxy, fallback на long polling.
# Клиент:
import requests
def poll():
last_id = 0
while True:
try:
r = requests.get(
'https://api.example.com/v1/messages',
params={'since': last_id},
timeout=70, # больше серверного long-poll timeout
)
messages = r.json()
for m in messages:
process(m)
last_id = max(last_id, m['id'])
except requests.Timeout:
continue # сервер закрыл по таймауту, retry
except requests.ConnectionError:
time.sleep(5) # backoff
# Telegram Bot API getUpdates -- пример long polling:
# r = requests.get(f'https://api.telegram.org/bot{TOKEN}/getUpdates',
# params={'offset': last_update_id+1, 'timeout': 60})Криптографическая конструкция для проверки целостности и аутентичности сообщения через shared secret. Алгоритм: `HMAC(key, message) = HASH((key XOR opad) || HASH((key XOR ipad) || message))` где HASH -- обычно SHA256/SHA512. Используется для подписи webhook-payload (Stripe, GitHub), API-запросов (AWS Signature V4), JWT (alg=HS256). Особенность HMAC vs обычный hash: невозможна length-extension attack, ключ обязателен для проверки. ВАЖНО: при сравнении подписей всегда `hmac.compare_digest()` (constant-time), не `==` -- иначе timing attack. Получатель webhook должен: (1) посчитать HMAC от raw body с shared secret; (2) сравнить с подписью из header; (3) при несовпадении 401.
import hmac
import hashlib
import os
secret = os.environ['WEBHOOK_SECRET'].encode()
# Sender:
body = b'{"event":"payment.succeeded","id":"evt_123"}'
signature = hmac.new(secret, body, hashlib.sha256).hexdigest()
# POST /hook + header: X-Signature: <signature>
# Receiver:
def verify(received_signature: str, body: bytes) -> bool:
expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
# ВАЖНО: constant-time compare
return hmac.compare_digest(expected, received_signature)
# Stripe-style (timestamp + body):
# Stripe-Signature: t=1715000000,v1=<hmac of "t.body">
# Защита от replay attack: проверяй timestamp в окне 5 мин
import time
t = int(time.time())
payload = f'{t}.{body.decode()}'.encode()
sig = hmac.new(secret, payload, hashlib.sha256).hexdigest()Header (Stripe, Square, GitHub), позволяющий клиенту сделать POST идемпотентным. Клиент генерирует UUID, передаёт в `Idempotency-Key: <uuid>`. Сервер хранит mapping `(key, request_hash) -> response` некоторое время (Stripe -- 24 часа). При повторном POST с тем же ключом сервер не выполняет действие повторно, возвращает закэшированный ответ. Защищает от: (1) network errors на стороне клиента -- можно безопасно retry; (2) случайных double-submit'ов. Если ключ тот же, но body другой -- сервер должен ответить 422/409 (idempotency conflict). Для DE -- ОБЯЗАТЕЛЬНО при ETL-write'ах в idempotent-aware API: сгенерировать стабильный key из (source_id, batch_id) -- повторный запуск pipeline не создаст дубликатов.
import requests
import uuid, hashlib
# Стабильный idempotency key из бизнес-данных:
def make_key(order_id, attempt_window_id):
raw = f'{order_id}:{attempt_window_id}'
return hashlib.sha256(raw.encode()).hexdigest()
# В ETL -- стабильно для retry'ев одного и того же batch:
key = make_key(order_id=12345, attempt_window_id='2026-05-15-batch-7')
for attempt in range(3):
try:
r = requests.post(
'https://api.stripe.com/v1/charges',
headers={'Idempotency-Key': key, 'Authorization': f'Bearer {SK}'},
data={'amount': 1000, 'currency': 'usd', 'source': 'tok_visa'},
timeout=10,
)
# Даже если предыдущая попытка успела создать charge -- вернётся ТОТ ЖЕ
r.raise_for_status()
break
except (requests.Timeout, requests.ConnectionError):
continue # safe retry -- duplicate не создастсяПодмена реальных HTTP-вызовов в тестах на предзаданные ответы -- без реального сетевого общения. Цель: тесты быстрые, детерминированные, не зависят от внешних сервисов и не тратят rate-limit. Главные библиотеки в Python: `responses` (для requests), `respx` (для httpx, async), `pytest-httpserver` (поднимает реальный mini-server для проверки точного wire-формата), `vcr.py` (записывает реальные ответы и проигрывает). Балансируйте mock vs integration test: чисто unit-тесты с mock проверяют только логику клиента, contract-тесты с реальным API ловят drift в спеке. Анти-паттерн -- мок весь HTTP-стек, тесты зелёные, прод сломан.
# pip install responses
import responses
import requests
@responses.activate
def test_get_user():
responses.add(
method=responses.GET,
url='https://api.example.com/v1/users/42',
json={'id': 42, 'name': 'Lev'},
status=200,
)
r = requests.get('https://api.example.com/v1/users/42')
assert r.json()['name'] == 'Lev'
# httpx -- respx:
import respx, httpx
@respx.mock
def test_with_httpx():
respx.get('https://api.example.com/v1/users/42').mock(
return_value=httpx.Response(200, json={'id': 42, 'name': 'Lev'})
)
with httpx.Client() as c:
r = c.get('https://api.example.com/v1/users/42')
assert r.json()['name'] == 'Lev'В pytest -- функция, помеченная `@pytest.fixture`, подготавливающая данные/состояние для теста и опционально cleanup'ящая после. Объявляется по имени параметра в тестовой функции -- pytest сам injectит. Scope: function (default, на каждый тест), class, module, session (один раз на весь run). Удобно для: подготовки данных, поднятия mock-server, открытия HTTP-клиента, генерации тестовых JSON-payload'ов. Конфигурации в `conftest.py` (общие для каталога). Полезные встроенные: `tmp_path` (временный каталог), `monkeypatch` (патчить env-vars), `caplog` (capture logs), `capsys` (capture stdout).
import pytest
import httpx
import respx
@pytest.fixture(scope='session')
def api_client():
with httpx.Client(
base_url='https://api.example.com/v1',
headers={'Authorization': 'Bearer test-token'},
timeout=10.0,
) as c:
yield c
@pytest.fixture
def sample_user():
return {'id': 42, 'name': 'Lev', 'email': '[email protected]'}
@pytest.fixture
def mock_api():
with respx.mock(base_url='https://api.example.com') as m:
yield m
def test_get_user(api_client, sample_user, mock_api):
mock_api.get('/v1/users/42').mock(
return_value=httpx.Response(200, json=sample_user)
)
r = api_client.get('/users/42')
assert r.json() == sample_userТехника тестирования: первый запуск делает реальный HTTP-вызов и пишет request+response в файл-кассету (`.yaml` обычно); следующие запуски проигрывают кассету без сети. Изобретено в Ruby (`vcr`), для Python -- `vcr.py`, `pytest-recording`, `pytest-vcr`. Pro: тесты реалистичные (точные ответы реального API), быстрые (без сети после first run), детерминированные. Cons: кассеты протухают (API изменилось) -- нужен periodic re-record (`--record-mode=new_episodes`); кассеты с реальными токенами надо чистить (`filter_headers`); большие кассеты замусоривают репо. Хорошо подходит для интеграционных тестов «стабильных» API.
# pip install vcrpy pytest-recording
import vcr
import requests
my_vcr = vcr.VCR(
cassette_library_dir='tests/cassettes',
record_mode='once', # 'new_episodes' для re-record
filter_headers=['Authorization'], # вычистить токены
filter_query_parameters=['api_key'],
)
@my_vcr.use_cassette('test_github_user.yaml')
def test_github_user():
r = requests.get('https://api.github.com/users/torvalds')
assert r.status_code == 200
assert r.json()['login'] == 'torvalds'
# При первом запуске -- реальный запрос + запись
# Дальше -- проигрывание из cassettes/test_github_user.yaml
# pytest-recording (декоратор):
# @pytest.mark.vcr
# def test_x(): ...Файл (обычно YAML или JSON) с записанным взаимодействием HTTP -- request + response, сохранёнными VCR'ом. Структура: список interactions, у каждого request (method, url, headers, body) и response (status, headers, body). При проигрывании VCR ищет matching interaction и возвращает сохранённый ответ. Matching по умолчанию: method + URI; можно расширять до body или headers. Кассеты надо хранить в репо (часть тестов), но обязательно вычищать секреты (`filter_headers=['Authorization']`, `filter_post_data_parameters=['password']`). Регулярно перезаписывать (раз в квартал/при изменениях API), иначе тесты пройдут на устаревшем response, прод поломается.
# tests/cassettes/test_github_user.yaml (упрощённо):
interactions:
- request:
method: GET
uri: https://api.github.com/users/torvalds
headers:
Authorization:
- <REDACTED>
body: null
response:
status:
code: 200
message: OK
headers:
Content-Type:
- application/json
body:
string: '{"login": "torvalds", "id": 1024025, ...}'
version: 1
# CLI для перезаписи всех кассет:
# pytest --record-mode=rewrite
# pytest --record-mode=new_episodes # дозаписать новые
# pytest --record-mode=none # offline-onlyТестирование, проверяющее что provider (API) и consumer (клиент) договорились о форме запросов/ответов -- без реального E2E-тестирования. Главные подходы: (1) Schema-based -- клиент валидирует ответ против OpenAPI/JSON Schema (`jsonschema.validate(data, schema)`); (2) Pact-style -- consumer пишет ожидания в виде pact-файла, provider проверяет, что его API соответствует pact'ам всех known consumers. Pact-broker хранит pact'ы и оркестрирует verification. Pro: ловит breaking changes до production, не требует поднимать всю систему. Cons: pact-инфраструктура сложна, контракты быстро становятся outdated. Для большинства Python-команд достаточно schema-валидации.
# Простейшее contract-тестирование через jsonschema:
import jsonschema
import requests
USER_SCHEMA = {
'type': 'object',
'required': ['id', 'name', 'email'],
'properties': {
'id': {'type': 'integer'},
'name': {'type': 'string', 'minLength': 1},
'email': {'type': 'string', 'format': 'email'},
'role': {'type': 'string', 'enum': ['admin', 'user']},
},
}
def test_user_api_contract():
r = requests.get('https://api.example.com/v1/users/42')
r.raise_for_status()
jsonschema.validate(r.json(), USER_SCHEMA)
# Если API изменилось -- тест упадёт с детальной ошибкой
# Или валидация всего OpenAPI:
# pip install schemathesis
# schemathesis run https://api.example.com/openapi.json --checks=all