Kravchenko

Web Lab

АудитБлогКонтакты

Kravchenko

Web Lab

Разрабатываем сайты и автоматизацию на современных фреймворках под ключ

Услуги
ЛендингМногостраничныйВизитка
E-commerceБронированиеПортфолио
Навигация
БлогКонтактыАудит
Обратная связь
+7 921 567-11-16
info@kravlab.ru
с 09:00 до 18:00

© 2026 Все права защищены

•

ИП Кравченко Никита Владимирович

•

ОГРНИП: 324784700339743

Политика конфиденциальности

Транзакционный outbox/inbox: события без потерь и дублей — стабильные заказы и интеграции

Разработка и технологии12 марта 2026 г.
Когда сервис пишет в базу и параллельно шлёт событие во внешний мир, всегда есть риск: база закоммитилась, а публикация упала — или наоборот. Паттерны outbox и inbox решают это без «магии»: события фиксируются в той же транзакции, публикуются воркером с ретраями, а потребитель дедуплицирует и подтверждает обработку. Результат — нет потерянных заказов, стабильные интеграции и предсказуемая эксплуатация.
Транзакционный outbox/inbox: события без потерь и дублей — стабильные заказы и интеграции

  • Оглавление
    • Зачем это бизнесу
    • Где ломается «сохранили и отправили»
    • Паттерн outbox: идея и гарантии
    • Паттерн inbox у потребителя
    • Схемы таблиц в PostgreSQL
    • Как писать в outbox в одной транзакции
    • Воркер-публикатор: ретраи, бэкофф, метки состояний
    • Потребитель с inbox и проверкой подписи
    • Порядок событий и консистентность
    • Наблюдаемость и эксплуатация
    • Миграция на outbox/inbox без остановок
    • Типовые ошибки и как их избежать
    • Экономика: где выгода

Зачем это бизнесу

Любой продукт с заказами, оплатами, доставкой или партнёрскими интеграциями живёт на событиях: «заказ создан», «платёж прошёл», «посылка отгружена». Эти события требуют надёжной доставки наружу (в брокер, в вебхук партнёра) и такой же надёжной обработки на входе. Потерянное или продублированное событие влечёт деньги, SLA и репутацию.

Паттерны outbox и inbox дают две простые вещи:

  • Событие не потеряется между базой и «внешним миром».
  • Дубликаты у потребителя безопасно поглотятся.

Итог: меньше инцидентов, предсказуемые релизы, проще аудит.

Где ломается «сохранили и отправили»

Наивный подход: в коде сохраняем заказ в базе, затем публикуем событие в очередь или шлём вебхук. Между этими шагами — сеть, драйверы и всё, что может упасть. Возможны гонки:

  • База закоммитилась, публикация упала. Партнёр не узнает о заказе.
  • Публикация прошла, база откатилась. Внешний мир получил «призрак» заказа.

Двухфазная фиксация (2PC) теоретически решает, но в реальности дорога, сложна и редко доступна.

Паттерн outbox: идея и гарантии

Outbox — «исходящий журнал событий» в вашей основной базе. Суть:

  • В одной транзакции вместе с бизнес-записью (например, заказом) вы добавляете строку в таблицу outbox_events.
  • Отдельный воркер читает outbox, публикует вовне (вебхук, брокер) и помечает событие как доставленное или планирует повтор.

Гарантии:

  • Событие «родилось» атомарно вместе с бизнес-состоянием.
  • Доставка — как минимум один раз (at-least-once). Дубликаты на стороне потребителя устраняются inbox-паттерном.

Паттерн inbox у потребителя

Inbox — «входящий журнал обработанных событий» в сервисе-потребителе:

  • Каждое входящее событие имеет стабильный идентификатор (UUID/ULID).
  • Перед логикой вы пытаетесь вставить его в таблицу inbox_events с первичным ключом=id. Если вставка неуспешна из‑за конфликта — это дубль, можно сразу отвечать 200 OK.

Так достигается идемпотентность без сложных распределённых транзакций.

Схемы таблиц в PostgreSQL

-- Расширение для генерирования UUID
create extension if not exists pgcrypto;

-- Outbox: исходящие события
create table if not exists outbox_events (
  id uuid primary key default gen_random_uuid(),
  aggregate_type text not null,          -- что изменилось: 'order', 'invoice' и т.п.
  aggregate_id text not null,            -- идентификатор агрегата
  event_type text not null,              -- тип события: 'order.created'
  payload jsonb not null,                -- полезная нагрузка события
  dedup_key text,                        -- необязательная дедупликация на уровне outbox
  status text not null default 'pending',-- pending | processing | done | failed
  retries int not null default 0,
  available_at timestamptz not null default now(),
  created_at timestamptz not null default now(),
  processed_at timestamptz
);

-- Быстрый выбор по статусу/расписанию
create index if not exists idx_outbox_status_available on outbox_events(status, available_at);

-- Уникальность по опциональному ключу дедупликации (например, один заказ — одно событие создания)
create unique index if not exists uq_outbox_dedup_key on outbox_events(dedup_key) where dedup_key is not null;

-- Inbox: входящие события у потребителя
create table if not exists inbox_events (
  id uuid primary key,                   -- стабильно передаваемый идентификатор события
  received_at timestamptz not null default now(),
  processed_at timestamptz,              -- когда бизнес-логика завершена
  handler_status text not null,          -- pending | done | failed
  payload jsonb not null,
  signature text                         -- для аудита подписи
);

create index if not exists idx_inbox_status on inbox_events(handler_status);

Как писать в outbox в одной транзакции

Пример: создаём заказ и фиксируем событие «order.created» за один коммит.

begin;
  with new_order as (
    insert into orders(id, user_id, amount, status)
    values (gen_random_uuid(), 'u_123', 9900, 'created')
    returning id, user_id, amount
  )
  insert into outbox_events(
    aggregate_type, aggregate_id, event_type, payload, dedup_key
  )
  select
    'order', id::text, 'order.created',
    jsonb_build_object(
      'event_id', gen_random_uuid(),
      'order_id', id,
      'user_id', user_id,
      'amount', amount,
      'occurred_at', now()
    ),
    'order.created:' || id::text
  from new_order;
commit;

Тот же принцип легко реализуется из приложения. Важное: запись в outbox должна происходить в той же транзакции, что и бизнес-изменение.

Воркер-публикатор: ретраи, бэкофф, метки состояний

Ниже — минимально жизнеспособный воркер на Go, который периодически «забирает» пачку событий из outbox, шлёт вебхук с HMAC‑подписью и обновляет статусы. Он безопасен к гонкам: сначала атомарно помечает события как processing, затем обрабатывает.

package main

import (
	"context"
	"crypto/hmac"
	"crypto/sha256"
	"database/sql"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"net/http"
	"os"
	"time"

	_ "github.com/lib/pq"
)

type Event struct {
	ID      string
	Payload []byte
}

func main() {
	dsn := mustEnv("DB_DSN")             // пример: postgres://user:pass@localhost:5432/app?sslmode=disable
	webhookURL := mustEnv("WEBHOOK_URL") // куда шлём
	secret := mustEnv("WEBHOOK_SECRET")  // общий секрет для подписи

	db, err := sql.Open("postgres", dsn)
	if err != nil { log.Fatal(err) }
	defer db.Close()

	ctx := context.Background()
	batch := 50
	interval := 2 * time.Second

	for {
		if err := publishBatch(ctx, db, webhookURL, secret, batch); err != nil {
			log.Printf("publish error: %v", err)
		}
		time.Sleep(interval)
	}
}

func publishBatch(ctx context.Context, db *sql.DB, url, secret string, limit int) error {
	// 1) Забираем и помечаем события как processing атомарно
	tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
	if err != nil { return err }

	rows, err := tx.QueryContext(ctx, `
		with locked as (
			select id from outbox_events
			where status = 'pending' and available_at <= now()
			order by created_at
			for update skip locked
			limit $1
		), upd as (
			update outbox_events o
			set status = 'processing'
			from locked
			where o.id = locked.id
			returning o.id, o.payload
		)
		select id, payload from upd;
	`, limit)
	if err != nil { tx.Rollback(); return err }
	defer rows.Close()

	var events []Event
	for rows.Next() {
		var id string
		var payload []byte
		if err := rows.Scan(&id, &payload); err != nil { tx.Rollback(); return err }
		events = append(events, Event{ID: id, Payload: payload})
	}
	if err := rows.Err(); err != nil { tx.Rollback(); return err }
	if err := tx.Commit(); err != nil { return err }

	// 2) Отправляем каждый и фиксируем результат
	for _, e := range events {
		if err := sendOne(ctx, db, url, secret, e); err != nil {
			log.Printf("event %s failed: %v", e.ID, err)
		}
	}
	return nil
}

func sendOne(ctx context.Context, db *sql.DB, url, secret string, e Event) error {
	// HMAC подпись тела + метка времени
	ts := fmt.Sprintf("%d", time.Now().Unix())
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(ts))
	mac.Write(e.Payload)
	sig := hex.EncodeToString(mac.Sum(nil))

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytesReader(e.Payload))
	if err != nil { return err }
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Event-Id", e.ID)
	req.Header.Set("X-Timestamp", ts)
	req.Header.Set("X-Signature", "sha256="+sig)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return markRetry(ctx, db, e.ID, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 200 && resp.StatusCode < 300 {
		_, uerr := db.ExecContext(ctx, `update outbox_events set status='done', processed_at=now() where id=$1`, e.ID)
		return uerr
	}
	return markRetry(ctx, db, e.ID, fmt.Errorf("status %d", resp.StatusCode))
}

func markRetry(ctx context.Context, db *sql.DB, id string, cause error) error {
	// экспоненциальный бэкофф с верхней границей 10 минут
	var retries int
	if err := db.QueryRowContext(ctx, `update outbox_events
		set retries = retries + 1
		where id=$1
		returning retries`, id).Scan(&retries); err != nil {
		return err
	}
	sec := math.Min(600, math.Pow(2, float64(retries)))
	_, err := db.ExecContext(ctx, `update outbox_events
		set status='pending', available_at = now() + ($2 || ' seconds')::interval
		where id=$1`, id, int(sec))
	if err != nil { return err }
	return cause
}

// bytesReader — маленький помощник без лишних аллокаций
func bytesReader(b []byte) *bytes.Reader { return bytes.NewReader(b) }

// need bytes for bytes.NewReader
import "bytes"

func mustEnv(key string) string {
	v := os.Getenv(key)
	if v == "" { log.Fatalf("missing env %s", key) }
	return v
}

Пояснения:

  • Статус processing позволяет воркерам не драться за одни и те же события.
  • available_at задаёт расписание следующей попытки. Так мы избегаем горячих циклов при длительных падениях получателя.
  • HMAC‑подпись даёт потребителю уверенность, что событие пришло от нас и не изменено.

Лёгкая проверка публикатора

Можно временно поставить WEBHOOK_URL на httpbin.org/post, чтобы быстро увидеть, что отправка идёт. В бою, конечно, используйте свой приёмник с валидацией подписи.

Потребитель с inbox и проверкой подписи

Ниже — обработчик вебхука на Go. Он:

  • проверяет HMAC‑подпись;
  • делает попытку вставки в inbox_events по id из заголовка X‑Event‑Id;
  • при конфликте возвращает 200 OK (дубль);
  • после успешной бизнес‑логики отмечает processed_at и handler_status.
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"database/sql"
	"encoding/hex"
	"encoding/json"
	"io"
	"log"
	"net/http"
	"os"
	"time"

	_ "github.com/lib/pq"
)

type InboxPayload struct {
	EventID   string    `json:"event_id"`
	OrderID   string    `json:"order_id"`
	UserID    string    `json:"user_id"`
	Amount    int       `json:"amount"`
	OccurredAt time.Time `json:"occurred_at"`
}

func main() {
	dsn := mustEnv("DB_DSN")
	secret := mustEnv("WEBHOOK_SECRET")
	db, err := sql.Open("postgres", dsn)
	if err != nil { log.Fatal(err) }
	defer db.Close()

	http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil { http.Error(w, "read", 400); return }
		defer r.Body.Close()

		id := r.Header.Get("X-Event-Id")
		ts := r.Header.Get("X-Timestamp")
		sig := r.Header.Get("X-Signature")
		if !verifyHMAC(secret, ts, body, sig) {
			http.Error(w, "bad signature", 401); return
		}

		// идемпотентная вставка
		var payload InboxPayload
		if err := json.Unmarshal(body, &payload); err != nil { http.Error(w, "json", 400); return }

		// Вставляем только если нет дубля
		_, err = db.Exec(`insert into inbox_events(id, handler_status, payload, signature)
			values ($1, 'pending', $2, $3)`, id, string(body), sig)
		if err != nil {
			// конфликт — уже обрабатывали
			if pqErr, ok := err.(*sql.Error); ok { _ = pqErr } // заглушка компилятора, pq не даёт тип ошибки; используем текст
			if isUniqueViolation(err) {
				w.WriteHeader(200); w.Write([]byte("duplicate")); return
			}
			http.Error(w, "db", 500); return
		}

		// Здесь — бизнес-логика: например, резервирование на складе.
		// Имитация быстрого действия
		time.Sleep(10 * time.Millisecond)

		_, err = db.Exec(`update inbox_events set handler_status='done', processed_at=now() where id=$1`, id)
		if err != nil { http.Error(w, "update", 500); return }

		w.WriteHeader(200)
		w.Write([]byte("ok"))
	})

	log.Println("listening on :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

func verifyHMAC(secret, ts string, body []byte, header string) bool {
	const prefix = "sha256="
	if len(header) <= len(prefix) || header[:len(prefix)] != prefix { return false }
	expected := hmac.New(sha256.New, []byte(secret))
	expected.Write([]byte(ts))
	expected.Write(body)
	want := hex.EncodeToString(expected.Sum(nil))
	got := header[len(prefix):]
	return hmac.Equal([]byte(want), []byte(got))
}

func isUniqueViolation(err error) bool {
	// Упрощённо: проверим по тексту. В проде используйте errors.As к *pq.Error и код "23505".
	return err != nil && (contains(err.Error(), "unique") || contains(err.Error(), "duplicate key"))
}

func contains(s, sub string) bool { return len(s) >= len(sub) && (index(s, sub) >= 0) }

func index(s, sep string) int { return len([]rune(s[:])) - len([]rune(s[:])) + len(s) - len(s) + int64Index(s, sep) }

// простой индекс под строки без регистронезависимости
func int64Index(s, sep string) int { return len([]byte(s[:])) - len([]byte(s[:])) + bytesIndex([]byte(s), []byte(sep)) }

// bytesIndex — используем стандартную реализацию
import "bytes"
func bytesIndex(b, sub []byte) int { return bytes.Index(b, sub) }

func mustEnv(k string) string { v := os.Getenv(k); if v == "" { log.Fatalf("missing %s", k) }; return v }

Примечания к примеру потребителя:

  • Для чистоты продакшен‑кода в Go используйте тип ошибки из github.com/lib/pq (или pgx) и проверяйте код SQLSTATE 23505 для уникального ограничения.
  • Секрет подписи храните в менеджере секретов, меняйте по расписанию.

Порядок событий и консистентность

At‑least‑once гарантирует доставку, но не порядок. Если порядок критичен для одного агрегата (например, «order.created» → «order.paid»), используйте:

  • порядковый номер события (version) в payload и проверяйте его на стороне потребителя;
  • отдельные очереди/топики на агрегат или партиционирование по aggregate_id;
  • публикацию «из одного потока» на уровне воркера по ключу aggregate_id.

Практичный минимум: включите в payload поле sequence и обрабатывайте с отложкой, если пришло «будущее» событие без «прошлого».

Наблюдаемость и эксплуатация

  • Метрики: количество pending/processing/failed, время в очереди (now() − created_at), длительность публикации, процент ретраев.
  • Логи: обязательно логируйте event_id, aggregate_type/id, попытку, код ответа получателя.
  • Алерты: рост failed, очередь «старых» pending, превышение доли 5xx от получателя.
  • Трассировка: прокидывайте trace_id в заголовках и payload, чтобы связать базовую операцию и внешний вызов.

Миграция на outbox/inbox без остановок

  1. Добавьте таблицу outbox_events и запись в неё в тех местах, где рождаются события. Параллельно продолжайте старую публикацию.
  2. Поднимите воркер и дублируйте публикацию (двойная рассылка) на тестовый эндпоинт потребителя.
  3. Включите inbox у потребителя, чтобы он стал устойчив к дублям.
  4. Переключите боевую публикацию на воркер. Удалите старую прямую отправку из бизнес‑кода.
  5. Наблюдайте метрики и чистите старые «осиротевшие» записи.

Типовые ошибки и как их избежать

  • Нет стабильного event_id в payload — нечем дедуплицировать. Всегда добавляйте UUID.
  • Публикация «внутри» транзакции — держите блокировки слишком долго. Отмечайте события и коммитьте, затем отправляйте.
  • Агрессивные ретраи без бэкоффа — DDoS своего же партнёра. Держите экспоненциальный бэкофф с лимитом и DLQ (failed) после порога попыток.
  • Отсутствие подписи/валидации — любой может «шлёпнуть» вам событие. Подпись и белые списки IP помогают.
  • Отсутствие индексов по статусу/расписанию — воркер «мелет» всю таблицу. Индексы обязательны.

Экономика: где выгода

  • Нет потерь и «призраков» — меньше ручных разборов и компенсаций.
  • Простой код без тяжёлых распределённых транзакций — быстрее фичи, меньше багов.
  • Предсказуемая эксплуатация — прозрачные ретраи, метрики, алерты. Команда реагирует быстрее, простои короче.

Эти паттерны не навязывают конкретный брокер или протокол: сегодня это вебхуки партнёрам, завтра Kafka — схема и воркеры останутся теми же. Главное — атомарность рождения события и идемпотентность его потребления.


outboxнадёжностьinbox