
• Оглавление
Сеть ненадёжна: клиенты делают повторные запросы после таймаута, прокси могут отправить дубль, мобильное приложение — повторить операцию при переключении сети. Если ваш эндпоинт создаёт что-то «уникальное» — заказ, платёж, списание бонусов — без идемпотентности вы рано или поздно получите:
Идемпотентность гарантирует: если пришло два (или двадцать) одинаковых запроса с одним и тем же смыслом, система выполнит операцию ровно один раз и вернёт один и тот же ответ.
GET по контракту и так идемпотентен и безопасен; DELETE — спорно, зависит от модели (чаще нужен ключ).
Базовая идея простая:
Рекомендации по контракту:
Пример запросов от клиента:
# Первая попытка
curl -X POST https://api.example.com/orders \
-H "Content-Type: application/json" \
-H "Idempotency-Key: 2c9a1e9b-4b9e-4d6e-9e67-6e3b2e7c1e32" \
-d '{"items":[{"sku":"SKU-1","qty":2}],"amount":1200,"currency":"RUB"}'
# Повтор той же операции (после таймаута)
curl -X POST https://api.example.com/orders \
-H "Content-Type: application/json" \
-H "Idempotency-Key: 2c9a1e9b-4b9e-4d6e-9e67-6e3b2e7c1e32" \
-d '{"items":[{"sku":"SKU-1","qty":2}],"amount":1200,"currency":"RUB"}'
Мы будем хранить ключи в таблице с уникальным ограничением по (account_id, idempotency_key). В записи держим хеш запроса, статус, итоговый ответ и время истечения.
-- Таблица идемпотентности
CREATE TABLE IF NOT EXISTS idempotency_keys (
account_id TEXT NOT NULL,
idempotency_key TEXT NOT NULL,
request_hash TEXT NOT NULL,
status TEXT NOT NULL CHECK (status IN ('processing','completed','failed')),
response_status INT,
response_headers JSONB,
response_body JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
expires_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (account_id, idempotency_key)
);
-- Индекс для быстрой очистки по TTL
CREATE INDEX IF NOT EXISTS idempotency_keys_expires_at_idx
ON idempotency_keys (expires_at);
Алгоритм обработки запроса:
Важно: вся логика — внутри транзакции. Уникальный ключ гарантирует отсутствие гонок на вставке.
Ниже минимальный рабочий пример: один эндпоинт POST /orders, асинхронный драйвер asyncpg, простая «бизнес-логика» создания заказа. Для краткости аккаунт берём из заголовка X-Account-Id.
# requirements:
# fastapi==0.110.0
# uvicorn[standard]==0.27.1
# asyncpg==0.29.0
import asyncio
import json
import os
import signal
import hashlib
from datetime import datetime, timedelta, timezone
import asyncpg
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/postgres")
IDEMPOTENCY_TTL_HOURS = int(os.getenv("IDEMPOTENCY_TTL_HOURS", "48"))
app = FastAPI(title="Idempotent Orders API")
pool: asyncpg.Pool | None = None
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS idempotency_keys (
account_id TEXT NOT NULL,
idempotency_key TEXT NOT NULL,
request_hash TEXT NOT NULL,
status TEXT NOT NULL CHECK (status IN ('processing','completed','failed')),
response_status INT,
response_headers JSONB,
response_body JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
expires_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (account_id, idempotency_key)
);
CREATE INDEX IF NOT EXISTS idempotency_keys_expires_at_idx
ON idempotency_keys (expires_at);
"""
async def init_db():
global pool
pool = await asyncpg.create_pool(DATABASE_URL, min_size=1, max_size=10)
async with pool.acquire() as conn:
await conn.execute(CREATE_TABLE_SQL)
@app.on_event("startup")
async def on_startup():
await init_db()
@app.on_event("shutdown")
async def on_shutdown():
if pool:
await pool.close()
# Утилита: канонический хеш запроса (метод + путь + тело JSON с отсортированными ключами)
async def request_hash(req: Request, body: dict) -> str:
payload = {
"method": req.method.upper(),
"path": req.url.path,
"body": body,
}
# Стабильная сериализация
data = json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":")).encode("utf-8")
return hashlib.sha256(data).hexdigest()
# Пример бизнес-операции: создаём заказ (на самом деле просто генерим id)
async def create_order(account_id: str, body: dict) -> dict:
# Тут могла бы быть работа с вашей БД/платёжкой/складом
order_id = hashlib.sha1(json.dumps(body, sort_keys=True).encode()).hexdigest()[:16]
return {
"order_id": order_id,
"account_id": account_id,
"amount": body.get("amount"),
"currency": body.get("currency", "RUB"),
"items": body.get("items", []),
"status": "created",
}
@app.post("/orders")
async def create_order_endpoint(request: Request):
if pool is None:
raise HTTPException(500, detail="DB pool not initialized")
account_id = request.headers.get("X-Account-Id")
if not account_id:
raise HTTPException(400, detail="Missing X-Account-Id header")
idemp_key = request.headers.get("Idempotency-Key")
if not idemp_key:
raise HTTPException(400, detail="Missing Idempotency-Key header for POST /orders")
try:
body = await request.json()
if not isinstance(body, dict):
raise ValueError
except Exception:
raise HTTPException(400, detail="Invalid JSON body")
rhash = await request_hash(request, body)
expires_at = datetime.now(timezone.utc) + timedelta(hours=IDEMPOTENCY_TTL_HOURS)
async with pool.acquire() as conn:
async with conn.transaction():
# 1) Пытаемся вставить ключ в статусе processing
inserted = await conn.execute(
"""
INSERT INTO idempotency_keys(account_id, idempotency_key, request_hash, status, expires_at)
VALUES ($1, $2, $3, 'processing', $4)
ON CONFLICT DO NOTHING
""",
account_id, idemp_key, rhash, expires_at
)
if inserted and inserted.startswith("INSERT"):
# Мы владельцы ключа — выполняем операцию
try:
result = await create_order(account_id, body)
response_body = result
response_status = 201
response_headers = {"Content-Type": "application/json"}
await conn.execute(
"""
UPDATE idempotency_keys
SET status='completed', response_status=$5, response_headers=$6, response_body=$7
WHERE account_id=$1 AND idempotency_key=$2 AND request_hash=$3
""",
account_id, idemp_key, rhash, response_status, json.dumps(response_headers), json.dumps(response_body)
)
except Exception as e:
# Помечаем как failed, но сохраняем хеш для конфликтов
await conn.execute(
"""
UPDATE idempotency_keys
SET status='failed'
WHERE account_id=$1 AND idempotency_key=$2 AND request_hash=$3
""",
account_id, idemp_key, rhash
)
raise
else:
# Ключ уже существует — проверяем состояние
row = await conn.fetchrow(
"""
SELECT request_hash, status, response_status, response_headers, response_body
FROM idempotency_keys
WHERE account_id=$1 AND idempotency_key=$2
""",
account_id, idemp_key
)
if not row:
# Маловероятно из-за PRIMARY KEY, но проверим
raise HTTPException(500, detail="Idempotency state lost")
if row["request_hash"] != rhash:
raise HTTPException(409, detail="Idempotency-Key already used with different request")
if row["status"] == "completed":
# Отдаём кэшированный ответ
headers = row["response_headers"] or {}
resp = JSONResponse(content=row["response_body"], status_code=row["response_status"] or 200)
for k, v in headers.items():
resp.headers[k] = v
resp.headers["Idempotency-Key"] = idemp_key
return resp
if row["status"] == "processing":
# Небольшое ожидание результата (лонг-поллинг ~1 сек)
for _ in range(10):
await asyncio.sleep(0.1)
row2 = await conn.fetchrow(
"""
SELECT status, response_status, response_headers, response_body
FROM idempotency_keys
WHERE account_id=$1 AND idempotency_key=$2
""",
account_id, idemp_key
)
if row2 and row2["status"] == "completed":
headers = row2["response_headers"] or {}
resp = JSONResponse(content=row2["response_body"], status_code=row2["response_status"] or 200)
for k, v in headers.items():
resp.headers[k] = v
resp.headers["Idempotency-Key"] = idemp_key
return resp
# Если не дождались — просим клиента повторить
resp = JSONResponse({"detail": "Processing"}, status_code=202)
resp.headers["Retry-After"] = "2"
resp.headers["Idempotency-Key"] = idemp_key
return resp
# Если мы владельцы ключа и дошли сюда — отдаём свежесформированный ответ
response = JSONResponse(content=response_body, status_code=response_status)
response.headers.update({"Idempotency-Key": idemp_key})
return response
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", 8000)))
Пояснения:
Очистка по TTL (cron/джоб):
DELETE FROM idempotency_keys WHERE expires_at < now();
Если нужен сверхбыстрый доступ и хватает краткоживущих ключей (минуты-часы), можно использовать Redis с атомарной командой SET NX PX — она вставит ключ, если его ещё нет, и задаст TTL:
# Пример команды: установить ключ, если не существует, с TTL=2 дня
SET idem:{account_id}:{key} {request_hash} NX PX 172800000
Плюсы: скорость, простота. Минусы: память, отсутствие «вечного» хранилища ответа. Часто комбинируют: Redis для быстрой блокировки на время обработки, PostgreSQL — для долговременного хранения результата.
Если операция может идти минуты (например, платёж у провайдера):
Алерты:
Итог для бизнеса: дубликаты заказов и двойные списания исчезают, саппорт получает меньше обращений, финансы — меньше возвратов, а платёжная конверсия растёт за счёт безопасных ретраев клиента. И всё это — с парой таблиц, небольшим куском кода и понятным контрактом в API.