
Поведение клиентов и сети непредсказуемо: у мобильного интернета плавают задержки, браузер может «повторить отправку», прокси отрежет ответ на таймауте, а SDK платежного шлюза автоматически делает ретраи. В результате:
Все эти проблемы происходят не из‑за «плохих пользователей», а из‑за отсутствия чёткого контракта идемпотентности между клиентом, API и хранилищем.
Идемпотентность — свойство операции: повторение одного и того же запроса с одними и теми же данными приводит к одному и тому же наблюдаемому результату. Примеры:
Где она критична:
Классический подход — ввести явный ключ идемпотентности, передаваемый с запросом. Например, заголовок Idempotency-Key. Важно не только «принять ключ», но и формализовать правила.
Что должно быть в контракте API:
Бонус: клиент может генерировать ключи заранее (UUID v7/ULID) и повторять запрос до получения стабильного ответа, не боясь двойного эффекта.
Минимальные требования к хранилищу идемпотентности:
Схема таблицы в Postgres:
create table if not exists idempotency_keys (
key text not null,
user_id bigint not null,
endpoint text not null,
request_hash bytea not null,
response_status int not null default 0, -- 0 = в процессе
response_body jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
ttl_seconds int not null default 86400,
primary key (key, user_id, endpoint)
);
create index if not exists idx_idem_created_at on idempotency_keys (created_at);
Ключевые приёмы:
Ниже — рабочий пример: POST /orders создаёт заказ. Сервер обеспечивает идемпотентность по заголовку Idempotency-Key, уникальным для пользователя и эндпоинта.
# requirements:
# fastapi==0.110.0
# uvicorn[standard]==0.27.1
# asyncpg==0.29.0
# pydantic==2.6.1
import hashlib
import json
import os
from typing import Optional
import asyncpg
from fastapi import FastAPI, Header, HTTPException, Request, Response, status
from pydantic import BaseModel, conint
DATABASE_URL = os.getenv("DATABASE_URL", "postgres://postgres:postgres@localhost:5432/app")
app = FastAPI(title="Orders API with Idempotency")
pool: Optional[asyncpg.Pool] = None
class OrderIn(BaseModel):
customer_id: conint(gt=0)
items: list[dict]
currency: str
class OrderOut(BaseModel):
order_id: int
status: str
amount: int
currency: str
async def ensure_schema(conn: asyncpg.Connection):
await conn.execute(
"""
create table if not exists idempotency_keys (
key text not null,
user_id bigint not null,
endpoint text not null,
request_hash bytea not null,
response_status int not null default 0, -- 0 = processing
response_body jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
ttl_seconds int not null default 86400,
primary key (key, user_id, endpoint)
);
create table if not exists orders (
id serial primary key,
user_id bigint not null,
customer_id bigint not null,
amount int not null,
currency text not null,
status text not null,
created_at timestamptz not null default now()
);
"""
)
@app.on_event("startup")
async def on_startup():
global pool
pool = await asyncpg.create_pool(DATABASE_URL, min_size=1, max_size=5)
async with pool.acquire() as conn:
await ensure_schema(conn)
@app.on_event("shutdown")
async def on_shutdown():
if pool:
await pool.close()
def canonical_request_hash(method: str, path: str, body: bytes) -> bytes:
h = hashlib.sha256()
h.update(method.upper().encode())
h.update(b"\n")
h.update(path.encode())
h.update(b"\n")
# Тело нормализуем: JSON без пробелов, стабильный порядок ключей
try:
as_json = json.loads(body.decode() or "{}")
body_norm = json.dumps(as_json, separators=(",", ":"), sort_keys=True).encode()
except Exception:
body_norm = body
h.update(body_norm)
return h.digest()
@app.post("/orders", response_model=OrderOut)
async def create_order(
request: Request,
idempotency_key: str = Header(alias="Idempotency-Key"),
x_user_id: int = Header(alias="X-User-Id")
):
if not idempotency_key or len(idempotency_key) > 256:
raise HTTPException(status_code=400, detail="Отсутствует или слишком длинный Idempotency-Key")
raw_body = await request.body()
req_hash = canonical_request_hash(request.method, request.url.path, raw_body)
async with pool.acquire() as conn:
async with conn.transaction():
# Попытаемся вставить заглушку. Если получилось — мы «владельцы» выполнения.
inserted = await conn.execute(
"""
insert into idempotency_keys (key, user_id, endpoint, request_hash)
values ($1, $2, $3, $4)
on conflict do nothing
""",
idempotency_key, x_user_id, request.url.path, req_hash
)
if inserted == "INSERT 0 1":
# Впервые видим этот ключ: выполняем бизнес-логику
try:
payload = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Невалидный JSON")
order_in = OrderIn(**payload)
amount = sum(int(item.get("price", 0)) * int(item.get("qty", 1)) for item in order_in.items)
row = await conn.fetchrow(
"""
insert into orders (user_id, customer_id, amount, currency, status)
values ($1, $2, $3, $4, 'created')
returning id, status, amount, currency
""",
x_user_id, order_in.customer_id, amount, order_in.currency
)
response = {
"order_id": row[0],
"status": row[1],
"amount": row[2],
"currency": row[3],
}
# Сохраняем финальный ответ для будущих повторов
await conn.execute(
"""
update idempotency_keys
set response_status = $1, response_body = $2
where key = $3 and user_id = $4 and endpoint = $5
""",
201, json.dumps(response), idempotency_key, x_user_id, request.url.path
)
# Возвращаем обычный ответ первого выполнения
return Response(
content=json.dumps(response),
status_code=201,
media_type="application/json"
)
# Ключ уже существует — проверим хэш и вернём кэшированный ответ
existing = await conn.fetchrow(
"""
select request_hash, response_status, response_body
from idempotency_keys
where key = $1 and user_id = $2 and endpoint = $3
""",
idempotency_key, x_user_id, request.url.path
)
if not existing:
# Теоретически не должно случиться: гонка на разделах/узлах
raise HTTPException(status_code=409, detail="Идемпотентный ключ в конфликте, попробуйте позже")
if bytes(existing[0]) != req_hash:
raise HTTPException(status_code=422, detail="Idempotency-Key уже использован с другим запросом")
status_code = int(existing[1])
body = existing[2]
if status_code == 0:
# Операция ещё выполняется в другом воркере
raise HTTPException(status_code=409, detail="Операция в обработке, повторите позже")
return Response(
content=json.dumps(body),
status_code=status_code,
media_type="application/json"
)
Особенности:
Транспортные системы почти всегда дают доставку «минимум один раз». Мы достигаем «ровно один раз с точки зрения бизнеса» с помощью inbox/outbox.
Inbox (для входящих событий, вебхуков):
create table if not exists payment_events (
provider text not null,
event_id text not null,
received_at timestamptz not null default now(),
payload jsonb not null,
processed_at timestamptz,
primary key (provider, event_id)
);
Алгоритм обработчика вебхука:
Outbox (для исходящих событий): в той же транзакции, что и изменение данных, пишем запись в outbox, а отдельный воркер надёжно публикует событие и помечает «отправлено». Это устраняет «сделали в БД, не отправили в шину» и наоборот.
Иногда проще всего сделать операцию идемпотентной, выбрав правильный ключ:
Преимущества — меньше таблиц сопроводительных ключей и проще отладка. Недостаток — нужен продуманный формат ключа и проверка коллизий.
Пример SQL‑уборки:
delete from idempotency_keys
where created_at + make_interval(secs => ttl_seconds) < now()
limit 10000; -- батчево, чтобы не блокировать
Идемпотентность — это не про «красивый HTTP», а про деньги и доверие. Правильный дизайн ключей и обработчиков убирает дубли заказов и повторные списания, снижает количество споров и обращений в поддержку, делает интеграции с партнёрами стабильными, а отчёты — согласованными. Потратите 2–3 дня на архитектуру и внедрение — сэкономите месяцы на разборе инцидентов и репутацию у клиентов.