
• Оглавление
Идемпотентность — это свойство операции давать один и тот же итоговый эффект, даже если её выполнить несколько раз. Для бизнеса это про деньги, доверие и скорость разработки:
Повторы неизбежны по самой природе распределённых систем. Значит, стратегия — сделать операции устойчивыми к повторам.
Базовый приём для небезопасных по природе методов (POST, PATCH):
-- Храним статус обработки и ответ, чтобы вернуть его при повторе
create table if not exists idempotency (
fingerprint text primary key,
status text not null check (status in ('IN_PROGRESS','COMPLETED','FAILED')),
response_status int,
response_body jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create index if not exists idempotency_created_at_idx on idempotency (created_at);
-- Обновляем updated_at
create or replace function set_updated_at()
returns trigger language plpgsql as $$
begin
new.updated_at = now();
return new;
end;$$;
drop trigger if exists idempotency_set_updated_at on idempotency;
create trigger idempotency_set_updated_at
before update on idempotency
for each row execute procedure set_updated_at();
-- Пример бизнес‑таблицы для платежей: уникальность по ключу
create table if not exists payments (
id uuid primary key,
idem_key text not null unique,
amount_cents int not null check (amount_cents > 0),
currency text not null,
status text not null check (status in ('NEW','CAPTURED','FAILED')),
created_at timestamptz not null default now()
);
Ниже — минимальный пример. Он:
# requirements:
# flask==3.0.2
# psycopg[binary]==3.1.18
# psycopg_pool==3.2.2
# run:
# export DATABASE_URL=postgresql://user:pass@localhost:5432/app
# flask --app app run -h 0.0.0.0 -p 8000
import hashlib
import json
import os
import uuid
from datetime import timedelta, datetime, timezone
from flask import Flask, request, jsonify, make_response
from psycopg_pool import ConnectionPool
app = Flask(__name__)
DB_URL = os.environ.get("DATABASE_URL")
pool = ConnectionPool(DB_URL, min_size=1, max_size=10, open=True)
IDEMP_INPROGRESS_TTL = timedelta(minutes=2) # окно ожидания для конкурирующих запросов
def make_fingerprint(method: str, path: str, idem_key: str, body_bytes: bytes | None) -> str:
# Не храним тело запроса, только хэш. Включаем метод и путь, чтобы один ключ не работал на другой эндпоинт.
body_hash = hashlib.sha256(body_bytes or b"").hexdigest()
base = f"{method.upper()}|{path}|{idem_key}|{body_hash}"
return hashlib.sha256(base.encode()).hexdigest()
@app.post("/payments")
def create_payment():
idem_key = request.headers.get("Idempotency-Key")
if not idem_key or len(idem_key) > 128:
return make_response({"error": "Idempotency-Key is required and must be <= 128 chars"}, 400)
fp = make_fingerprint(request.method, request.path, idem_key, request.get_data())
with pool.connection() as conn:
with conn.cursor() as cur:
conn.execute("begin")
# Пытаемся пометить попытку как IN_PROGRESS. Если ключ новый — возвращается строка.
cur.execute(
"""
insert into idempotency (fingerprint, status)
values (%s, 'IN_PROGRESS')
on conflict do nothing
returning fingerprint
""",
(fp,)
)
inserted = cur.fetchone()
if not inserted:
# Ключ уже был. Смотрим состояние, блокируем на чтение.
cur.execute(
"select status, response_status, response_body, updated_at from idempotency where fingerprint=%s for update",
(fp,)
)
row = cur.fetchone()
status, resp_code, resp_body, updated_at = row
if status == 'COMPLETED':
conn.execute("commit")
resp = make_response(jsonify(resp_body or {}), resp_code or 200)
resp.headers["X-Idempotent-Replay"] = "true"
return resp
# Если обработка в процессе слишком долго — позволим захватить попытку заново (редкий кейс)
if status == 'IN_PROGRESS' and updated_at < datetime.now(timezone.utc) - IDEMP_INPROGRESS_TTL:
cur.execute(
"update idempotency set status='IN_PROGRESS' where fingerprint=%s",
(fp,)
)
# продолжаем как обычную новую попытку ниже
else:
conn.execute("commit")
resp = make_response({"error": "Processing", "retry_in_seconds": 5}, 409)
resp.headers["Retry-After"] = "5"
return resp
# Здесь у нас уникальное право выполнить бизнес‑логику для этого отпечатка.
try:
amount_cents = int((request.json or {}).get("amount_cents", 0))
currency = (request.json or {}).get("currency", "USD")
if amount_cents <= 0:
raise ValueError("amount_cents must be > 0")
payment_id = str(uuid.uuid4())
# В реальности здесь вы бы сходили к провайдеру оплаты с тем же idem_key.
# Для примера просто создадим запись в БД с уникальным ключом.
cur.execute(
"""
insert into payments (id, idem_key, amount_cents, currency, status)
values (%s, %s, %s, %s, 'CAPTURED')
on conflict (idem_key) do update set status=excluded.status
returning id, status
""",
(payment_id, idem_key, amount_cents, currency)
)
row = cur.fetchone()
final_id, final_status = row
response_body = {
"payment_id": final_id,
"status": final_status,
"amount_cents": amount_cents,
"currency": currency,
}
cur.execute(
"update idempotency set status='COMPLETED', response_status=%s, response_body=%s where fingerprint=%s",
(201, json.dumps(response_body), fp)
)
conn.execute("commit")
resp = make_response(jsonify(response_body), 201)
resp.headers["X-Idempotent-Recorded"] = "true"
return resp
except Exception as e:
cur.execute(
"update idempotency set status='FAILED', response_status=%s, response_body=%s where fingerprint=%s",
(500, json.dumps({"error": str(e)}), fp)
)
conn.execute("commit")
return make_response({"error": "failed", "detail": str(e)}, 500)
@app.get("/health")
def health():
return {"ok": True}
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000)
Очистка по расписанию:
-- Удаляем записи старше 3 суток
delete from idempotency where created_at < now() - interval '3 days';
В брокерах «ровно один раз» — дорогая иллюзия. Реально и просто достигается «по крайней мере один раз» плюс идемпотентность потребителя.
Стратегии:
На практике комбинируют: идемпотентная бизнес‑операция + запись маркера «после», а критичные секции оборачивают транзакцией.
# Дедупликация сообщений по уникальному message_id
# Вариант «после обработки»: бизнес‑операция идемпотентна сама по себе.
from contextlib import contextmanager
from psycopg import connect
import os
DSN = os.environ.get("DATABASE_URL")
DDL = """
create table if not exists processed_messages (
message_id text primary key,
processed_at timestamptz not null default now()
);
create table if not exists stock (
sku text primary key,
qty int not null
);
"""
@contextmanager
def get_conn():
with connect(DSN) as conn:
yield conn
def ensure_schema():
with get_conn() as conn:
conn.execute(DDL)
conn.commit()
def process_message(message_id: str, sku: str, delta_qty: int):
"""
Идемпотентное изменение остатков:
- upsert по SKU (если нет — создаём; если есть — корректируем),
- затем фиксируем message_id. Если ретрай — просто увидим запись и пропустим.
"""
with get_conn() as conn, conn.cursor() as cur:
conn.execute("begin")
# Проверяем, не обрабатывали ли этот message_id
cur.execute("select 1 from processed_messages where message_id=%s", (message_id,))
if cur.fetchone():
conn.execute("commit")
return "duplicate"
# Идемпотентный апдейт остатков через upsert
cur.execute(
"""
insert into stock (sku, qty) values (%s, %s)
on conflict (sku) do update set qty = stock.qty + excluded.qty
returning qty
""",
(sku, delta_qty)
)
new_qty = cur.fetchone()[0]
# Фиксируем, что сообщение обработано
cur.execute(
"insert into processed_messages (message_id) values (%s) on conflict do nothing",
(message_id,)
)
conn.execute("commit")
return {"sku": sku, "qty": new_qty}
if __name__ == "__main__":
ensure_schema()
print(process_message("m-1", "ABC", +10)) # {sku: ABC, qty: 10}
print(process_message("m-1", "ABC", +10)) # duplicate
print(process_message("m-2", "ABC", -3)) # {sku: ABC, qty: 7}