
Простой пример: пользователь оформляет заказ. Сервис создаёт запись в БД и должен отправить событие «order.created» в Kafka, чтобы склады, биллинг и аналитика среагировали. Если отправлять событие сразу после вставки в БД, возникают две проблемы:
Оба сценария приводят к инцидентам: недопоставка, неверные начисления, каскадные ошибки, ручные разборы. Outbox‑паттерн решает это предсказуемо и дёшево:
В итоге: меньше инцидентов, меньше ручных «распутываний», быстрее онбординг новых интеграций.
Идея проста:
Ключ: запись в outbox и изменение бизнес‑состояния происходят атомарно в одной транзакции. Значит, либо мы имеем и заказ, и событие, либо ничего; «рассинхрона» нет.
Минимальный набор полей: тип агрегата, идентификатор, тип события, полезная нагрузка, ключ дедупликации, время создания, отметка публикации и счётчик попыток.
-- Таблица Outbox для PostgreSQL
CREATE TABLE IF NOT EXISTS outbox_events (
id UUID PRIMARY KEY,
aggregate_type TEXT NOT NULL, -- например, 'order'
aggregate_id TEXT NOT NULL, -- например, ID заказа
event_type TEXT NOT NULL, -- например, 'order.created'
payload JSONB NOT NULL, -- данные события
headers JSONB NOT NULL DEFAULT '{}', -- метаданные: трассировка, источник
dedup_key TEXT NOT NULL, -- ключ для снятия дублей на потребителе
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
published_at TIMESTAMPTZ NULL, -- момент успешной публикации
attempt SMALLINT NOT NULL DEFAULT 0
);
-- Индексы для быстрого чтения непубликованных событий и для чистки
CREATE INDEX IF NOT EXISTS idx_outbox_pending
ON outbox_events (published_at NULLS FIRST, created_at);
CREATE INDEX IF NOT EXISTS idx_outbox_aggregate
ON outbox_events (aggregate_type, aggregate_id);
-- Ограничение уникальности по dedup_key (опционально, если генерируете устойчивый ключ)
CREATE UNIQUE INDEX IF NOT EXISTS ux_outbox_dedup
ON outbox_events (dedup_key);
Комментарии:
Ниже — короткий пример на Go: создаём заказ и атомарно пишем событие в outbox. Код полностью рабочий: он создаёт таблицы (если их нет), добавляет заказ и событие, затем запускает воркер, который «публикует» событие (в примере — в лог). Вместо лога подключите клиент вашего брокера.
package main
import (
context "context"
"database/sql"
"encoding/json"
"fmt"
"log"
"math"
"os"
"os/signal"
"syscall"
"time"
_ "github.com/lib/pq"
"github.com/google/uuid"
)
// Сообщение для публикации
type OutboxMessage struct {
ID string `json:"id"`
AggregateType string `json:"aggregate_type"`
AggregateID string `json:"aggregate_id"`
EventType string `json:"event_type"`
Payload json.RawMessage `json:"payload"`
Headers json.RawMessage `json:"headers"`
DedupKey string `json:"dedup_key"`
CreatedAt time.Time `json:"created_at"`
}
func main() {
// Подключение к PostgreSQL
dsn := envOr("PG_DSN", "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable")
db, err := sql.Open("postgres", dsn)
if err != nil {
log.Fatal(err)
}
defer db.Close()
if err := initSchema(db); err != nil {
log.Fatal(err)
}
// Вставим тестовый заказ и событие
orderID := uuid.New().String()
if err := createOrderWithEvent(db, orderID); err != nil {
log.Fatal(err)
}
log.Println("Заказ создан и событие записано в outbox:", orderID)
// Запустим воркер публикации
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
pub := func(ctx context.Context, m OutboxMessage) error {
// Здесь используйте клиент Kafka/Rabbit/NATS. В примере — лог.
log.Printf("Публикация события: id=%s type=%s agg=%s/%s\n", m.ID, m.EventType, m.AggregateType, m.AggregateID)
return nil
}
cfg := RelayConfig{BatchSize: 100, BaseBackoff: time.Second, MaxBackoff: 30 * time.Second}
go startRelay(ctx, db, pub, cfg)
<-ctx.Done()
log.Println("Остановка…")
}
func envOr(k, def string) string {
if v := os.Getenv(k); v != "" {
return v
}
return def
}
func initSchema(db *sql.DB) error {
sqlStmt := `
CREATE TABLE IF NOT EXISTS orders (
id TEXT PRIMARY KEY,
status TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS outbox_events (
id UUID PRIMARY KEY,
aggregate_type TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
headers JSONB NOT NULL DEFAULT '{}',
dedup_key TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
published_at TIMESTAMPTZ NULL,
attempt SMALLINT NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_outbox_pending
ON outbox_events (published_at NULLS FIRST, created_at);
CREATE UNIQUE INDEX IF NOT EXISTS ux_outbox_dedup
ON outbox_events (dedup_key);
`
_, err := db.Exec(sqlStmt)
return err
}
func createOrderWithEvent(db *sql.DB, orderID string) error {
tx, err := db.Begin()
if err != nil { return err }
defer func() { _ = tx.Rollback() }()
if _, err := tx.Exec(`INSERT INTO orders(id, status) VALUES($1, 'created')`, orderID); err != nil {
return err
}
payload := map[string]any{"order_id": orderID, "status": "created"}
payloadBytes, _ := json.Marshal(payload)
headers := map[string]any{"source": "orders-service"}
headersBytes, _ := json.Marshal(headers)
outboxID := uuid.New()
dedup := orderID + ":v1" // стабильный ключ для снятия дублей
_, err = tx.Exec(`
INSERT INTO outbox_events(id, aggregate_type, aggregate_id, event_type, payload, headers, dedup_key)
VALUES ($1, 'order', $2, 'order.created', $3, $4, $5)
ON CONFLICT (dedup_key) DO NOTHING
`, outboxID, orderID, payloadBytes, headersBytes, dedup)
if err != nil {
return err
}
return tx.Commit()
}
// Конфиг воркера
type RelayConfig struct {
BatchSize int
BaseBackoff time.Duration
MaxBackoff time.Duration
}
// startRelay — ретранслятор: читает неподтверждённые записи, публикует и помечает как опубликованные
func startRelay(ctx context.Context, db *sql.DB, publish func(context.Context, OutboxMessage) error, cfg RelayConfig) {
backoff := cfg.BaseBackoff
for {
select {
case <-ctx.Done():
return
default:
}
msgs, err := lockAndLoadBatch(ctx, db, cfg.BatchSize)
if err != nil {
log.Println("ошибка загрузки батча:", err)
time.Sleep(backoff)
backoff = minDuration(cfg.MaxBackoff, time.Duration(float64(backoff)*1.5))
continue
}
backoff = cfg.BaseBackoff
if len(msgs) == 0 {
time.Sleep(300 * time.Millisecond)
continue
}
for _, m := range msgs {
if err := publish(ctx, m); err != nil {
log.Printf("публикация не удалась (id=%s): %v\n", m.ID, err)
markAttempt(ctx, db, m.ID)
continue
}
markPublished(ctx, db, m.ID)
}
}
}
func lockAndLoadBatch(ctx context.Context, db *sql.DB, n int) ([]OutboxMessage, error) {
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
if err != nil { return nil, err }
defer func() { _ = tx.Rollback() }()
rows, err := tx.QueryContext(ctx, `
SELECT id, aggregate_type, aggregate_id, event_type, payload, headers, dedup_key, created_at
FROM outbox_events
WHERE published_at IS NULL
ORDER BY created_at
FOR UPDATE SKIP LOCKED
LIMIT $1
`, n)
if err != nil { return nil, err }
defer rows.Close()
var res []OutboxMessage
for rows.Next() {
var m OutboxMessage
if err := rows.Scan(&m.ID, &m.AggregateType, &m.AggregateID, &m.EventType, &m.Payload, &m.Headers, &m.DedupKey, &m.CreatedAt); err != nil {
return nil, err
}
res = append(res, m)
}
if err := rows.Err(); err != nil { return nil, err }
if err := tx.Commit(); err != nil { return nil, err }
return res, nil
}
func markPublished(ctx context.Context, db *sql.DB, id string) {
_, err := db.ExecContext(ctx, `UPDATE outbox_events SET published_at = now() WHERE id = $1`, id)
if err != nil { log.Println("ошибка отметки published:", err) }
}
func markAttempt(ctx context.Context, db *sql.DB, id string) {
_, err := db.ExecContext(ctx, `UPDATE outbox_events SET attempt = attempt + 1 WHERE id = $1`, id)
if err != nil { log.Println("ошибка увеличения attempt:", err) }
}
func minDuration(a, b time.Duration) time.Duration { if a < b { return a }; return b }
Что важно в этом примере:
Рекомендации к продакшн‑воркеру:
Событие может быть опубликовано больше одного раза (например, воркер не получил подтверждение от брокера). Потребителю нужна идемпотентная обработка. Три простых способа:
-- У потребителя
CREATE TABLE IF NOT EXISTS processed_events (
dedup_key TEXT PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- При обработке события
INSERT INTO processed_events(dedup_key) VALUES ($1) ON CONFLICT DO NOTHING;
-- Далее делать бизнес‑логику только если новая строка действительно вставлена
Версионирование и оптимистические блокировки: храните версию агрегата и применяйте событие только если версия больше текущей.
Временной кэш ключей: храните dedup_key в Redis с TTL, если обработка чисто реактивная и вы не изменяете хранение состояния.
Пример чистки партиями:
-- Удаляем старые опубликованные записи пачками по 10k
DELETE FROM outbox_events
WHERE published_at IS NOT NULL AND published_at < now() - INTERVAL '30 days'
LIMIT 10000;
В PostgreSQL до версии 13 LIMIT в DELETE не поддерживался, используйте CTE с ORDER BY и PK, либо партиции. В новых версиях можно добавить USING и фильтры по партициям.
Какие метрики держать под рукой:
Алерты:
Логи и трассировка:
Заключение: Outbox‑паттерн — это небольшой слой поверх вашей БД, который радикально снижает риск потерь и дублей при интеграциях через события. Он прост, предсказуем, хорошо масштабируется и окупается уже на первом предотвращённом инциденте с платежами или логистикой.