
Опирайтесь на данные, а не «чуйку».
Пример грубой оценки: бюджет 400 мс, две попытки. Первая попытка: connect 80 мс, read 180 мс. Если не вышло — пауза 60 мс, вторая попытка: connect 60 мс, read 120 мс. Всё вместе ~440 мс — уже на грани. Значит либо один ретрай, либо уменьшать таймауты. В реальности учитывайте очереди и нагрузку.
Ниже — минимально самодостаточный пример. Он показывает:
Установка зависимостей:
pip install fastapi uvicorn httpx tenacity prometheus-client pydantic
Код приложения:
# app.py
import asyncio
import time
from typing import Optional
import httpx
from fastapi import FastAPI, HTTPException, Response
from prometheus_client import Counter, Histogram, generate_latest
from pydantic import BaseSettings, PositiveInt
from tenacity import RetryError, AsyncRetrying, retry_if_exception_type, stop_after_attempt, stop_after_delay, wait_random_exponential
# ===== Метрики =====
EXTERNAL_LATENCY = Histogram(
"external_call_latency_seconds",
"Время внешнего вызова",
labelnames=("endpoint", "outcome"),
buckets=(0.05, 0.1, 0.2, 0.3, 0.5, 0.75, 1.0, 2.0, 5.0),
)
EXTERNAL_RETRIES = Counter(
"external_call_retries_total", "Количество ретраев внешних вызовов", labelnames=("endpoint", "reason")
)
EXTERNAL_FAILURES = Counter(
"external_call_failures_total", "Провалы внешних вызовов (после всех ретраев)", labelnames=("endpoint", "reason")
)
CIRCUIT_EVENTS = Counter(
"circuit_events_total", "События в circuit breaker", labelnames=("name", "event")
)
# ===== Настройки =====
class Settings(BaseSettings):
external_base_url: str = "https://example-partner.api"
connect_timeout_ms: PositiveInt = 120
read_timeout_ms: PositiveInt = 220
write_timeout_ms: PositiveInt = 120
total_timeout_ms: PositiveInt = 350
# Ретраи
max_attempts: PositiveInt = 2
max_retry_window_ms: PositiveInt = 250 # суммарная «окно» для ретраев
# Circuit Breaker
cb_fail_max: PositiveInt = 10 # после скольких последовательных провалов открыть
cb_reset_timeout_s: PositiveInt = 30 # через сколько секунд проверяем «поправился ли партнёр»
# Параллелизм к партнёру
partner_concurrency: PositiveInt = 8
settings = Settings()
# ===== Исключения и классификация ошибок =====
class TransientHTTPError(Exception):
"""Временная ошибка: таймауты, 5xx, сетевые сбои — можно ретраить."""
class PermanentHTTPError(Exception):
"""Постоянная ошибка: 4xx, бизнес-ошибка — ретраить нельзя."""
class CircuitOpenError(Exception):
"""Предохранитель открыт — быстро фэйлимся, чтобы не усугублять ситуацию."""
# ===== Простой асинхронный Circuit Breaker =====
class SimpleCircuitBreaker:
def __init__(self, name: str, fail_max: int, reset_timeout: float) -> None:
self.name = name
self.fail_max = fail_max
self.reset_timeout = reset_timeout
self._state = "closed" # closed | open | half_open
self._fail_count = 0
self._opened_at: Optional[float] = None
self._trial_in_progress = False
self._lock = asyncio.Lock()
@property
def state(self) -> str:
return self._state
async def call(self, func, *args, **kwargs):
async with self._lock:
now = time.monotonic()
if self._state == "open":
# ещё рано пробовать
if self._opened_at is not None and (now - self._opened_at) < self.reset_timeout:
CIRCUIT_EVENTS.labels(self.name, "reject_open").inc()
raise CircuitOpenError(f"circuit {self.name} is open")
# пора в half-open
if not self._trial_in_progress:
self._state = "half_open"
self._trial_in_progress = True
CIRCUIT_EVENTS.labels(self.name, "half_open").inc()
else:
CIRCUIT_EVENTS.labels(self.name, "reject_half_open").inc()
raise CircuitOpenError(f"circuit {self.name} is half-open (trial busy)")
elif self._state == "half_open":
if self._trial_in_progress:
# допустим только один пробный вызов
pass
else:
# ещё один параллельный зашёл — отклоняем
CIRCUIT_EVENTS.labels(self.name, "reject_half_open").inc()
raise CircuitOpenError(f"circuit {self.name} is half-open (trial busy)")
# если closed — просто идём дальше
# Важно: исполняем функцию вне глобального лока, чтобы не блокировать других
try:
result = await func(*args, **kwargs)
except Exception:
await self._on_failure()
raise
else:
await self._on_success()
return result
async def _on_failure(self):
async with self._lock:
if self._state == "half_open":
# не прошли проверку — снова open
self._open()
return
self._fail_count += 1
if self._fail_count >= self.fail_max:
self._open()
async def _on_success(self):
async with self._lock:
self._fail_count = 0
if self._state in ("open", "half_open"):
self._state = "closed"
self._trial_in_progress = False
self._opened_at = None
CIRCUIT_EVENTS.labels(self.name, "close").inc()
def _open(self):
self._state = "open"
self._fail_count = 0
self._opened_at = time.monotonic()
self._trial_in_progress = False
CIRCUIT_EVENTS.labels(self.name, "open").inc()
# ===== Клиент для партнёра =====
class PartnerClient:
def __init__(self) -> None:
self._client = httpx.AsyncClient(
base_url=settings.external_base_url,
timeout=httpx.Timeout(
connect=settings.connect_timeout_ms / 1000.0,
read=settings.read_timeout_ms / 1000.0,
write=settings.write_timeout_ms / 1000.0,
pool=settings.total_timeout_ms / 1000.0,
),
http2=True,
)
self._cb = SimpleCircuitBreaker(
name="partner_api", fail_max=settings.cb_fail_max, reset_timeout=settings.cb_reset_timeout_s
)
self._sem = asyncio.Semaphore(settings.partner_concurrency)
async def close(self):
await self._client.aclose()
async def get_user(self, user_id: str) -> dict:
endpoint = "/users/" + user_id
start = time.monotonic()
try:
async with self._sem:
# завернём все ретраи в вызов предохранителя: если партнёр стабильно плох, мы быстро фэйл‑фаст
result = await self._cb.call(self._with_retries, endpoint)
EXTERNAL_LATENCY.labels("get_user", "success").observe(time.monotonic() - start)
return result
except CircuitOpenError:
EXTERNAL_LATENCY.labels("get_user", "circuit_open").observe(time.monotonic() - start)
EXTERNAL_FAILURES.labels("get_user", "circuit_open").inc()
# здесь можно отдать кеш/запасной источник
raise
except PermanentHTTPError as e:
EXTERNAL_LATENCY.labels("get_user", "permanent_error").observe(time.monotonic() - start)
EXTERNAL_FAILURES.labels("get_user", "permanent_error").inc()
raise e
except Exception as e:
EXTERNAL_LATENCY.labels("get_user", "failed").observe(time.monotonic() - start)
EXTERNAL_FAILURES.labels("get_user", "failed").inc()
raise e
async def _with_retries(self, endpoint: str) -> dict:
# Общее окно ретраев
stop = stop_after_attempt(settings.max_attempts) | stop_after_delay(settings.max_retry_window_ms / 1000.0)
async for attempt in AsyncRetrying(
reraise=True,
stop=stop,
wait=wait_random_exponential(multiplier=0.05, max=0.5),
retry=retry_if_exception_type(TransientHTTPError),
before_sleep=self._before_sleep,
):
with attempt:
return await self._single_call(endpoint)
async def _before_sleep(self, retry_state):
EXTERNAL_RETRIES.labels("get_user", "transient_error").inc()
async def _single_call(self, endpoint: str) -> dict:
# Здесь никаких бесконечных ожиданий: таймауты уже на клиенте httpx
resp = await self._client.get(endpoint)
# уважение к 429: один мягкий ретрай с учётом Retry-After (если он небольшой)
if resp.status_code == 429:
retry_after = resp.headers.get("Retry-After")
if retry_after:
try:
delay = float(retry_after)
except ValueError:
delay = 0.0
else:
delay = 0.0
# если партнёр просит подождать, и это вписывается в окно — сделаем мягкую паузу и попробуем ещё раз
if delay > 0.0 and delay <= (settings.max_retry_window_ms / 1000.0):
await asyncio.sleep(delay)
raise TransientHTTPError("429 with Retry-After, soft retry")
raise PermanentHTTPError("Too Many Requests (429)")
# 5xx — временные, ретраим
if 500 <= resp.status_code <= 599:
raise TransientHTTPError(f"Server error {resp.status_code}")
# 4xx — не ретраим (бизнес-/клиентская ошибка)
if 400 <= resp.status_code <= 499:
raise PermanentHTTPError(f"Client error {resp.status_code}")
# ОК
return resp.json()
partner_client = PartnerClient()
app = FastAPI()
@app.get("/user/{user_id}")
async def get_user(user_id: str):
try:
data = await partner_client.get_user(user_id)
return {"ok": True, "data": data}
except CircuitOpenError:
# деградация: отдаём "вежливую ошибку" без долгих ожиданий
raise HTTPException(status_code=503, detail="Партнёр временно недоступен, попробуйте позже")
except PermanentHTTPError as e:
raise HTTPException(status_code=400, detail=str(e))
except (httpx.TimeoutException, RetryError):
raise HTTPException(status_code=504, detail="Таймаут при обращении к партнёру")
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type="text/plain; version=0.0.4; charset=utf-8")
@app.on_event("shutdown")
async def shutdown_event():
await partner_client.close()
Как проверить под нагрузкой:
uvicorn app:app --reload
# в другом терминале
# сымитируйте медленные ответы партнёра через прокси или mock, затем возьмите wrk/hey
hey -z 30s -q 10 http://localhost:8000/user/123
Что важно в примере:
Хеджирование — это старт второй параллельной попытки, если первая «подозрительно» долго тянется (например, дольше p95). Подходит для чтения и идемпотентных операций и только при строгих лимитах параллелизма. Иначе можете удвоить нагрузку на партнёра и счета за его API.
Когда применять:
Когда не применять:
Синхронный вызов подходит, когда:
Переходите на очередь/вебхуки, когда:
Комбинированная схема: быстрый синхронный ответ с номером задачи + асинхронное выполнение с уведомлением результата.
Включите таймауты и на сетевой инфраструктуре, чтобы «грязные» соединения не висели вечно.
Пример Nginx (upstream к партнёру через прокси):
upstream partner_upstream {
server partner.example.com:443;
keepalive 64;
}
server {
listen 443 ssl;
location /partner/ {
proxy_pass https://partner_upstream;
proxy_connect_timeout 0.2s; # 200 мс на соединение
proxy_read_timeout 0.8s; # 800 мс на чтение
proxy_send_timeout 0.5s; # 500 мс на отправку
proxy_next_upstream error timeout http_502 http_503 http_504;
proxy_next_upstream_tries 1; # не перемножаем ретраи
}
}
Kubernetes-пробы:
Минимальные алёрты:
Чёткие таймауты, умеренные ретраи с джиттером, Circuit Breaker и ограничение параллелизма — базовый «набор выживания» для интеграций. Он снимает хвост задержек, уменьшает простои, защищает от штормов ретраев и держит расходы на предсказуемом уровне. Начните с измерений (метрики), задайте бюджет времени для критического пути, поставьте предохранитель и уважайте лимиты партнёров. В большинстве случаев это позволит стабилизировать SLA без покупки дополнительных серверов и дорогих «костылей».