
Идемпотентность — это свойство операции давать один и тот же результат при повторном выполнении. Для бизнеса это:
Источники повторов:
Последствия:
Простая и надёжная схема:
Важно:
Ниже — минимальный рабочий проект. Он поднимает PostgreSQL и Redis через Docker, запускает FastAPI‑приложение и демонстрирует идемпотентный платежный эндпоинт.
version: "3.9"
services:
db:
image: postgres:16
environment:
POSTGRES_DB: idemp_demo
POSTGRES_USER: demo
POSTGRES_PASSWORD: demo
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U demo -d idemp_demo"]
interval: 5s
timeout: 5s
retries: 5
redis:
image: redis:7
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 3s
retries: 5
fastapi==0.115.5
uvicorn[standard]==0.32.0
SQLAlchemy==2.0.36
psycopg[binary]==3.2.3
pydantic==2.9.2
python-dotenv==1.0.1
redis==5.1.1
from fastapi import FastAPI, Header, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from sqlalchemy import (
create_engine, String, Integer, BigInteger, Text, text, event,
)
from sqlalchemy.orm import DeclarativeBase, mapped_column, Session
from sqlalchemy.exc import IntegrityError
from datetime import datetime
import time
import re
import hmac
import hashlib
import os
from redis import Redis
# Конфигурация окружения
DB_DSN = os.getenv("DB_DSN", "postgresql+psycopg://demo:demo@localhost:5432/idemp_demo")
REDIS_DSN = os.getenv("REDIS_DSN", "redis://localhost:6379/0")
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "dev_secret_change_me")
# Настройка БД
engine = create_engine(DB_DSN, pool_pre_ping=True, future=True)
class Base(DeclarativeBase):
pass
class Payment(Base):
__tablename__ = "payments"
id = mapped_column(BigInteger, primary_key=True)
idempotency_key = mapped_column(String(64), unique=True, nullable=False, index=True)
amount_cents = mapped_column(Integer, nullable=False)
currency = mapped_column(String(3), nullable=False)
customer_id = mapped_column(String(64), nullable=False)
status = mapped_column(String(16), nullable=False, default="processing")
external_charge_id = mapped_column(String(64), nullable=True)
error_message = mapped_column(Text, nullable=True)
created_at = mapped_column(String(32), nullable=False, default=lambda: datetime.utcnow().isoformat())
updated_at = mapped_column(String(32), nullable=False, default=lambda: datetime.utcnow().isoformat())
class WebhookEvent(Base):
__tablename__ = "webhook_events"
id = mapped_column(BigInteger, primary_key=True)
provider = mapped_column(String(32), nullable=False)
event_id = mapped_column(String(128), unique=True, nullable=False, index=True)
signature = mapped_column(String(128), nullable=False)
payload = mapped_column(Text, nullable=False)
processed_at = mapped_column(String(32), nullable=False, default=lambda: datetime.utcnow().isoformat())
# Автообновление updated_at
@event.listens_for(Session, "before_flush")
def receive_before_flush(session, flush_context, instances):
now = datetime.utcnow().isoformat()
for obj in session.new.union(session.dirty):
if hasattr(obj, "updated_at"):
setattr(obj, "updated_at", now)
# Инициализация схемы
Base.metadata.create_all(engine)
# Настройка Redis (опционально для краткоживущей дедупликации)
redis_client = Redis.from_url(REDIS_DSN, decode_responses=True)
app = FastAPI(title="Idempotency Demo")
# Модели запросов/ответов
class PaymentRequest(BaseModel):
amount_cents: int = Field(gt=0)
currency: str = Field(min_length=3, max_length=3)
customer_id: str = Field(min_length=1, max_length=64)
class PaymentResponse(BaseModel):
id: int
status: str
amount_cents: int
currency: str
customer_id: str
external_charge_id: str | None = None
error_message: str | None = None
IDEMPOTENCY_KEY_RE = re.compile(r"^[A-Za-z0-9_-]{1,64}$")
def validate_idempotency_key(key: str) -> None:
if not key or not IDEMPOTENCY_KEY_RE.match(key):
raise HTTPException(status_code=400, detail="Некорректный Idempotency-Key (разрешены буквы, цифры, _ и -; до 64 символов)")
def simulate_external_charge(amount_cents: int, currency: str, customer_id: str) -> tuple[str, str | None]:
"""
Имитация внешнего списания.
Возвращает (status, external_charge_id_or_error)
"""
time.sleep(1.0) # имитация сети
# Условная логика: для ровных сумм всё хорошо, для нечетных — ошибка
if amount_cents % 2 == 0:
charge_id = f"CHG_{int(time.time())}_{amount_cents}"
return "succeeded", charge_id
else:
return "failed", "Платёж отклонён провайдером"
@app.post("/payments", response_model=PaymentResponse)
def create_payment(payload: PaymentRequest, Idempotency_Key: str = Header(alias="Idempotency-Key")):
validate_idempotency_key(Idempotency_Key)
# Краткоживущая защита от шторма дублей (не обязательна, но полезна)
# Не заменяет БД, а лишь снижает нагрузку при всплесках повторов.
dedupe_key = f"idemp:payments:{Idempotency_Key}"
first = redis_client.set(dedupe_key, "1", nx=True, ex=15)
with Session(engine) as session:
# Пытаемся вставить запись со статусом processing
p = Payment(
idempotency_key=Idempotency_Key,
amount_cents=payload.amount_cents,
currency=payload.currency.upper(),
customer_id=payload.customer_id,
status="processing",
)
try:
session.add(p)
session.commit()
session.refresh(p)
inserted = True
except IntegrityError:
session.rollback()
inserted = False
if inserted:
# Только «победитель гонки» выполняет внешний эффект
status, info = simulate_external_charge(payload.amount_cents, payload.currency.upper(), payload.customer_id)
p.status = status
if status == "succeeded":
p.external_charge_id = info
p.error_message = None
else:
p.error_message = info
p.external_charge_id = None
session.add(p)
session.commit()
else:
# Ключ уже есть — возвращаем сохранённый результат
p = session.query(Payment).filter_by(idempotency_key=Idempotency_Key).one()
if p.status == "processing":
# Идёт обработка в другом воркере/процессе — информируем клиента
# Клиент может повторить запрос через 1–2 секунды
return JSONResponse(
status_code=202,
content={
"id": p.id,
"status": p.status,
"amount_cents": p.amount_cents,
"currency": p.currency,
"customer_id": p.customer_id,
"external_charge_id": p.external_charge_id,
"error_message": p.error_message,
},
)
return PaymentResponse(
id=p.id,
status=p.status,
amount_cents=p.amount_cents,
currency=p.currency,
customer_id=p.customer_id,
external_charge_id=p.external_charge_id,
error_message=p.error_message,
)
# Пример вебхука с дедупликацией по event_id + проверка подписи
class WebhookPayload(BaseModel):
event_id: str
type: str
data: dict
def verify_signature(raw_body: bytes, signature_header: str, secret: str) -> bool:
mac = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
return hmac.compare_digest(mac, signature_header)
@app.post("/webhooks/provider")
def webhook_provider(payload: WebhookPayload, X_Signature: str = Header(alias="X-Signature", default="")):
# Проверяем подпись
body_bytes = payload.model_dump_json().encode()
if not verify_signature(body_bytes, X_Signature, WEBHOOK_SECRET):
raise HTTPException(status_code=401, detail="Подпись не прошла проверку")
# Дедупликация по event_id на уровне БД (уникальный индекс)
with Session(engine) as session:
evt = WebhookEvent(
provider="provider",
event_id=payload.event_id,
signature=X_Signature,
payload=payload.model_dump_json(),
)
try:
session.add(evt)
session.commit()
# Здесь — фактическая обработка события, безопасно: мы точно первые
# Например, подтверждение платежа или обновление статуса заказа
# ... ваша бизнес‑логика ...
except IntegrityError:
session.rollback()
# Событие уже обработано — возвращаем 200, чтобы провайдер перестал ретраить
return {"status": "duplicate", "processed": True}
return {"status": "ok", "processed": True}
# 1) Поднимите инфраструктуру
docker compose up -d
# 2) Установите зависимости и запустите приложение
python -m venv .venv
source .venv/bin/activate # Windows: .venv\\Scripts\\activate
pip install -r requirements.txt
uvicorn main:app --reload
# 3) Тест идемпотентного платежа
curl -X POST http://127.0.0.1:8000/payments \
-H 'Content-Type: application/json' \
-H 'Idempotency-Key: pay_12345' \
-d '{"amount_cents": 2000, "currency": "RUB", "customer_id": "cust_1"}'
# Повтор запроса с тем же ключом — вернёт тот же результат
curl -X POST http://127.0.0.1:8000/payments \
-H 'Content-Type: application/json' \
-H 'Idempotency-Key: pay_12345' \
-d '{"amount_cents": 2000, "currency": "RUB", "customer_id": "cust_1"}'
# 4) Тест вебхука (с правильной подписью)
BODY='{"event_id":"evt_1","type":"payment.succeeded","data":{"id":"p_1"}}'
SIG=$(python - <<'PY'
import hmac,hashlib,sys
secret=b"dev_secret_change_me"
body=b'{"event_id":"evt_1","type":"payment.succeeded","data":{"id":"p_1"}}'
print(hmac.new(secret, body, hashlib.sha256).hexdigest())
PY
)
curl -X POST http://127.0.0.1:8000/webhooks/provider \
-H "Content-Type: application/json" \
-H "X-Signature: $SIG" \
-d "$BODY"
# Повтор с тем же event_id — будет помечен как duplicate
curl -X POST http://127.0.0.1:8000/webhooks/provider \
-H "Content-Type: application/json" \
-H "X-Signature: $SIG" \
-d "$BODY"
SQL‑вариант создания таблиц (если не используете ORM):
CREATE TABLE IF NOT EXISTS payments (
id BIGSERIAL PRIMARY KEY,
idempotency_key TEXT NOT NULL UNIQUE,
amount_cents INTEGER NOT NULL CHECK (amount_cents > 0),
currency TEXT NOT NULL CHECK (char_length(currency) = 3),
customer_id TEXT NOT NULL,
status TEXT NOT NULL CHECK (status IN ('processing','succeeded','failed')),
external_charge_id TEXT,
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS webhook_events (
id BIGSERIAL PRIMARY KEY,
provider TEXT NOT NULL,
event_id TEXT NOT NULL UNIQUE,
signature TEXT NOT NULL,
payload TEXT NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
Redis полезен как «амортизатор» при шквале дублей:
Пример атомарной операции:
# Возвращает True, если мы первые; False если уже есть такой ключ
is_first = redis_client.set("idemp:payments:pay_123", "1", nx=True, ex=15)
Распространённые ошибки с Redis:
Собирайте:
Алерты:
Идемпотентность — это не «красивая теория», а практический способ уменьшить стоимость инцидентов, снять страх перед ретраями и повысить предсказуемость интеграций. Внедряется по шагам, отлично масштабируется и окупается уже в первый месяц за счёт снижения нагрузки на поддержку и возвратов средств.