Server-Sent Events: HTTP-стрим в одну сторону
Webhook-и хороши для редких событий, но требуют у получателя публичного URL. Polling прост, но шумен и медленен. WebSocket — мощный, но тяжёлый. Между ними живёт тихий герой — Server-Sent Events, спецификация HTML5, на которой работает большинство современных LLM-чатов, прогресс-баров и live-дашбордов.
SSE — это обычный HTTP-ответ, который сервер не закрывает, а постепенно дописывает в него события. Браузер (или Python-клиент) читает поток построчно и реагирует на каждое сообщение. Никаких upgrade-протоколов, никаких новых портов — обычный GET с длинным response body.
HTTP кэширование: Cache-Control, ETag и conditional requestsПоток на проводе
Запрос:
GET /events HTTP/1.1
Host: api.example.com
Accept: text/event-stream
Cache-Control: no-cache
Ответ:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no
data: hello
event: progress
data: {"percent": 10}
event: progress
id: 42
data: {"percent": 50}
retry: 5000
event: done
data: ok
Формат специфицирован в HTML Living Standard. Базовые правила:
- Каждое сообщение — последовательность строк, заканчивается двумя
\n(пустая строка-разделитель). data:— полезная нагрузка. Несколько подряд идущихdata:склеиваются через\n.event:— имя события (по умолчаниюmessage). Клиент подписывается на конкретные имена.id:— идентификатор сообщения. Клиент сохраняет последний полученный id.retry:— рекомендация клиенту, через сколько миллисекунд переподключаться.- Строка, начинающаяся с
:— комментарий. Используется как keep-alive (: heartbeat\n\nкаждые 30 секунд).
Автоматический reconnect
Главная фишка SSE — встроенный reconnect. Если соединение оборвалось (Wi-Fi, серверный рестарт, прокси), браузер сам переподключится через указанное в retry: время и пришлёт заголовок Last-Event-ID: 42. Сервер должен прочитать его и отправить только сообщения с id больше этого. Так получается реализовать resumable stream без клиентской логики.
@router.get("/events")
async def events(request: Request):
last_id = request.headers.get("Last-Event-ID")
start_from = int(last_id) + 1 if last_id else 0
async def stream():
async for event in event_log.iter_from(start_from):
yield f"id: {event.id}\nevent: {event.kind}\ndata: {event.payload_json}\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
Если событий нет, нужно периодически слать
: heartbeat\n\n раз в 15-30 секунд, иначе прокси (Nginx, ALB) закроют идле-соединение.
Браузер: EventSource API
В браузере SSE доступен через нативный класс — никаких библиотек:
const source = new EventSource("/api/events");
source.onmessage = (event) => {
console.log("default message:", event.data);
};
source.addEventListener("progress", (event) => {
const data = JSON.parse(event.data);
updateProgressBar(data.percent);
});
source.addEventListener("done", () => {
source.close();
});
source.onerror = (err) => {
// браузер сам сделает reconnect, не нужно ничего вызывать
console.warn("connection lost, will retry");
};
EventSource автоматически:
- Шлёт
Last-Event-IDпри reconnect-е. - Использует значение
retry:или дефолтные 3 секунды. - Парсит формат и вызывает обработчики.
В мобильных приложениях (iOS/Android) есть свои клиентские библиотеки, по сути копирующие EventSource API.
Python-сервер на Starlette
Starlette (база FastAPI) умеет SSE через StreamingResponse или специальный EventSourceResponse из sse-starlette:
import asyncio
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
app = FastAPI()
@app.get("/llm/stream")
async def llm_stream(request: Request, prompt: str):
async def event_generator():
async for token in llm.generate_streaming(prompt):
if await request.is_disconnected():
break
yield {
"event": "token",
"data": token,
}
yield {"event": "done", "data": ""}
return EventSourceResponse(event_generator())
EventSourceResponse сам форматирует словарь в правильный SSE-формат, добавляет heartbeat-ы и обрабатывает disconnect. Без него пришлось бы писать f"event: {name}\ndata: {payload}\n\n" руками.
Для совместимости с прокси полезно отключить буферизацию:
return EventSourceResponse(
event_generator(),
headers={"X-Accel-Buffering": "no", "Cache-Control": "no-cache"},
)
Python-клиент на httpx
В Python-приложении (например, скрипт-агрегатор для DE-pipeline) подключиться к SSE-источнику можно через httpx.stream:
import httpx
def listen_to_events(url: str):
with httpx.stream("GET", url, timeout=None, headers={"Accept": "text/event-stream"}) as resp:
event_name = "message"
data_buffer: list[str] = []
for line in resp.iter_lines():
if line == "":
if data_buffer:
payload = "\n".join(data_buffer)
handle(event_name, payload)
event_name = "message"
data_buffer = []
continue
if line.startswith(":"):
continue # heartbeat / comment
if line.startswith("event:"):
event_name = line[6:].strip()
elif line.startswith("data:"):
data_buffer.append(line[5:].lstrip())
elif line.startswith("id:"):
save_last_id(line[3:].strip())
Парсер простой: на пустую строку — собрать накопленные data: и вызвать обработчик. Для production-кода берут готовые либы: httpx-sse, aiohttp-sse-client.
SSE против WebSocket
И SSE, и WebSocket дают server push, но архитектурно различаются:
| Параметр | SSE | WebSocket |
|---|---|---|
| Транспорт | HTTP/1.1 или HTTP/2 | Отдельный upgrade с HTTP на ws/wss |
| Направление | Только server -> client | Full duplex (обе стороны) |
| Формат | Текст (UTF-8) | Текст + бинарка (frame-ы) |
| Reconnect | Встроенный, автоматический | Нужно реализовать вручную |
| Auth | Cookies, headers — как у обычного HTTP | Headers только при handshake |
| Прокси-совместимость | Полная (это HTTP) | Иногда блокируются корпоративными прокси |
| Сложность сервера | Низкая (yield строки) | Средняя (event loop, frame handling) |
Правило большого пальца: если данные текут только сервер -> клиент (push-уведомления, прогресс операций, LLM-стриминг, аудит-лог в реальном времени) — берите SSE. Если нужен двусторонний интерактив (chat, collaborative editing, multiplayer game) — WebSocket.
SSE — главный транспорт LLM-чатов в 2026. ChatGPT, Claude, Anthropic API, OpenAI API — все стримят токены через text/event-stream. Один запрос -> длинный поток data: {...} сообщений -> data: [DONE] в конце. Это и есть тот самый эффект «текст печатается по слову».
Ограничения
- Только текст. Бинарные данные нужно base64-кодировать.
- HTTP/1.1 — лимит 6 параллельных соединений на домен в браузере. На HTTP/2 проблемы нет (мультиплексирование).
- Один поток = одно соединение. Для нескольких параллельных подписок — несколько EventSource (через HTTP/2 это нормально).
- Stateful со стороны сервера. Каждое соединение держит ресурсы: при 100 000 одновременных клиентов нужны event-loop серверы (Starlette, aiohttp) и продуманное резервирование.
- Не все прокси корректно стримят. Nginx требует
proxy_buffering off,proxy_cache off, AWS ALB поддерживает но требует достаточно длинных idle timeout-ов.
Где SSE применяется в DE
- Live-дашборды на ClickHouse, Redash, Grafana — стримят последние агрегаты.
- Прогресс выполнения долгих джобов (Spark application, dbt run, Airflow task).
- Таилинг логов через web UI (Loki, Grafana).
- LLM-pipeline-ы: стриминг частичных ответов от LLM-провайдера в downstream-сервис.
- Audit-event подписки в data catalog-ах.
Для junior DE базовое умение — написать FastAPI endpoint, который отдаёт прогресс длинной задачи через SSE, и Python-клиент, который этот прогресс читает.