
Повторы запросов происходят постоянно: сеть рвётся, прокси делает повтор, пользователь кликает кнопку дважды, воркер падает и поднимается. Если операция имеет побочный эффект (списание денег, создание заказа, отправка письма), повтор приводит к дублям и потерям: двойные транзакции, нестыковки в учёте, шквал обращений в поддержку, штрафы от партнёров.
Идемпотентность — это договоренность: один и тот же запрос, повторённый любое число раз в разумное окно времени, даёт один и тот же эффект и тот же ответ. Правильно внедрённая идемпотентность:
Ключ идемпотентности — это уникальный идентификатор запроса. Лучшие практики:
Рекомендуемое пространство имён ключей в хранилище: idem:v1:{method}:{path}:{subject_id}:{key}
Ниже — минимально необходимая реализация:
# language: python
import asyncio
import hashlib
import json
import os
import uuid
from datetime import timedelta
from typing import Optional
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from redis.asyncio import Redis
app = FastAPI()
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
redis = Redis.from_url(REDIS_URL, encoding="utf-8", decode_responses=True)
RESULT_TTL_SEC = 24 * 3600 # храним результат 24 часа
LOCK_TTL_MS = 30_000 # блокировка на 30 секунд
class PaymentCreate(BaseModel):
amount: int = Field(gt=0)
currency: str = Field(min_length=3, max_length=3)
customer_id: str
def body_sha256(data: dict) -> str:
dumped = json.dumps(data, sort_keys=True, separators=(",", ":")).encode()
return hashlib.sha256(dumped).hexdigest()
def compose_keys(user_id: str, method: str, path: str, idem_key: str):
base = f"idem:v1:{method}:{path}:{user_id}:{idem_key}"
return {
"result": f"{base}:result", # JSON: {status_code, body, body_hash}
"lock": f"{base}:lock", # value: lock_token
}
async def set_lock(lock_key: str, token: str, ttl_ms: int) -> bool:
# SET key value NX PX ttl — атомарная попытка захвата
return await redis.set(lock_key, token, nx=True, px=ttl_ms) is True
async def release_lock(lock_key: str, token: str):
# снимаем замок только если владелец совпадает
lua = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
"""
await redis.eval(lua, 1, lock_key, token)
@app.post("/payments")
async def create_payment(req: Request, payload: PaymentCreate,
idempotency_key: Optional[str] = Header(None, convert_underscores=False),
x_user_id: Optional[str] = Header(None)):
if not x_user_id:
raise HTTPException(400, "Header X-User-ID обязателен")
if not idempotency_key or len(idempotency_key) < 8:
raise HTTPException(400, "Idempotency-Key обязателен и должен быть не короче 8 символов")
method = "POST"
path = "/payments"
keys = compose_keys(x_user_id, method, path, idempotency_key)
h = body_sha256(payload.dict())
# 1) Если результат уже есть — вернём его сразу
cached = await redis.get(keys["result"])
if cached:
record = json.loads(cached)
if record["body_hash"] != h:
raise HTTPException(409, "Idempotency-Key уже использован с другим телом запроса")
return JSONResponse(record["body"], status_code=record["status_code"])
# 2) Пробуем захватить блокировку
lock_token = str(uuid.uuid4())
locked = await set_lock(keys["lock"], lock_token, LOCK_TTL_MS)
if not locked:
# Кто-то уже исполняет этот запрос — подскажем повторить
return JSONResponse({"status": "processing"}, status_code=202, headers={"Retry-After": "2"})
try:
# 3) Выполняем критическую операцию
payment_id = str(uuid.uuid4())
# здесь должны быть реальные побочные эффекты: транзакция в БД/платеж и т.д.
await asyncio.sleep(0.3) # имитация внешнего вызова
response_body = {
"id": payment_id,
"amount": payload.amount,
"currency": payload.currency,
"customer_id": payload.customer_id,
"status": "confirmed"
}
status_code = 201
# 4) Публикуем сохранённый ответ
record = {"status_code": status_code, "body": response_body, "body_hash": h}
await redis.set(keys["result"], json.dumps(record), ex=RESULT_TTL_SEC)
return JSONResponse(response_body, status_code=status_code)
finally:
await release_lock(keys["lock"], lock_token)
Что важно в этом подходе:
Опционально можно хранить ещё метод, путь и идентификатор субъекта в значении, чтобы дополнительно валидировать контекст.
# language: bash
hey -n 2000 -c 200 -m POST \
-H 'Content-Type: application/json' \
-H 'X-User-ID: 42' \
-H 'Idempotency-Key: 6ffb5b42-6c1e-4c45-8b93-9d9b7b6b3f01' \
-d '{"amount":100,"currency":"USD","customer_id":"c1"}' \
http://localhost:8000/payments
В логе вы увидите одну реальную обработку и сотни быстрых возвратов сохранённого результата.
Если Redis недоступен или хочется меньше внешних зависимостей — используйте PostgreSQL с уникальным ключом и «ответами по повтору».
-- language: sql
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE TABLE IF NOT EXISTS idempotency (
user_id bigint NOT NULL,
method text NOT NULL,
path text NOT NULL,
key text NOT NULL,
body_sha256 bytea NOT NULL,
status_code integer,
response_json jsonb,
created_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (user_id, method, path, key)
);
CREATE INDEX IF NOT EXISTS idempotency_created_at_idx ON idempotency (created_at);
# language: python
import json
import hashlib
from typing import Optional, Tuple
import psycopg
def sha256_bytes(d: dict) -> bytes:
dumped = json.dumps(d, sort_keys=True, separators=(",", ":")).encode()
return hashlib.sha256(dumped).digest()
def ensure_idempotent_payment(conn: psycopg.Connection, user_id: int, method: str, path: str, key: str, body: dict,
perform) -> Tuple[int, dict]:
"""
perform(conn, body) -> (status_code:int, response:dict)
Возвращает сохранённый/новый ответ, защищая от повторов.
"""
bh = sha256_bytes(body)
with conn.transaction():
cur = conn.cursor()
# Пытаемся застолбить ключ
cur.execute(
"""
INSERT INTO idempotency(user_id, method, path, key, body_sha256)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT DO NOTHING
RETURNING user_id
""",
(user_id, method, path, key, bh)
)
inserted = cur.fetchone() is not None
if not inserted:
# ключ уже есть — смотрим, есть ли сохранённый ответ
cur.execute(
"""
SELECT status_code, response_json, body_sha256
FROM idempotency WHERE user_id=%s AND method=%s AND path=%s AND key=%s
""",
(user_id, method, path, key)
)
row = cur.fetchone()
if row is None:
raise RuntimeError("должна существовать запись по уникальному ключу")
status_code, response_json, stored_bh = row
if stored_bh.tobytes() != bh:
# тот же ключ, другое тело — конфликт
return 409, {"error": "Idempotency-Key уже использован с другим телом"}
if response_json is not None:
return status_code, response_json
else:
# уже выполняется кем-то другим
return 202, {"status": "processing"}
# Мы первые — выполняем операцию
status_code, response = perform(conn, body)
cur.execute(
"""
UPDATE idempotency
SET status_code=%s, response_json=%s
WHERE user_id=%s AND method=%s AND path=%s AND key=%s
""",
(status_code, json.dumps(response), user_id, method, path, key)
)
return status_code, response
Подход прост: уникальный ключ предотвращает гонки, а сохранённый ответ позволяет безопасно повторять запросы. Для «висящих» записей без ответа (если процесс умер) можно добавить задачу‑прибиральщика, которая по TTL чистит старые незавершённые записи или помечает их как «прерванные».
Доставка сообщений «ровно один раз» — мифически дорога. На практике используется at‑least‑once: сообщение может прийти повторно. Значит, обработчик обязан быть идемпотентным.
Приём: генерируйте ключ идемпотентности для задачи (например, business_id + тип события) и проверяйте/ставьте его в Redis перед обработкой.
# language: python
import json
import os
from redis import Redis
r = Redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379/0"))
DEDUP_TTL_SEC = 3 * 24 * 3600
def handle_event(event: dict):
event_id = event["id"] # устойчивый идентификатор из источника
key = f"dedup:v1:event:{event_id}"
# SETNX + TTL в два шага, если Redis<7 (без EX в SETNX)
if r.set(key, "1", nx=True, ex=DEDUP_TTL_SEC):
# выполняем побочный эффект
process(event)
else:
# уже обрабатывали
return
def process(event: dict):
# здесь бизнес-логика (отправка письма, начисления и т.п.)
pass
Почти все провайдеры присылают устойчивый идентификатор события. Дедупликация выглядит так же: SET NX + TTL и только после успеха — обработка. Важно: проверяйте подпись/секрет вебхука до дедупликации, иначе злоумышленник может блокировать настоящие события подставными ID.
Эти коды легко ложатся на ретраи клиентов: при 202 они повторяют позже, при 409 просят пользователя сгенерировать новый ключ (или меняют данные).
Метрики:
Тесты:
Эксплуатация:
Идемпотентность — это не «доп. усложнение», а страховка от дорогих ошибок. После внедрения вы сможете уверенно включать ретраи, уплотнять трафик через прокси и очереди, не опасаясь дублей и двойных списаний. А команда поддержки вздохнёт свободнее: меньше тикетов — больше доверия клиентов.