Kravchenko

Web Lab

АудитБлогКонтакты

Kravchenko

Web Lab

Разрабатываем сайты и автоматизацию на современных фреймворках под ключ

Услуги
ЛендингМногостраничныйВизитка
E-commerceБронированиеПортфолио
Навигация
БлогКонтактыАудит
Обратная связь
+7 921 567-11-16
info@kravlab.ru
с 09:00 до 18:00

© 2026 Все права защищены

•

ИП Кравченко Никита Владимирович

•

ОГРНИП: 324784700339743

Политика конфиденциальности

Транзакционный аутбокс: как перестать терять события между БД и брокером — стабильные интеграции и меньше инцидентов

Разработка и технологии6 апреля 2026 г.
Потерянные уведомления, зависшие статусы платежей и «магически» исчезнувшие задачи — типичная боль систем, где база и очередь живут раздельной жизнью. Разбираем паттерн «транзакционный аутбокс»: как одним коммитом зафиксировать и бизнес‑событие, и запись о его доставке, а затем надёжно разослать его по внешним каналам. Пошаговая схема, SQL, рабочий воркер на Go, метрики и грабли.
Транзакционный аутбокс: как перестать терять события между БД и брокером — стабильные интеграции и меньше инцидентов

Оглавление

  • Зачем бизнесу транзакционный аутбокс
  • Как работает паттерн и какие гарантии даёт
  • Варианты реализации: поллинг, CDC (Debezium), очередь в БД
  • Схема таблицы outbox и индексы
  • Пример: создание заказа и запись события в одной транзакции
  • Пример воркера-доставщика: параллельная отправка с backoff
  • Масштабирование и производительность: SKIP LOCKED, партии, порядок
  • Наблюдаемость и контроль: метрики, алерты, дэд‑леттер
  • Безопасность и данные: что класть в payload
  • Частые ошибки и как их избежать
  • Чек‑лист внедрения
  • Когда аутбокс не нужен

Зачем бизнесу транзакционный аутбокс

Сценарий знаком многим: заказ создан в БД, но письмо клиенту не ушло; платёж проведён, а вебхук партнёру «затерялся»; склад не получил событие о резервировании — начались ручные допроведения и человеческий фактор. Корень проблемы — разделение двух операций: запись в базу и отправка сообщения во внешний мир (очередь, вебхук, e‑mail). Если между ними что‑то пойдёт не так, система теряет согласованность.

Транзакционный аутбокс (outbox) решает это: мы фиксируем событие в таблице в той же базе и в той же транзакции, где меняем бизнес‑данные. А отдельный надёжный воркер дочитывает эту таблицу и доставляет события дальше с ретраями. В итоге «сначала сохранили в БД — потом попробовали отправить» превращается в «сохранили и поставили в локальную очередь атомарно; доставим хоть из офлайна».

Для бизнеса это значит меньше инцидентов, меньше ручной работы, чище отчёты и прогнозируемые интеграции с партнёрами.

Как работает паттерн и какие гарантии даёт

Коротко:

  1. В бизнес‑транзакции меняем состояние доменной сущности (например, создаём заказ) и одновременно вставляем запись в outbox с описанием события.
  2. Воркеры периодически выбирают из outbox новые записи, помечают их «в работе», отправляют вовне (в брокер, HTTP, e‑mail и т. п.) и потом помечают «доставлено» или планируют повторную попытку.

Что получаем по гарантиям:

  • Нет потерь: событие точно зафиксировано в БД — мы его не «забудем».
  • По крайней мере один раз: событие может быть доставлено повторно (поэтому на стороне потребителей нужны идемпотентные обработчики, но это проще, чем разбираться с потерями).
  • Управляемая задержка: можно считать лаг и держать SLO по доставке.

Варианты реализации: поллинг, CDC (Debezium), очередь в БД

  • Поллинг из приложения. Проще всего: фоновый процесс выбирает записи из outbox и шлёт дальше. Плюсы: без дополнительных сервисов, легко контролировать. Минусы: нагрузка на БД при больших объёмах.
  • CDC (Change Data Capture), например Debezium. Вычитывает изменения из журнала БД и кладёт в брокер (Kafka). Плюсы: высокая пропускная способность, минимальная задержка. Минусы: сложнее инфраструктура и эксплуатация.
  • Очередь в БД вместо внешнего брокера. Подходит, если интеграции немного и объёмы невелики. Минус: БД начинает играть роль очереди, нужен аккуратный тюнинг.

Для большинства B2B‑сервисов со стабильным трафиком достаточно поллинга, а к CDC можно прийти позже.

Схема таблицы outbox и индексы

-- PostgreSQL
CREATE TYPE outbox_status AS ENUM ('pending', 'in_progress', 'delivered', 'failed');

CREATE TABLE outbox (
  id              BIGSERIAL PRIMARY KEY,
  aggregate_type  TEXT        NOT NULL,           -- тип сущности (order, invoice, user)
  aggregate_id    TEXT        NOT NULL,           -- идентификатор сущности (строка на случай UUID)
  event_type      TEXT        NOT NULL,           -- тип события (order.created)
  idempotency_key UUID        NOT NULL,           -- уникальный ключ события для потребителей
  payload         JSONB       NOT NULL,           -- полезная нагрузка (минимум PII)
  status          outbox_status NOT NULL DEFAULT 'pending',
  attempts        INT         NOT NULL DEFAULT 0,
  available_at    TIMESTAMPTZ NOT NULL DEFAULT now(), -- отложенная доставка
  next_attempt_at TIMESTAMPTZ,
  created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE UNIQUE INDEX ux_outbox_idempotency ON outbox (idempotency_key);
CREATE INDEX ix_outbox_pick ON outbox (status, next_attempt_at, available_at, id);
CREATE INDEX ix_outbox_created ON outbox (created_at);

-- Триггер обновляет updated_at
CREATE OR REPLACE FUNCTION set_updated_at()
RETURNS TRIGGER AS $$
BEGIN
  NEW.updated_at = now();
  RETURN NEW;
END; $$ LANGUAGE plpgsql;

CREATE TRIGGER trg_outbox_updated
BEFORE UPDATE ON outbox
FOR EACH ROW EXECUTE PROCEDURE set_updated_at();

Рекомендации:

  • Держите payload компактным: ссылки и идентификаторы, а не большие вложения.
  • Для высоких объёмов — партиционирование по дате created_at.
  • Храните available_at/next_attempt_at, чтобы избегать ожесточённого поллинга.

Пример: создание заказа и запись события в одной транзакции

Ниже — безопасный паттерн: бизнес‑операция и запись в outbox фиксируются одним коммитом.

-- Создание заказа и запись события в outbox в одной транзакции
BEGIN;

INSERT INTO orders (id, customer_id, amount_cents, currency, status, created_at)
VALUES ($1, $2, $3, $4, 'created', now());

INSERT INTO outbox (
  aggregate_type, aggregate_id, event_type, idempotency_key, payload, status, available_at
) VALUES (
  'order', $1, 'order.created', gen_random_uuid(),
  jsonb_build_object(
    'order_id', $1,
    'customer_id', $2,
    'amount_cents', $3,
    'currency', $4
  ),
  'pending', now()
);

COMMIT;

Важно: никогда не отправляйте событие во внешний мир до фиксации транзакции. Только после успешного коммита воркер увидит запись в outbox и возьмёт её в работу.

Пример воркера-доставщика: параллельная отправка с backoff

Воркеры забирают пачку записей, помечают «в работе», отправляют HTTP‑вебхуки на ваш интеграционный шлюз или партнёрам, и обновляют статус. Используем Postgres‑специфику: SELECT … FOR UPDATE SKIP LOCKED. Это позволяет безопасно запускать несколько воркеров параллельно.

Ниже полноценный пример на Go: без внешних библиотек, только стандартная библиотека. Ворк отправляет вебхуки по HTTPS, подписывает их HMAC‑подписью и реализует экспоненциальный backoff.

package main

import (
    "context"
    "crypto/hmac"
    "crypto/sha256"
    "database/sql"
    "encoding/hex"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "os"
    "strings"
    "time"

    _ "github.com/lib/pq"
)

// Конфигурация воркера
type Config struct {
    DBURL             string
    WebhookURL        string
    HMACSecret        string
    BatchSize         int
    MaxAttempts       int
    PollInterval      time.Duration
    RequestTimeout    time.Duration
    ParallelSends     int
}

type OutboxEvent struct {
    ID             int64           `json:"id"`
    AggregateType  string          `json:"aggregate_type"`
    AggregateID    string          `json:"aggregate_id"`
    EventType      string          `json:"event_type"`
    IdempotencyKey string          `json:"idempotency_key"`
    Payload        json.RawMessage `json:"payload"`
}

func main() {
    cfg := Config{
        DBURL:          env("DB_URL", "postgres://user:pass@localhost:5432/app?sslmode=disable"),
        WebhookURL:     mustEnv("WEBHOOK_URL"),
        HMACSecret:     mustEnv("HMAC_SECRET"),
        BatchSize:      envInt("BATCH_SIZE", 100),
        MaxAttempts:    envInt("MAX_ATTEMPTS", 10),
        PollInterval:   envDuration("POLL_INTERVAL", 2*time.Second),
        RequestTimeout: envDuration("REQUEST_TIMEOUT", 5*time.Second),
        ParallelSends:  envInt("PARALLEL_SENDS", 10),
    }

    db, err := sql.Open("postgres", cfg.DBURL)
    if err != nil { log.Fatalf("db open: %v", err) }
    defer db.Close()

    httpClient := &http.Client{ Timeout: cfg.RequestTimeout }

    ticker := time.NewTicker(cfg.PollInterval)
    defer ticker.Stop()

    for {
        if n := dispatchOnce(db, httpClient, cfg); n == 0 {
            <-ticker.C
        }
    }
}

func dispatchOnce(db *sql.DB, httpClient *http.Client, cfg Config) int {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
    if err != nil { log.Printf("begin tx: %v", err); return 0 }

    // Забираем пачку событий и помечаем их in_progress атомарно
    rows, err := tx.QueryContext(ctx, `
        WITH cte AS (
          SELECT id
          FROM outbox
          WHERE status = 'pending'
            AND coalesce(next_attempt_at, now()) <= now()
            AND available_at <= now()
          ORDER BY created_at
          FOR UPDATE SKIP LOCKED
          LIMIT $1
        )
        UPDATE outbox o
        SET status = 'in_progress'
        FROM cte
        WHERE o.id = cte.id
        RETURNING o.id, o.aggregate_type, o.aggregate_id, o.event_type, o.idempotency_key, o.payload
    `, cfg.BatchSize)
    if err != nil { _ = tx.Rollback(); log.Printf("select: %v", err); return 0 }

    var batch []OutboxEvent
    for rows.Next() {
        var e OutboxEvent
        if err := rows.Scan(&e.ID, &e.AggregateType, &e.AggregateID, &e.EventType, &e.IdempotencyKey, &e.Payload); err != nil {
            _ = tx.Rollback(); log.Printf("scan: %v", err); return 0
        }
        batch = append(batch, e)
    }
    rows.Close()

    if err := tx.Commit(); err != nil {
        log.Printf("commit: %v", err)
        return 0
    }

    if len(batch) == 0 { return 0 }

    // Параллельная отправка
    sem := make(chan struct{}, cfg.ParallelSends)
    done := make(chan struct{})
    for _, ev := range batch {
        sem <- struct{}{}
        go func(ev OutboxEvent) {
            defer func() { <-sem; done <- struct{}{} }()
            deliverEvent(db, httpClient, cfg, ev)
        }(ev)
    }
    for i := 0; i < len(batch); i++ { <-done }
    return len(batch)
}

func deliverEvent(db *sql.DB, httpClient *http.Client, cfg Config, ev OutboxEvent) {
    // Формируем HTTP‑запрос с подписью: X-Signature: sha256=...
    body := map[string]any{
        "event_type":       ev.EventType,
        "idempotency_key":  ev.IdempotencyKey,
        "aggregate_type":   ev.AggregateType,
        "aggregate_id":     ev.AggregateID,
        "payload":          json.RawMessage(ev.Payload),
        "sent_at":          time.Now().UTC().Format(time.RFC3339Nano),
    }
    b, _ := json.Marshal(body)
    sig := signHMAC(cfg.HMACSecret, b)

    req, _ := http.NewRequest(http.MethodPost, cfg.WebhookURL, strings.NewReader(string(b)))
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("X-Signature", "sha256="+sig)
    req.Header.Set("X-Idempotency-Key", ev.IdempotencyKey)

    resp, err := httpClient.Do(req)
    if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 300 {
        _ = resp.Body.Close()
        markDelivered(db, ev.ID)
        return
    }
    if resp != nil { _ = resp.Body.Close() }

    // Ошибка: увеличиваем счётчик и планируем retry
    scheduleRetry(db, ev.ID, cfg.MaxAttempts)
}

func markDelivered(db *sql.DB, id int64) {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    if _, err := db.ExecContext(ctx, `UPDATE outbox SET status='delivered' WHERE id=$1`, id); err != nil {
        log.Printf("mark delivered %d: %v", id, err)
    }
}

func scheduleRetry(db *sql.DB, id int64, maxAttempts int) {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    // Экспоненциальный backoff: min(2^attempts * 1s, 5m)
    _, err := db.ExecContext(ctx, `
        UPDATE outbox
        SET attempts = attempts + 1,
            status = CASE WHEN attempts + 1 >= $2 THEN 'failed' ELSE 'pending' END,
            next_attempt_at = CASE
                WHEN attempts + 1 >= $2 THEN null
                ELSE now() + LEAST(((2 ^ (attempts + 1))::int)::text::interval, '5 minutes'::interval)
            END
        WHERE id = $1
    `, id, maxAttempts)
    if err != nil {
        log.Printf("schedule retry %d: %v", id, err)
    }
}

func signHMAC(secret string, body []byte) string {
    mac := hmac.New(sha256.New, []byte(secret))
    mac.Write(body)
    return hex.EncodeToString(mac.Sum(nil))
}

// Утилиты окружения
func env(k, def string) string {
    if v := os.Getenv(k); v != "" { return v }
    return def
}
func mustEnv(k string) string {
    v := os.Getenv(k)
    if v == "" { log.Fatalf("env %s required", k) }
    return v
}
func envInt(k string, def int) int {
    if v := os.Getenv(k); v != "" { var i int; fmt.Sscanf(v, "%d", &i); return i }
    return def
}
func envDuration(k string, def time.Duration) time.Duration {
    if v := os.Getenv(k); v != "" { d, err := time.ParseDuration(v); if err == nil { return d } }
    return def
}

Примечания к воркеру:

  • Можно безопасно запускать несколько экземпляров: SKIP LOCKED не даст им взять одну и ту же запись.
  • Для тяжёлых нагрузок разделите воркеры по типам событий (event_type) — разные пулы, разные лимиты.
  • Если внешний приёмник требует строгого порядка для одной сущности, упорядочивайте по (aggregate_type, aggregate_id, created_at) и обеспечьте, чтобы одновременно не обрабатывались два события одной сущности (advisory lock по хэшу aggregate_id).

Масштабирование и производительность: SKIP LOCKED, партии, порядок

  • Патчи по 100–1000 записей обычно достаточно. Не гонитесь за гигантскими партиями: важнее стабильная задержка доставки.
  • Индексы по (status, next_attempt_at, available_at, id) — обязательны.
  • Для равномерности нагрузки держите отдельный воркер, который периодически чистит доставленные записи старше N дней (или переносит их в архивную таблицу через партиции).
  • «Ровно один раз» — миф без сильной цены. Рекомендуем «по крайней мере один раз» плюс идемпотентность у потребителей (например, по X-Idempotency-Key).

Пример блокировки по сущности через advisory lock, если нужно сохранять порядок для одного заказа:

-- Внутри транзакции доставки конкретной записи можно взять advisory lock:
SELECT pg_advisory_xact_lock(hashtext(aggregate_type || ':' || aggregate_id));

Наблюдаемость и контроль: метрики, алерты, дэд‑леттер

Отслеживайте:

  • Лаг: возраст самого старого pending/in_progress события (цель — минуты, не часы).
  • Скорость доставки: событий/мин.
  • Процент ошибок за окно времени и распределение по кодам ответа.
  • Размер «кладбища» failed и число попыток до успеха.

Алерты:

  • Лаг > целевого SLO (например, > 5 минут) в течение 10 минут.
  • Ошибки доставки > 5% за 15 минут.
  • Нет доставленных событий X минут подряд при наличии pending.

Дэд‑леттер (dead‑letter): при достижении MaxAttempts переводите событие в failed и складывайте его в отдельную таблицу/очередь для ручного или автоматического повторного разбора.

Безопасность и данные: что класть в payload

  • Не храните лишние персональные данные. Лучше передавать идентификаторы и восстанавливать детали на стороне потребителя по API.
  • Если всё‑таки передаете чувствительные фрагменты — шифруйте payload на уровне приложения.
  • Подписывайте вебхуки (как в примере HMAC) и проверяйте подпись на приёмной стороне.
  • Храните idempotency_key и логируйте его сквозь цепочку — это упростит расследование инцидентов.

Частые ошибки и как их избежать

  • Отправка события до коммита. Итог: «призрачные» сообщения о том, чего в БД нет. Лекарство: только через outbox в одной транзакции.
  • Нет идемпотентности у потребителей. Итог: дубли и расхождения. Лекарство: dedup по ключу, upsert, проверка «уже обработано».
  • Гигантские payload и отсутствие TTL. Итог: раздутые таблицы, замедление. Лекарство: компактный payload, партиции, очистка.
  • Один монолитный воркер. Итог: очередь простаивает при локальных сбоях. Лекарство: горизонтальное масштабирование, SKIP LOCKED, разнесение по типам событий.
  • Нет метрик и алертов. Итог: узнаёте от клиентов. Лекарство: лаг, error rate, delivered/min в мониторинге.

Чек‑лист внедрения

  • Добавили таблицу outbox с индексами и полями attempts/next_attempt_at.
  • Включили запись в outbox в ту же транзакцию, что и бизнес‑операция.
  • Запустили воркер(ы) с SKIP LOCKED, партиями и backoff.
  • Обеспечили идемпотентность у получателей (ключ, upsert, повторяемость).
  • Добавили метрики лага, ошибок, скорости и алерты.
  • Настроили архив/очистку доставленных и дэд‑леттер для фейлов.
  • Проверили нагрузку: нагрузочные тесты на пиках.

Когда аутбокс не нужен

  • Вся ваша логика — внутри одной БД, без внешних интеграций и фоновых реакций.
  • Внешние интеграции допускают «почти‑время‑реальное» через периодическую выгрузку (batch once per day) — и потери не критичны.

Если же у вас есть вебхуки, брокеры, e‑mail, платёжные статусы, складские или бухгалтерские интеграции — аутбокс резко снижает риск потерь и стоимость инцидентов. Это тот случай, когда простая таблица и аккуратный воркер экономят часы инженеров и нервы поддержки.


PostgreSQLархитектуранадежность