
Типичная схема: сервис принимает заказ, пишет его в базу, затем публикует событие в очередь/шину (Kafka, RabbitMQ) или шлёт вебхук партнёру. Между этими шагами — пропасть. Любая из сторон может «споткнуться»:
В итоге бизнес теряет заказы и деньги, поддержка ночует в логах, а SLA трещит.
Outbox‑паттерн устраняет этот «разрыв» между БД и очередью и даёт гарантированную доставку «по крайней мере один раз» при корректной настройке дедупликации и идемпотентности на приёмнике.
Идея проста:
Так мы:
Есть два популярных варианта, как из outbox попадать дальше:
Ниже — рабочая реализация с воркером и пример конфигурации Debezium.
Создадим таблицу заказов и таблицу outbox. В outbox добавим ключ дедупликации, тайм‑штампы, тип события и полезную нагрузку.
-- Заказы
CREATE TABLE IF NOT EXISTS orders (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
amount NUMERIC(12,2) NOT NULL,
status TEXT NOT NULL DEFAULT 'created',
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Outbox: журнал исходящих событий
CREATE TABLE IF NOT EXISTS outbox_events (
id BIGSERIAL PRIMARY KEY,
dedup_key TEXT NOT NULL, -- для дедупликации у потребителей
aggregate_type TEXT NOT NULL, -- например, 'order'
aggregate_id UUID NOT NULL, -- id заказа
event_type TEXT NOT NULL, -- например, 'order.created'
payload JSONB NOT NULL, -- данные события
occurred_at TIMESTAMPTZ NOT NULL DEFAULT now(),
processed_at TIMESTAMPTZ, -- когда успешно доставили
attempts INTEGER NOT NULL DEFAULT 0, -- количество попыток
next_try_at TIMESTAMPTZ, -- когда пробовать снова
error TEXT -- последняя ошибка (для диагностики)
);
-- быстрый выбор «ещё не доставленных»
CREATE INDEX IF NOT EXISTS idx_outbox_unprocessed
ON outbox_events (processed_at, next_try_at NULLS FIRST)
WHERE processed_at IS NULL;
-- уникальность ключа дедупликации (опционально, если генерируете его на стороне приложения)
CREATE UNIQUE INDEX IF NOT EXISTS uq_outbox_dedup
ON outbox_events (dedup_key);
Рекомендуется выставить параметры автовакуума для outbox, если поток событий большой.
Важный момент: и запись заказа, и запись в outbox должны происходить в ОДНОЙ транзакции.
Пример на Go (используем стандартный database/sql и драйвер pgx):
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"os"
"time"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/google/uuid"
)
type Order struct {
ID uuid.UUID
UserID uuid.UUID
Amount float64
}
type OutboxEvent struct {
DedupKey string
AggregateType string
AggregateID uuid.UUID
EventType string
Payload map[string]any
}
func main() {
dsn := env("DATABASE_URL", "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable")
db, err := sql.Open("pgx", dsn)
if err != nil { log.Fatal(err) }
defer db.Close()
ctx := context.Background()
order := Order{ID: uuid.New(), UserID: uuid.New(), Amount: 1490.0}
if err := createOrderWithEvent(ctx, db, order); err != nil {
log.Fatalf("failed to create order: %v", err)
}
fmt.Println("Заказ создан и событие поставлено в outbox")
}
func createOrderWithEvent(ctx context.Context, db *sql.DB, o Order) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
if err != nil { return err }
defer func() {
if err != nil { _ = tx.Rollback() } else { _ = tx.Commit() }
}()
_, err = tx.ExecContext(ctx,
`INSERT INTO orders (id, user_id, amount, status) VALUES ($1,$2,$3,'created')`,
o.ID, o.UserID, o.Amount,
)
if err != nil { return err }
payload := map[string]any{
"order_id": o.ID.String(),
"user_id": o.UserID.String(),
"amount": o.Amount,
"status": "created",
"ts": time.Now().UTC().Format(time.RFC3339Nano),
}
ev := OutboxEvent{
DedupKey: fmt.Sprintf("order.created:%s", o.ID),
AggregateType: "order",
AggregateID: o.ID,
EventType: "order.created",
Payload: payload,
}
b, _ := json.Marshal(ev.Payload)
_, err = tx.ExecContext(ctx, `
INSERT INTO outbox_events (dedup_key, aggregate_type, aggregate_id, event_type, payload, next_try_at)
VALUES ($1,$2,$3,$4,$5, now())
ON CONFLICT (dedup_key) DO NOTHING
`, ev.DedupKey, ev.AggregateType, ev.AggregateID, ev.EventType, string(b))
return err
}
func env(key, def string) string {
if v := os.Getenv(key); v != "" { return v }
return def
}
Тут мы используем dedup_key вида «тип
» — удобно для downstream‑идемпотентности.Воркер регулярно выбирает неотправленные события, «захватывает» их с помощью SKIP LOCKED, пытается доставить, при успехе помечает processed_at, при ошибке увеличивает attempts и откладывает повтор. В примере отправим событие HTTP POST‑ом (как суррогат внешней шины), но на практике тут будет продюсер Kafka/Rabbit или ваш шлюз вебхуков.
package main
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"time"
_ "github.com/jackc/pgx/v5/stdlib"
)
type OutboxRow struct {
ID int64
DedupKey string
EventType string
Payload []byte
Attempts int
}
func main() {
dsn := env("DATABASE_URL", "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable")
endpoint := env("DELIVERY_ENDPOINT", "http://localhost:8080/events")
db, err := sql.Open("pgx", dsn)
if err != nil { log.Fatal(err) }
defer db.Close()
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
if err := deliverBatch(context.Background(), db, endpoint, 100); err != nil {
log.Printf("deliver error: %v", err)
}
}
}
func deliverBatch(ctx context.Context, db *sql.DB, endpoint string, limit int) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
if err != nil { return err }
defer func() {
if err != nil { _ = tx.Rollback() } else { _ = tx.Commit() }
}()
rows, err := tx.QueryContext(ctx, `
SELECT id, dedup_key, event_type, payload, attempts
FROM outbox_events
WHERE processed_at IS NULL AND (next_try_at IS NULL OR next_try_at <= now())
ORDER BY occurred_at
FOR UPDATE SKIP LOCKED
LIMIT $1
`, limit)
if err != nil { return err }
defer rows.Close()
var batch []OutboxRow
for rows.Next() {
var r OutboxRow
if err := rows.Scan(&r.ID, &r.DedupKey, &r.EventType, &r.Payload, &r.Attempts); err != nil {
return err
}
batch = append(batch, r)
}
if len(batch) == 0 { return nil }
for _, r := range batch {
// Пытаемся доставить
if err2 := postJSON(endpoint, r.Payload, r.DedupKey, r.EventType); err2 != nil {
// Обновляем попытки и next_try_at (экспоненциальная задержка с «потолком»)
backoff := time.Duration(minInt(60, 1<<minInt(6, r.Attempts))) * time.Second
if _, err := tx.ExecContext(ctx, `
UPDATE outbox_events
SET attempts = attempts + 1,
next_try_at = now() + $1::interval,
error = $2
WHERE id = $3
`, fmt.Sprintf("%d seconds", int(backoff.Seconds())), truncateErr(err2), r.ID); err != nil {
return err
}
continue
}
if _, err := tx.ExecContext(ctx, `
UPDATE outbox_events
SET processed_at = now(), error = NULL
WHERE id = $1
`, r.ID); err != nil {
return err
}
}
return nil
}
func postJSON(url string, payload []byte, dedupKey, eventType string) error {
req, _ := http.NewRequest("POST", url, bytes.NewReader(payload))
req.Header.Set("Content-Type", "application/json")
// Передадим ключ для идемпотентности на стороне приёмника
req.Header.Set("Idempotency-Key", dedupKey)
req.Header.Set("X-Event-Type", eventType)
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Do(req)
if err != nil { return err }
defer resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return nil
}
return fmt.Errorf("bad status: %s", resp.Status)
}
func env(key, def string) string {
if v := os.Getenv(key); v != "" { return v }
return def
}
func minInt(a, b int) int { if a < b { return a } ; return b }
func truncateErr(err error) string {
s := err.Error()
if len(s) > 400 { return s[:400] }
return s
}
Ключевые моменты:
Outbox даёт доставку «как минимум один раз». Дубликаты возможны: воркер отправил, но упал до отметки processed_at; приёмник ответил 200, но сеть разорвалась на обратном пути; CDC дважды отдал одно и то же из‑за ребаланса. Значит, приёмник должен быть идемпотентным.
Как сделать просто:
Порядок важен? Если да — группируйте события по aggregate_id и публикуйте их строго по occurred_at. Воркер может выбирать по aggregate_id и обрабатывать последовательно, но это снижает параллелизм. Компромисс — гарантировать порядок «внутри одного агрегата».
Минимальный набор метрик:
Алерты:
Логи воркера делайте структурированными (JSON) с полями dedup_key, event_type, attempts.
Outbox со временем растёт. Нужна политика:
Простой джоб на очистку:
DELETE FROM outbox_events
WHERE processed_at IS NOT NULL
AND processed_at < now() - interval '30 days';
Если событий очень много, удаляйте порциями по первичному ключу и используйте автовакуум/плановую вакуумизацию.
Транзакционный outbox + воркер:
CDC через Debezium:
Пример конфигурации коннектора Debezium для PostgreSQL (Kafka Connect REST):
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"topic.prefix": "app",
"slot.name": "debezium_slot",
"publication.autocreate.mode": "filtered",
"table.include.list": "public.outbox_events",
"tombstones.on.delete": "false",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.fields.additional.placement": "event_type:header:eventType,dedup_key:header:idempotencyKey",
"transforms.outbox.route.by.field": "event_type",
"transforms.outbox.route.topic.replacement": "events.${routedByValue}",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}
С такой настройкой события из outbox будут попадать в топики вида events.order.created.
До outbox:
После outbox + идемпотентность:
Перевод на язык денег: если средний чек 1 500 ₽, а потеря 0,3% от 100 000 заказов в месяц — это 450 000 ₽ прямых потерь. Outbox окупает внедрение за 1–2 недели.
Вывод: outbox — это простой технический приём, который закрывает один из самых дорогих для бизнеса классов дефектов — потерю событий при интеграциях. Внедрить его можно за 1–3 дня, начать — с воркера на PostgreSQL, а при росте — перейти на CDC. Результат — меньше инцидентов, предсказуемая доставка и спокойный сон команды.