Kravchenko

Web Lab

АудитБлогКонтакты

Kravchenko

Web Lab

Разрабатываем сайты и автоматизацию на современных фреймворках под ключ

Услуги
ЛендингМногостраничныйВизитка
E-commerceБронированиеПортфолио
Навигация
БлогКонтактыАудит
Обратная связь
+7 921 567-11-16
info@kravlab.ru
с 09:00 до 18:00

© 2026 Все права защищены

•

ИП Кравченко Никита Владимирович

•

ОГРНИП: 324784700339743

Политика конфиденциальности

Паттерн Outbox в PostgreSQL: гарантированная доставка событий в брокер без 2PC и потерь

Разработка и технологии5 февраля 2026 г.
Когда сервис пишет и в базу, и в брокер сообщений, всегда есть шанс потерять событие или получить несогласованные данные. Паттерн Outbox решает это за счёт журнала исходящих записей в той же транзакции, а отдельный ретранслятор надёжно публикует события в брокер. Разбираем архитектуру, готовую схему таблиц, рабочий код ретранслятора и практики эксплуатационной надёжности. В результате у бизнеса меньше инцидентов, а интеграции становятся предсказуемыми.
Паттерн Outbox в PostgreSQL: гарантированная доставка событий в брокер без 2PC и потерь

Оглавление

  • Зачем нужен Outbox и какую проблему он решает
  • Архитектура: журнал исходящих и ретранслятор
    • Как это работает
  • Схема таблиц в PostgreSQL и индексы
  • Ретранслятор: безопасная публикация с повторами (Go + Redis Streams)
    • Очереди неудачных сообщений и ручной разбор
  • Масштабирование, мониторинг и отладка
    • Горизонтальное масштабирование
    • Мониторинг
    • Обслуживание
  • Альтернативы: CDC, LISTEN/NOTIFY и почему не 2PC
  • Чек-лист внедрения
  • Частые ошибки
  • Экономика и выгода для бизнеса
  • Итоги

Зачем нужен Outbox и какую проблему он решает

Типичный микросервис делает две вещи: записывает бизнес-данные в базу и отправляет событие в брокер (Kafka/Rabbit/Redis Streams), чтобы другие сервисы синхронизировались. Если делать это раздельными вызовами, начинаются гонки и потери:

  • Успели записать в базу, но упали до публикации в брокер — событие потеряно.
  • Успели отправить в брокер, но транзакция в базе откатилась — мир узнал о том, чего в базе нет.

Двухфазная фиксация (2PC) между БД и брокером в реальных системах редко возможна или экономически оправдана: разная поддержка протоколов, блокировки и высокая сложность. Нужен способ «сшить» запись в базу и публикацию события без тяжёлого распределённого транзакционого протокола.

Решение — паттерн Outbox («журнал исходящих»): мы пишем бизнес-данные и событие в одну и ту же базу в рамках одной транзакции, а отдельный процесс безопасно и с повторами публикует эти события в брокер. Так мы гарантируем, что событие не потеряется, а публикация всегда соответствует записанным данным.

Архитектура: журнал исходящих и ретранслятор

Как это работает

  1. Приложение начинает транзакцию.
  2. Записывает бизнес-событие (например, создание заказа) в свои таблицы.
  3. Тут же добавляет строку в outbox-таблицу с полезной нагрузкой события.
  4. Коммит — теперь и данные, и запись в outbox зафиксированы атомарно.
  5. Отдельный ретранслятор (фоновый процесс/сервис) читает из outbox, публикует в брокер и помечает запись как доставленную или планирует повтор.

Важно: ретранслятор должен быть устойчив к падениям и дублированию. «Ровно-однажды» в распределённых системах — миф. Стандартная стратегия: допускаем повторную публикацию и обеспечиваем идемпотентность на стороне потребителя (inbox у потребителей, ключ дедупликации в событии).

Схема таблиц в PostgreSQL и индексы

Ниже — минимально практичная схема с индексами, поддержкой повторов и дедупликации на стороне продьюсера.

-- Таблица исходящих событий
CREATE TABLE IF NOT EXISTS outbox (
  id               BIGSERIAL PRIMARY KEY,
  topic            TEXT        NOT NULL,  -- логическая «тема»/канал события
  aggregate_type   TEXT        NOT NULL,  -- тип сущности (например, order)
  aggregate_id     TEXT        NOT NULL,  -- идентификатор сущности
  payload          JSONB       NOT NULL,  -- тело события
  headers          JSONB       NOT NULL DEFAULT '{}'::jsonb, -- метаданные: ключ дедупликации, трассировка
  dedup_key        TEXT,                  -- опционально: уникальный ключ события, чтобы не породить дубль в outbox
  status           TEXT        NOT NULL DEFAULT 'pending', -- pending|processing|done|failed
  attempts         INT         NOT NULL DEFAULT 0,
  next_attempt_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
  error            TEXT,
  created_at       TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at       TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Уникальная дедупликация на уровне outbox (по желанию)
CREATE UNIQUE INDEX IF NOT EXISTS outbox_dedup_idx
  ON outbox(dedup_key) WHERE dedup_key IS NOT NULL;

-- Быстрый выбор только «созревших» к отправке
CREATE INDEX IF NOT EXISTS outbox_pending_idx
  ON outbox(status, next_attempt_at, created_at)
  WHERE status IN ('pending', 'failed');

-- Триггер для updated_at
CREATE OR REPLACE FUNCTION set_updated_at()
RETURNS TRIGGER AS $$
BEGIN
  NEW.updated_at = now();
  RETURN NEW;
END;$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS set_updated_at_trigger ON outbox;
CREATE TRIGGER set_updated_at_trigger
BEFORE UPDATE ON outbox
FOR EACH ROW EXECUTE FUNCTION set_updated_at();

Пример транзакции приложения: записываем заказ и добавляем событие.

BEGIN;
  INSERT INTO orders(id, user_id, amount, status)
  VALUES ('ord_1001', 'u42', 1250, 'created');

  INSERT INTO outbox(topic, aggregate_type, aggregate_id, payload, headers, dedup_key)
  VALUES (
    'orders.created',
    'order',
    'ord_1001',
    jsonb_build_object(
      'order_id','ord_1001',
      'user_id','u42',
      'amount',1250,
      'status','created'
    ),
    jsonb_build_object(
      'event_id', 'evt_2f2a8d7b-0b6f-4c13-9a6d-1a2f11b1a111',
      'trace_id', 'trc_7b9a...'
    ),
    'evt_2f2a8d7b-0b6f-4c13-9a6d-1a2f11b1a111'
  );
COMMIT;

Партиционирование: при больших объёмах удобно делать range-партиции по дате created_at (например, месяц). Это ускорит очистку старых «done» и снизит давление на VACUUM.

-- Пример партиционирования по месяцам (опционально)
ALTER TABLE outbox PARTITION BY RANGE (created_at);
CREATE TABLE IF NOT EXISTS outbox_2026_02 PARTITION OF outbox
  FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');

Ретранслятор: безопасная публикация с повторами (Go + Redis Streams)

Ниже — рабочий пример ретранслятора на Go. Он:

  • Берёт порцию записей FOR UPDATE SKIP LOCKED, чтобы несколько экземпляров могли работать параллельно, не мешая друг другу.
  • Публикует каждую запись в Redis Streams (можно заменить на любой брокер).
  • При успехе удаляет запись из outbox; при ошибке — увеличивает attempts и откладывает следующий повтор (экспоненциальная пауза).

Для простоты используем Docker Compose с PostgreSQL и Redis.

# docker-compose.yml
version: "3.8"
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: postgres
      POSTGRES_USER: postgres
      POSTGRES_DB: app
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data

  redis:
    image: redis:7
    ports:
      - "6379:6379"

volumes:
  pgdata:

Установим зависимости и запустим ретранслятор.

# 1) Запускаем инфраструктуру
docker compose up -d

# 2) Инициализируем схему (вставьте из раздела со схемой)
psql postgresql://postgres:postgres@localhost:5432/app -f schema.sql

# 3) Подготовим Go-проект
mkdir outbox-relay && cd outbox-relay
go mod init example.com/outbox-relay
go get github.com/jackc/pgx/v5 github.com/redis/go-redis/v9

# 4) Создаём main.go (см. ниже) и запускаем
go run .

Код ретранслятора:

package main

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	redis "github.com/redis/go-redis/v9"
)

const (
	batchSize          = 100
	baseRetryDelay     = 2 * time.Second
	maxRetryDelay      = 5 * time.Minute
	streamName         = "events"
	claimLeaseSeconds  = 60 // опционально: окно «обработки»
)

type OutboxItem struct {
	ID            int64
	Topic         string
	AggregateType string
	AggregateID   string
	Payload       string
	Headers       string
}

func env(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	pgURL := env("PG_URL", "postgresql://postgres:postgres@localhost:5432/app")
	rdbAddr := env("REDIS_ADDR", "localhost:6379")

	pg, err := pgxpool.New(ctx, pgURL)
	if err != nil {
		log.Fatalf("pg connect: %v", err)
	}
	defer pg.Close()

	rdb := redis.NewClient(&redis.Options{Addr: rdbAddr})
	defer rdb.Close()

	log.Println("outbox relay started")
	Ticker := time.NewTicker(500 * time.Millisecond)
	defer Ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			log.Println("stopping...")
			return
		case <-Ticker.C:
			if err := processBatch(ctx, pg, rdb); err != nil {
				log.Printf("batch error: %v", err)
				// короткая пауза, чтобы не крутить процессор при постоянной ошибке
				time.Sleep(time.Second)
			}
		}
	}
}

func nextDelay(attempts int) time.Duration {
	d := baseRetryDelay * (1 << (attempts))
	if d > maxRetryDelay {
		return maxRetryDelay
	}
	return d
}

func processBatch(ctx context.Context, pg *pgxpool.Pool, rdb *redis.Client) error {
	// Транзакция удерживает блокировки строк, чтобы несколько ретрансляторов не взяли один и тот же элемент
	tx, err := pg.Begin(ctx)
	if err != nil {
		return fmt.Errorf("begin: %w", err)
	}
	defer func() {
		_ = tx.Rollback(ctx) // безопасно, если уже коммитнуто
	}()

	rows, err := tx.Query(ctx, `
		SELECT id, topic, aggregate_type, aggregate_id, payload::text, headers::text
		FROM outbox
		WHERE status IN ('pending','failed')
		  AND next_attempt_at <= now()
		ORDER BY created_at
		LIMIT $1
		FOR UPDATE SKIP LOCKED
	`, batchSize)
	if err != nil {
		return fmt.Errorf("select: %w", err)
	}
	defer rows.Close()

	items := make([]OutboxItem, 0, batchSize)
	for rows.Next() {
		var it OutboxItem
		if err := rows.Scan(&it.ID, &it.Topic, &it.AggregateType, &it.AggregateID, &it.Payload, &it.Headers); err != nil {
			return fmt.Errorf("scan: %w", err)
		}
		items = append(items, it)
	}
	if rows.Err() != nil {
		return fmt.Errorf("rows: %w", rows.Err())
	}

	if len(items) == 0 {
		return tx.Commit(ctx) // нечего делать
	}

	for _, it := range items {
		// Формируем ключ идемпотентности для потребителя
		msgID := messageID(it)

		// Публикация в Redis Streams
		_, pubErr := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			Values: map[string]any{
				"topic":          it.Topic,
				"aggregate_type": it.AggregateType,
				"aggregate_id":   it.AggregateID,
				"payload":        it.Payload,
				"headers":        it.Headers,
				"event_id":       msgID,
			},
		}).Result()

		if pubErr != nil {
			// запланируем повтор с экспоненциальной паузой
			if _, err := tx.Exec(ctx, `
				UPDATE outbox
				SET attempts = attempts + 1,
				    status = 'failed',
				    next_attempt_at = now() + $1::interval,
				    error = left($2, 4000)
				WHERE id = $3
			`, durationToInterval(nextDelayIncrement(tx, ctx, it.ID)), pubErr.Error(), it.ID); err != nil {
				return fmt.Errorf("update retry: %w", err)
			}
			continue
		}

		// успешная публикация — удаляем запись из outbox
		if _, err := tx.Exec(ctx, `DELETE FROM outbox WHERE id = $1`, it.ID); err != nil {
			return fmt.Errorf("delete: %w", err)
		}
	}

	return tx.Commit(ctx)
}

func messageID(it OutboxItem) string {
	h := sha256.New()
	h.Write([]byte(fmt.Sprintf("%s|%s|%s|%s", it.Topic, it.AggregateType, it.AggregateID, it.Payload)))
	return hex.EncodeToString(h.Sum(nil))
}

// durationToInterval конвертирует duration в строку интервала Postgres, например '5 seconds'
func durationToInterval(d time.Duration) string {
	secs := int64(d.Seconds())
	if secs < 1 {
		secs = 1
	}
	return fmt.Sprintf("%d seconds", secs)
}

// nextDelayIncrement вычисляет следующее значение backoff на стороне базы, читая attempts
func nextDelayIncrement(tx interface{ QueryRow(context.Context, string, any) }, ctx context.Context, id int64) time.Duration {
	var attempts int
	if err := tx.QueryRow(ctx, `SELECT attempts FROM outbox WHERE id = $1`, id).Scan(&attempts); err != nil {
		return baseRetryDelay
	}
	d := baseRetryDelay * (1 << attempts)
	if d > maxRetryDelay {
		return maxRetryDelay
	}
	return d
}

Примечания по алгоритму:

  • Используем FOR UPDATE SKIP LOCKED, чтобы параллельные ретрансляторы не мешали друг другу (нужен PostgreSQL 9.5+).
  • Публикация идёт до удаления записи. Если процесс упадёт после публикации, но до удаления, запись останется и будет отправлена повторно. Это ожидаемо: потребители должны уметь дедуплицировать по event_id (inbox-паттерн на их стороне).
  • Для тяжёлых тем можно разделять outbox по topic (разные партиции/таблицы) или запускать ретрансляторы с фильтрами по topic.

Очереди неудачных сообщений и ручной разбор

После N попыток можно отправлять событие в «мёртвую очередь» (DLQ) для ручного анализа:

ALTER TABLE outbox ADD COLUMN dlq BOOLEAN NOT NULL DEFAULT false;

-- пример правила: после 10 попыток считаем событие безнадёжным
UPDATE outbox
SET dlq = true, status = 'failed'
WHERE attempts >= 10 AND status = 'failed';

Масштабирование, мониторинг и отладка

Горизонтальное масштабирование

  • Запускайте несколько экземпляров ретранслятора: FOR UPDATE SKIP LOCKED обеспечивает разделение порций.
  • Следите за размером batch и частотой тикера: слишком маленькие — потеря пропускной способности, слишком большие — длинные транзакции и рост задержек.
  • Для гарантированного порядка по одному агрегату (например, один заказ) публикуйте события строго в одном потоке (shard) по хешу aggregate_id.

Мониторинг

  • Метрики: глубина outbox (кол-во pending/failed), время жизни записи в pending, доля ошибок публикации, p95/p99 задержка между created_at и публикацией.
  • Алерты: рост failed, превышение порога задержки, разрастание таблицы.
  • Логи: включайте event_id, aggregate_id, topic, для трассировки — trace_id.

Обслуживание

  • Регулярная очистка «старых» записей со статусом done (если вы их оставляете для аудита). С партициями — просто DROP PARTITION.
  • VACUUM и автонастройка (автовакуум) — outbox «горячая» таблица, помогите планировщику: разумные fillfactor, частые малые транзакции вместо редких больших.

Альтернативы: CDC, LISTEN/NOTIFY и почему не 2PC

  • CDC (Change Data Capture) через логическую репликацию/ Debezium: ретранслятор читает поток изменений таблиц и сам формирует события. Плюс — нет опроса и лишних индексов, высокая производительность. Минус — сложнее инфраструктура, сложнее тестировать локально, добавляется ещё одно звено.
  • LISTEN/NOTIFY: база «будит» ретранслятор при появлении новой записи. Хорошо снижает холостой опрос, но сам по себе не гарантирует доставку — это лишь сигнал, всё равно нужен проверочный опрос таблицы.
  • 2PC (двухфазная фиксация) с XA: теоретически даёт атомарность между БД и брокером, но дорого, сложно и часто недоступно на нужных компонентах. На практике Outbox + повторы + идемпотентные потребители надёжнее и проще.

Чек-лист внедрения

  • Добавьте таблицу outbox с индексами и триггером updated_at.
  • В каждом месте изменения бизнес-данных — записывайте событие в outbox в той же транзакции.
  • В событие включайте стабильный event_id для дедупликации у потребителя.
  • Поднимите ретранслятор(ы) с FOR UPDATE SKIP LOCKED, повторами и backoff.
  • Настройте мониторинг глубины очереди и доли ошибок.
  • Продумайте DLQ и регламент ручного разбора.
  • Проведите нагрузочные тесты: измерьте задержки и пропускную способность.
  • Опишите контракт событий: как эволюционирует схема payload и headers.

Частые ошибки

  • «Ровно-однажды» без дедупликации у потребителей. Итог — двойные списания и инциденты.
  • Крупные транзакции ретранслятора на тысячи записей. Итог — блокировки, длинные стоп-меры VACUUM, рост latency.
  • Отсутствие индекса по (status, next_attempt_at) — резкий рост CPU и задержек выборки.
  • Удаление записей сразу после публикации без аудит-трейла, а потом попытки дебага «по логам». Держите хотя бы сутки (или сохраняйте в отдельный архив/партицию).
  • Смешивание разнотипных тем в один «горячий» поток без шардирования — нарушение порядка и скачки задержек.

Экономика и выгода для бизнеса

  • Снижение инцидентов и ручных разборов: события не теряются, а дубли контролируемы.
  • Быстрая локальная отладка: всё крутится вокруг БД, не нужны тяжёлые распределённые транзакции.
  • Прозрачная эксплуатация: есть метрики, чёткий DLQ, понятная схема повторов.
  • Масштабирование по мере роста нагрузки — добавили ретрансляторы, расширили партиции.

Итоги

Паттерн Outbox превращает тонкое место «БД ↔ брокер» из источника случайных потерь в надёжный и наблюдаемый конвейер. Вы сохраняете атомарность на стороне базы, а публикацию в брокер делаете устойчивой к сбоям за счёт повторов и дедупликации у потребителей. Это проще, чем 2PC, и лучше соответствует реальным требованиям бизнеса: никаких потерянных заказов, предсказуемые интеграции и меньше ночных звонков в поддержку.


postgresqloutboxсобытия