Kravchenko

Web Lab

АудитБлогКонтакты

Kravchenko

Web Lab

Разрабатываем сайты и автоматизацию на современных фреймворках под ключ

Услуги
ЛендингМногостраничныйВизитка
E-commerceБронированиеПортфолио
Навигация
БлогКонтактыАудит
Обратная связь
+7 921 567-11-16
info@kravlab.ru
с 09:00 до 18:00

© 2026 Все права защищены

•

ИП Кравченко Никита Владимирович

•

ОГРНИП: 324784700339743

Политика конфиденциальности

Идемпотентность в API и очередях: без двойных списаний и повторной обработки — надёжная бизнес‑логика

Разработка и технологии16 апреля 2026 г.
Повторы запросов и сообщений неизбежны: таймауты, ретраи, перебои сети. Если бизнес‑операции не идемпотентны, вы рискуете двойными списаниями, рассинхронизацией складских остатков и испорченными метриками. В статье — практические паттерны для API и очередей, готовые схемы таблиц и рабочий код на Python + PostgreSQL, чтобы сделать операции безопасно повторяемыми.
Идемпотентность в API и очередях: без двойных списаний и повторной обработки — надёжная бизнес‑логика

• Оглавление

  • Зачем бизнесу идемпотентность
  • Откуда берутся дубли и повторы
  • HTTP: идемпотентный ключ и повторяемые ответы
    • Схема таблиц в PostgreSQL
    • Обработчик на Python/Flask с защитой от гонок
    • Срок жизни ключа, размер, отпечаток запроса
  • Очереди и фоновые задачи: как не обрабатывать сообщение дважды
    • Маркер обработки в БД: «впереди обработки» и «после обработки»
    • Пример кода: дедупликация сообщений в PostgreSQL
  • Деньги и инвентарь: особые правила
  • Наблюдаемость и тестирование
  • Риски и анти‑паттерны
  • Чек‑лист внедрения
  • Что ещё посмотреть

Зачем бизнесу идемпотентность

Идемпотентность — это свойство операции давать один и тот же итоговый эффект, даже если её выполнить несколько раз. Для бизнеса это про деньги, доверие и скорость разработки:

  • Без двойных списаний и дублирующих заказов — меньше чарджбеков, возвратов и ручной разборки.
  • Прозрачные SLA: можно без страха настраивать автоматические ретраи при сбоях сети, не опасаясь повторной обработки.
  • Снижение инцидентов и нагрузки на саппорт — меньше «призрачных» заказов и спорных ситуаций с партнёрами.
  • Ускорение интеграций: простые правила для клиентов и внутренних сервисов — присылай ключ идемпотентности, получай стабильный результат.

Откуда берутся дубли и повторы

  • Клиентские и серверные ретраи после таймаута: первый запрос дошёл, но ответ потерялся.
  • Брокеры сообщений чаще гарантируют «не реже одного раза» (at‑least‑once), а не «ровно один раз».
  • Платёжные провайдеры, курьерки и внешние партнёры шлют вебхуки повторно до подтверждения.
  • Сбои сети и флап статики: балансировщики могут переотправить запрос на другую реплику.
  • Пользователь нажимает кнопку «Оплатить» дважды — классика.

Повторы неизбежны по самой природе распределённых систем. Значит, стратегия — сделать операции устойчивыми к повторам.

HTTP: идемпотентный ключ и повторяемые ответы

Базовый приём для небезопасных по природе методов (POST, PATCH):

  • Клиент присылает заголовок Idempotency-Key (уникальный ключ попытки) и повторяет его на ретраях.
  • Сервер формирует отпечаток запроса (метод + путь + ключ + при желании хэш тела) и сохраняет результат обработки.
  • Если такой отпечаток уже был и завершился — мгновенно возвращаем тот же ответ и статус.
  • Если обработка идёт прямо сейчас — отвечаем конфликтом с подсказкой «попробуйте позже».

Схема таблиц в PostgreSQL

-- Храним статус обработки и ответ, чтобы вернуть его при повторе
create table if not exists idempotency (
  fingerprint text primary key,
  status text not null check (status in ('IN_PROGRESS','COMPLETED','FAILED')),
  response_status int,
  response_body jsonb,
  created_at timestamptz not null default now(),
  updated_at timestamptz not null default now()
);

create index if not exists idempotency_created_at_idx on idempotency (created_at);

-- Обновляем updated_at
create or replace function set_updated_at()
returns trigger language plpgsql as $$
begin
  new.updated_at = now();
  return new;
end;$$;

drop trigger if exists idempotency_set_updated_at on idempotency;
create trigger idempotency_set_updated_at
before update on idempotency
for each row execute procedure set_updated_at();

-- Пример бизнес‑таблицы для платежей: уникальность по ключу
create table if not exists payments (
  id uuid primary key,
  idem_key text not null unique,
  amount_cents int not null check (amount_cents > 0),
  currency text not null,
  status text not null check (status in ('NEW','CAPTURED','FAILED')),
  created_at timestamptz not null default now()
);

Обработчик на Python/Flask с защитой от гонок

Ниже — минимальный пример. Он:

  • требует заголовок Idempotency-Key;
  • атомарно помечает попытку как IN_PROGRESS;
  • при повторе отдаёт закешированный ответ;
  • при конкурирующих запросах одного и того же ключа — возвращает 409 с Retry-After;
  • завершает записью COMPELETED и телом ответа.
# requirements:
#   flask==3.0.2
#   psycopg[binary]==3.1.18
#   psycopg_pool==3.2.2
# run:
#   export DATABASE_URL=postgresql://user:pass@localhost:5432/app
#   flask --app app run -h 0.0.0.0 -p 8000

import hashlib
import json
import os
import uuid
from datetime import timedelta, datetime, timezone

from flask import Flask, request, jsonify, make_response
from psycopg_pool import ConnectionPool

app = Flask(__name__)
DB_URL = os.environ.get("DATABASE_URL")
pool = ConnectionPool(DB_URL, min_size=1, max_size=10, open=True)

IDEMP_INPROGRESS_TTL = timedelta(minutes=2)  # окно ожидания для конкурирующих запросов


def make_fingerprint(method: str, path: str, idem_key: str, body_bytes: bytes | None) -> str:
    # Не храним тело запроса, только хэш. Включаем метод и путь, чтобы один ключ не работал на другой эндпоинт.
    body_hash = hashlib.sha256(body_bytes or b"").hexdigest()
    base = f"{method.upper()}|{path}|{idem_key}|{body_hash}"
    return hashlib.sha256(base.encode()).hexdigest()


@app.post("/payments")
def create_payment():
    idem_key = request.headers.get("Idempotency-Key")
    if not idem_key or len(idem_key) > 128:
        return make_response({"error": "Idempotency-Key is required and must be <= 128 chars"}, 400)

    fp = make_fingerprint(request.method, request.path, idem_key, request.get_data())

    with pool.connection() as conn:
        with conn.cursor() as cur:
            conn.execute("begin")
            # Пытаемся пометить попытку как IN_PROGRESS. Если ключ новый — возвращается строка.
            cur.execute(
                """
                insert into idempotency (fingerprint, status)
                values (%s, 'IN_PROGRESS')
                on conflict do nothing
                returning fingerprint
                """,
                (fp,)
            )
            inserted = cur.fetchone()

            if not inserted:
                # Ключ уже был. Смотрим состояние, блокируем на чтение.
                cur.execute(
                    "select status, response_status, response_body, updated_at from idempotency where fingerprint=%s for update",
                    (fp,)
                )
                row = cur.fetchone()
                status, resp_code, resp_body, updated_at = row

                if status == 'COMPLETED':
                    conn.execute("commit")
                    resp = make_response(jsonify(resp_body or {}), resp_code or 200)
                    resp.headers["X-Idempotent-Replay"] = "true"
                    return resp

                # Если обработка в процессе слишком долго — позволим захватить попытку заново (редкий кейс)
                if status == 'IN_PROGRESS' and updated_at < datetime.now(timezone.utc) - IDEMP_INPROGRESS_TTL:
                    cur.execute(
                        "update idempotency set status='IN_PROGRESS' where fingerprint=%s",
                        (fp,)
                    )
                    # продолжаем как обычную новую попытку ниже
                else:
                    conn.execute("commit")
                    resp = make_response({"error": "Processing", "retry_in_seconds": 5}, 409)
                    resp.headers["Retry-After"] = "5"
                    return resp

            # Здесь у нас уникальное право выполнить бизнес‑логику для этого отпечатка.
            try:
                amount_cents = int((request.json or {}).get("amount_cents", 0))
                currency = (request.json or {}).get("currency", "USD")
                if amount_cents <= 0:
                    raise ValueError("amount_cents must be > 0")

                payment_id = str(uuid.uuid4())
                # В реальности здесь вы бы сходили к провайдеру оплаты с тем же idem_key.
                # Для примера просто создадим запись в БД с уникальным ключом.
                cur.execute(
                    """
                    insert into payments (id, idem_key, amount_cents, currency, status)
                    values (%s, %s, %s, %s, 'CAPTURED')
                    on conflict (idem_key) do update set status=excluded.status
                    returning id, status
                    """,
                    (payment_id, idem_key, amount_cents, currency)
                )
                row = cur.fetchone()
                final_id, final_status = row

                response_body = {
                    "payment_id": final_id,
                    "status": final_status,
                    "amount_cents": amount_cents,
                    "currency": currency,
                }

                cur.execute(
                    "update idempotency set status='COMPLETED', response_status=%s, response_body=%s where fingerprint=%s",
                    (201, json.dumps(response_body), fp)
                )
                conn.execute("commit")
                resp = make_response(jsonify(response_body), 201)
                resp.headers["X-Idempotent-Recorded"] = "true"
                return resp
            except Exception as e:
                cur.execute(
                    "update idempotency set status='FAILED', response_status=%s, response_body=%s where fingerprint=%s",
                    (500, json.dumps({"error": str(e)}), fp)
                )
                conn.execute("commit")
                return make_response({"error": "failed", "detail": str(e)}, 500)


@app.get("/health")
def health():
    return {"ok": True}


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

Срок жизни ключа, размер, отпечаток запроса

  • TTL для записей: храните окна столько, сколько клиенты потенциально могут ретраить (обычно 24–72 часа).
  • Чистка старых ключей: периодически удаляйте COMPLETED/FAILED старше TTL.
  • Отпечаток: включайте метод, путь, ключ, хэш тела. Это защищает от использования одного и того же ключа на разных операциях.
  • Размер ключа: ограничьте до разумной длины (например, 128 символов).

Очистка по расписанию:

-- Удаляем записи старше 3 суток
delete from idempotency where created_at < now() - interval '3 days';

Очереди и фоновые задачи: как не обрабатывать сообщение дважды

В брокерах «ровно один раз» — дорогая иллюзия. Реально и просто достигается «по крайней мере один раз» плюс идемпотентность потребителя.

Стратегии:

  • Уникальный message_id в заголовках. Потребитель хранит, что уже обработано.
  • Идемпотентные апдейты в БД: операции делайте через upsert/merge по бизнес‑ключу.
  • Компенсации для внешних вызовов: если идёте «во внешний мир», проектируйте повторяемые запросы и безопасные откаты.

Маркер обработки в БД: «впереди обработки» и «после обработки»

  • Маркер до обработки (insert, on conflict do nothing):
    • Плюс: гарантированно только один потребитель начнёт обработку.
    • Минус: если упали посередине и не сделали «revert», сообщение помечено как начатое — потребуется повторная доставка с таймаутом/рекавери.
  • Маркер после обработки:
    • Плюс: проще. Минус: если упали до записи маркера, при ретрае обработаете второй раз — значит сама операция должна быть идемпотентной (через бизнес‑ключи, upsert и т. п.).

На практике комбинируют: идемпотентная бизнес‑операция + запись маркера «после», а критичные секции оборачивают транзакцией.

Пример кода: дедупликация сообщений в PostgreSQL

# Дедупликация сообщений по уникальному message_id
# Вариант «после обработки»: бизнес‑операция идемпотентна сама по себе.

from contextlib import contextmanager
from psycopg import connect
import os

DSN = os.environ.get("DATABASE_URL")

DDL = """
create table if not exists processed_messages (
  message_id text primary key,
  processed_at timestamptz not null default now()
);

create table if not exists stock (
  sku text primary key,
  qty int not null
);
"""

@contextmanager
def get_conn():
    with connect(DSN) as conn:
        yield conn


def ensure_schema():
    with get_conn() as conn:
        conn.execute(DDL)
        conn.commit()


def process_message(message_id: str, sku: str, delta_qty: int):
    """
    Идемпотентное изменение остатков:
    - upsert по SKU (если нет — создаём; если есть — корректируем),
    - затем фиксируем message_id. Если ретрай — просто увидим запись и пропустим.
    """
    with get_conn() as conn, conn.cursor() as cur:
        conn.execute("begin")
        # Проверяем, не обрабатывали ли этот message_id
        cur.execute("select 1 from processed_messages where message_id=%s", (message_id,))
        if cur.fetchone():
            conn.execute("commit")
            return "duplicate"

        # Идемпотентный апдейт остатков через upsert
        cur.execute(
            """
            insert into stock (sku, qty) values (%s, %s)
            on conflict (sku) do update set qty = stock.qty + excluded.qty
            returning qty
            """,
            (sku, delta_qty)
        )
        new_qty = cur.fetchone()[0]

        # Фиксируем, что сообщение обработано
        cur.execute(
            "insert into processed_messages (message_id) values (%s) on conflict do nothing",
            (message_id,)
        )
        conn.execute("commit")
        return {"sku": sku, "qty": new_qty}


if __name__ == "__main__":
    ensure_schema()
    print(process_message("m-1", "ABC", +10))  # {sku: ABC, qty: 10}
    print(process_message("m-1", "ABC", +10))  # duplicate
    print(process_message("m-2", "ABC", -3))   # {sku: ABC, qty: 7}

Деньги и инвентарь: особые правила

  • Платежи: ключ идемпотентности должен жить на всём пути — от клиента до платёжного провайдера. Многие PSP поддерживают его нативно (передавайте при запросе). Храните связь «идем‑ключ → операция» и возвращайте один и тот же результат.
  • Учёт: используйте двусторонние проводки (двойная запись), чтобы каждая операция имела уникальный бизнес‑идентификатор. Повторное применение должно быть «в ноль» и не менять итоговый баланс.
  • Склад: любые изменения остатков — только через операции с уникальным идентификатором и строгими инвариантами (например, не уходить в минус без разрешения бизнес‑правил).

Наблюдаемость и тестирование

  • Логи: записывайте Idempotency-Key, отпечаток запроса, статус (IN_PROGRESS/COMPLETED/REPLAY), длительность, исходные коды.
  • Метрики: счётчики повторов, конфликтов, доля «replay» ответов, время до завершения IN_PROGRESS, возраст самых старых записей в idempotency.
  • Трейсинг: прокидывайте ключ идемпотентности в спаны — удобно связывать первичный запрос и реплей.
  • Нагрузочные тесты: эмулируйте одновременные запросы с одним и тем же ключом и убедитесь, что выигрывает только один, а остальные получают 409/429.
  • Интеграционные тесты: проверяйте, что повторный POST с тем же ключом возвращает тот же JSON и код статуса.

Риски и анти‑паттерны

  • Один и тот же ключ без привязки к пути/методу — можно «переиспользовать» ключ на другой операции. Делайте полный отпечаток.
  • Слишком короткий TTL — клиент ретраит через 25 часов, а вы уже забыли ответ. Согласуйте окна ретраев.
  • Сохранение чувствительных данных в response_body — храните минимум, без персональных данных.
  • Кэширование ответов ошибок 500 как удачных — различайте FAILED и COMPLETED; при FAILED лучше не кэшировать тело, а вернуть корректную ошибку и позволить ретрай с тем же ключом (или запретить, если операция опасна).
  • Отсутствие защиты от гонок: если не делаете вставку с on conflict do nothing + блокировку на чтение, два потока могут одновременно начать обработку.
  • Долгие IN_PROGRESS без таймаута: застрявшие ключи мешают клиентам. Делайте «reclaim» после таймаута или отдельную джобу очистки/разблокировки.

Чек‑лист внедрения

  • Определите операции, чувствительные к повторам (платежи, заказы, пополнения, списания, резервирование).
  • Введите Idempotency-Key для POST/PATCH, договоритесь о TTL и политике ретраев с клиентами.
  • Реализуйте таблицу idempotency и atomic insert с on conflict do nothing.
  • Сохраняйте итоговый ответ и статус; отдавайте его при повторе запроса.
  • Для очередей используйте message_id и таблицу processed_messages, а бизнес‑апдейты делайте идемпотентно (upsert/merge).
  • Настройте метрики: процент реплеев, среднее время IN_PROGRESS, конфликты.
  • Напишите тесты на конкурентные запросы и повторную доставку сообщений.
  • Регулярно чистите старые ключи (cron/джоба в планировщике).

Что ещё посмотреть

  • Саги и компенсирующие действия для распределённых транзакций — полезны при длинных цепочках сервисов.
  • Транзакционный аутбокс дополняет идемпотентность на границе БД↔брокер, но не заменяет её на уровне бизнес‑операций.
  • OpenTelemetry для трассировки потока «ключ идемпотентности → операция → ответ/повтор».

APIPostgreSQLочередиидемпотентностьнадёжность