
Идемпотентность — это свойство операции, при котором повторный вызов с теми же параметрами даёт тот же результат и не создаёт побочных эффектов. Для бизнеса это означает:
В мире мобильных сетей и интеграций с внешними системами повторы неизбежны: клиенты повторяют запросы после таймаутов, браузер отправляет форму ещё раз, вебхуки приходят дважды. Идемпотентность превращает эти повторы в безопасную норму.
Для чтения (GET) ничего делать не надо: оно по определению не меняет состояние. Весь смысл — в небезопасных методах: POST, PATCH, PUT, DELETE.
Ключ идемпотентности — это идентификатор попытки выполнить «ту же» операцию:
Idempotency-Key (или в тело, если это RPC). Можно использовать UUID v4 или ULID.Минимум: Idempotency-Key, путь/операция, идентификатор субъекта (пользователь/организация), хэш существенных полей, срок годности.
Идея: при первом запросе «забронировать» ключ, выполнить действие, записать итог и вернуть тот же ответ для всех повторов. Для синхронизации между несколькими экземплярами приложения используем транзакцию в БД и advisory lock.
Плюсы:
Минусы:
Простой приём для операций «создать сущность с клиентским idempotency_key»:
idempotency_key с уникальным индексом в таблицу бизнес‑сущности (например, orders).Плюсы: минимум инфраструктуры. Минусы: не всегда удобно хранить «сырой» ответ. Для разнородных операций лучше иметь отдельную таблицу ключей.
Подходит для лёгких операций с коротким окном (минуты/часы):
SET key value NX EX ttl — резервируем ключ, параллельные запросы увидят, что ключ занят.Риски: потеря данных при перезапуске, вытеснение по памяти, отсутствие долговременного аудита. Для денег и критичных операций лучше использовать БД как «источник правды».
Ниже минимальный рабочий пример идемпотентного POST «создать платёж» с таблицей ключей, advisory lock и возвратом того же результата при повторах.
-- Таблица для хранения результатов по ключам идемпотентности
CREATE TABLE IF NOT EXISTS idempotency (
key_hash BIGINT NOT NULL, -- хэш ключа для advisory lock
scope TEXT NOT NULL, -- путь/операция
subject_id BIGINT NOT NULL, -- пользователь/организация
req_hash TEXT NOT NULL, -- хэш существенных полей запроса
status_code INT, -- сохранённый HTTP‑статус
response_body JSONB, -- сохранённый ответ
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
expires_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (scope, subject_id, key_hash)
);
CREATE INDEX IF NOT EXISTS idempotency_expires_at_idx
ON idempotency (expires_at);
-- Пример таблицы платежей
CREATE TABLE IF NOT EXISTS payments (
id BIGSERIAL PRIMARY KEY,
subject_id BIGINT NOT NULL,
amount_cents BIGINT NOT NULL CHECK (amount_cents > 0),
currency TEXT NOT NULL,
status TEXT NOT NULL, -- created, captured, failed
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
# requirements: fastapi, uvicorn, asyncpg, pydantic, python-dotenv (по желанию)
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, conint, constr
import asyncpg
import hashlib
import os
import json
app = FastAPI()
DB_DSN = os.getenv("DB_DSN", "postgresql://postgres:postgres@localhost:5432/postgres")
pool: asyncpg.Pool
class CreatePayment(BaseModel):
amount_cents: conint(gt=0)
currency: constr(min_length=3, max_length=3)
class User(BaseModel):
id: int
async def get_user(request: Request) -> User:
# Для примера: берём пользователя из заголовка, в реальности — из токена
user_id = request.headers.get("X-User-Id")
if not user_id:
raise HTTPException(401, "Нет пользователя")
return User(id=int(user_id))
async def sha256_str(s: str) -> str:
return hashlib.sha256(s.encode("utf-8")).hexdigest()
def key_to_bigint(key: str) -> int:
# Превращаем произвольный ключ в 64-битное число для advisory lock
h = hashlib.sha256(key.encode("utf-8")).digest()
return int.from_bytes(h[:8], byteorder="big", signed=False)
@app.on_event("startup")
async def startup():
global pool
pool = await asyncpg.create_pool(DB_DSN, min_size=1, max_size=10)
@app.post("/payments")
async def create_payment(payload: CreatePayment, request: Request, idempotency_key: str | None = Header(default=None)):
if not idempotency_key:
raise HTTPException(400, "Требуется заголовок Idempotency-Key")
user = await get_user(request)
scope = "/payments:create"
key_hash = key_to_bigint(idempotency_key)
req_hash = await sha256_str(json.dumps(payload.dict(), sort_keys=True))
async with pool.acquire() as conn:
tr = conn.transaction()
await tr.start()
try:
# Берём advisory lock на время транзакции — сериализуем параллельные запросы
await conn.execute("SELECT pg_advisory_xact_lock($1)", key_hash)
# Проверяем существующий результат
row = await conn.fetchrow(
"""
SELECT status_code, response_body, req_hash, expires_at
FROM idempotency
WHERE scope=$1 AND subject_id=$2 AND key_hash=$3
""",
scope, user.id, key_hash,
)
if row:
if row["expires_at"] < asyncpg.pgproto.pgproto.TimestampFromTicks(request.state.asgi.scope.get("time", 0)):
# записи истекли — теоретически можно удалить, но для простоты игнорируем
pass
if row["req_hash"] != req_hash:
raise HTTPException(409, "Этот Idempotency-Key уже использован с другими данными")
# Возвращаем сохранённый ответ
return JSONResponse(status_code=row["status_code"], content=row["response_body"])
# Резервируем ключ (создаём пустую запись с TTL)
ttl_hours = 48
await conn.execute(
"""
INSERT INTO idempotency (key_hash, scope, subject_id, req_hash, expires_at)
VALUES ($1, $2, $3, $4, now() + ($5 || ' hours')::interval)
""",
key_hash, scope, user.id, req_hash, ttl_hours,
)
# Выполняем бизнес‑операцию: создаём платёж
payment = await conn.fetchrow(
"""
INSERT INTO payments(subject_id, amount_cents, currency, status)
VALUES ($1, $2, $3, 'created')
RETURNING id, subject_id, amount_cents, currency, status, created_at
""",
user.id, payload.amount_cents, payload.currency,
)
response_body = {
"payment_id": payment["id"],
"status": payment["status"],
"amount_cents": payment["amount_cents"],
"currency": payment["currency"],
"created_at": payment["created_at"].isoformat(),
}
status_code = 201
# Сохраняем итоговый ответ
await conn.execute(
"""
UPDATE idempotency
SET status_code=$1, response_body=$2
WHERE scope=$3 AND subject_id=$4 AND key_hash=$5
""",
status_code, json.dumps(response_body), scope, user.id, key_hash,
)
await tr.commit()
return JSONResponse(status_code=status_code, content=response_body)
except HTTPException:
await tr.rollback()
raise
except Exception as e:
await tr.rollback()
# Можно обновить запись как временную ошибку (без закрепления тела)
raise HTTPException(500, f"Внутренняя ошибка: {e}")
Что здесь важно:
pg_advisory_xact_lock сериализует конкурентные запросы с одним ключом на уровне всей кластера БД — без внешних распределённых замков.Фоновая очистка может выглядеть так:
DELETE FROM idempotency WHERE expires_at < now();
Запускать раз в час планировщиком БД или задачей в приложении.
Почти все очереди дают доставку «не реже одного раза»: сообщение может прийти повторно после падения воркера. Значит обработчик обязан быть идемпотентным.
Подходы:
message_id сначала «бронь»: INSERT INTO processed(message_id) VALUES ($1) ON CONFLICT DO NOTHING.CREATE TABLE IF NOT EXISTS processed_messages (
message_id TEXT PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- В воркере перед обработкой
INSERT INTO processed_messages(message_id) VALUES ($1) ON CONFLICT DO NOTHING;
-- Проверяем количество затронутых строк: если 0 — это повтор, пропускаем
-- Начисление по конкретному источнику (source_id) не должно быть дважды
CREATE TABLE IF NOT EXISTS bonuses (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
amount INT NOT NULL,
source_id TEXT NOT NULL,
UNIQUE (user_id, source_id)
);
-- Обработка события начисления
INSERT INTO bonuses(user_id, amount, source_id)
VALUES ($1, $2, $3)
ON CONFLICT (user_id, source_id) DO NOTHING;
FOR UPDATE SKIP LOCKED, чтобы один воркер не взял задачу, которую уже взял другой, а повторы обрабатывайте как в пункте 1 или через состояние задачи.-- Выборка следующей не взятой задачи
WITH cte AS (
SELECT id FROM tasks
WHERE status = 'pending'
ORDER BY created_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE tasks t
SET status = 'processing'
FROM cte
WHERE t.id = cte.id
RETURNING t.*;
С платежами идемпотентность обязательна.
Idempotency-Key на стороне клиента и прокидывайте его до платёжного провайдера. Многие провайдеры поддерживают такой заголовок или параметр, возвращая тот же результат для повторов.event.id как message_id и таблицу processed_events с уникальным ключом.message_id и таблицу обработанных сообщений.Повторы запросов — неизбежная реальность. Без идемпотентности она стоит дорого: двойные списания, дубли заказов, недовольные клиенты и часы ручной зачистки. Простая дисциплина — ключ идемпотентности, хэш полезной нагрузки, хранение итогового ответа и сериализация по ключу — делает повторы безопасными. В API это снижает число инцидентов и повышает конверсию на «шумной» сети. В очередях это превращает «доставку не реже одного раза» в предсказуемую обработку без дублей. Для бизнеса это означает меньше непредвиденных расходов и спокойные релизы.