
Если приложение пишет изменения в базу, а затем отправляет уведомление в брокер (Kafka, RabbitMQ, Redis Streams), есть риск: база зафиксировала заказ, а событие в очередь не ушло — или наоборот. Для бизнеса это выглядит как «потерянные заказы», «двойные списания» или «партнёр не получил наш вебхук, но деньги мы взяли». Чем сложнее интеграции, тем дороже такой рассинхрон.
Транзакционный аутбокс и CDC (Change Data Capture — поток изменений из базы) позволяют гарантировать: если запись в базе есть, то событие об этом изменении обязательно будет доставлено в шину — и наоборот, без дублей и пропусков.
Классическая анти‑схема:
Проблемы:
Любой из этих кейсов приводит к тому, что разные системы видят разное состояние.
Идея простая:
Это даёт «хотя бы один раз» доставку в брокер с атомарностью относительно базы. Дубли при доставке наружу возможны, поэтому потребители должны быть идемпотентными — это норма для событийной архитектуры.
Есть несколько вариантов:
Поллер в приложении.
Триггеры/функции в БД.
CDC (Debezium и аналоги) — читает журнал транзакций БД и транслирует изменения во внешний брокер.
На практике для средних и крупных систем CDC — наилучший баланс надёжности и операционных затрат.
Рекомендуемая таблица в PostgreSQL:
CREATE TABLE outbox_events (
id uuid PRIMARY KEY,
aggregate_type text NOT NULL, -- например, 'order'
aggregate_id text NOT NULL, -- ID заказа
type text NOT NULL, -- тип события, напр. 'order.created'
payload jsonb NOT NULL, -- минимальный набор данных
headers jsonb NOT NULL DEFAULT '{}'::jsonb, -- метаданные: версия схемы, trace_id
partition_key text NOT NULL, -- ключ партиционирования для порядка, обычно aggregate_id
occurred_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX idx_outbox_occurred ON outbox_events(occurred_at);
CREATE INDEX idx_outbox_partition ON outbox_events(partition_key);
Пример вставки «заказ + событие» в одной транзакции на Go (database/sql):
package main
import (
"context"
"database/sql"
"encoding/json"
"log"
"time"
_ "github.com/lib/pq"
"github.com/google/uuid"
)
type Order struct {
ID string
Amount int64
Currency string
}
type OutboxEvent struct {
ID string
AggregateType string
AggregateID string
Type string
Payload map[string]any
Headers map[string]any
PartitionKey string
}
func createOrder(ctx context.Context, db *sql.DB, o Order) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
if err != nil { return err }
defer tx.Rollback()
// 1) Бизнес‑запись
if _, err := tx.ExecContext(ctx,
`INSERT INTO orders(id, amount, currency, created_at) VALUES ($1,$2,$3,$4)`,
o.ID, o.Amount, o.Currency, time.Now()); err != nil {
return err
}
// 2) Событие в аутбокс (та же транзакция)
evt := OutboxEvent{
ID: uuid.New().String(),
AggregateType: "order",
AggregateID: o.ID,
Type: "order.created",
Payload: map[string]any{
"order_id": o.ID,
"amount": o.Amount,
"currency": o.Currency,
"version": 1,
},
Headers: map[string]any{
"schema_version": 1,
},
PartitionKey: o.ID,
}
payload, _ := json.Marshal(evt.Payload)
headers, _ := json.Marshal(evt.Headers)
if _, err := tx.ExecContext(ctx,
`INSERT INTO outbox_events(id, aggregate_type, aggregate_id, type, payload, headers, partition_key)
VALUES ($1,$2,$3,$4,$5,$6,$7)`,
evt.ID, evt.AggregateType, evt.AggregateID, evt.Type, payload, headers, evt.PartitionKey,
); err != nil {
return err
}
return tx.Commit()
}
func main() {
db, err := sql.Open("postgres", "postgres://user:pass@localhost:5432/app?sslmode=disable")
if err != nil { panic(err) }
defer db.Close()
err = createOrder(context.Background(), db, Order{ID: uuid.New().String(), Amount: 9900, Currency: "RUB"})
if err != nil {
log.Fatal(err)
}
log.Println("order created and event saved to outbox")
}
Ключевой момент: событие сохраняется в той же транзакции, что и заказ. Либо всё, либо ничего.
Debezium следит за WAL‑журналом PostgreSQL и публикует изменения в Kafka. Для аутбокса удобно использовать готовое преобразование Outbox SMT — оно превращает вставку в outbox_events в «чистое» доменное событие.
Минимальный конфиг коннектора (JSON для Kafka Connect):
{
"name": "outbox-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "secret",
"database.dbname": "app",
"plugin.name": "pgoutput",
"table.include.list": "public.outbox_events",
"slot.name": "outbox_slot",
"publication.autocreate.mode": "filtered",
"tombstones.on.delete": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.fields.additional.placement": "headers:header,aggregate_type:header,aggregate_id:header,partition_key:header",
"transforms.outbox.route.by.field": "type",
"transforms.outbox.route.topic.replacement": "${r}.v1",
"transforms.outbox.payload.field.name": "payload",
"transforms.outbox.timestamp.field.name": "occurred_at",
"transforms.outbox.table.fields": "id,aggregate_type,aggregate_id,type,payload,headers,partition_key,occurred_at"
}
}
Что получится:
Даже с аутбоксом доставку стоит считать «хотя бы один раз». Потребитель обязан быть идемпотентным. Простой способ — хранить идентификаторы обработанных сообщений.
CREATE TABLE processed_messages (
message_id text PRIMARY KEY,
processed_at timestamptz NOT NULL DEFAULT now()
);
Пример на Go: «обработай, если ещё не обрабатывали» в транзакции.
func handleMessage(ctx context.Context, db *sql.DB, msgID string, payload []byte) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
if err != nil { return err }
defer tx.Rollback()
// Пытаемся пометить сообщение как обработанное
if _, err := tx.ExecContext(ctx,
`INSERT INTO processed_messages(message_id) VALUES ($1) ON CONFLICT DO NOTHING`, msgID); err != nil {
return err
}
// Проверяем, действительно ли вставили (не было ли конфликта)
var cnt int
if err := tx.QueryRowContext(ctx, `SELECT 1 FROM processed_messages WHERE message_id=$1`, msgID).Scan(&cnt); err != nil {
return err
}
if cnt != 1 {
// Уже обрабатывали — выходим без побочных эффектов
return nil
}
// Делаем бизнес‑действие (например, обновляем статус заказа)
if _, err := tx.ExecContext(ctx, `UPDATE orders SET status='PAID' WHERE id=$1`, extractOrderID(payload)); err != nil {
return err
}
return tx.Commit()
}
Про порядок:
-- Пример декларативного партиционирования по дате
CREATE TABLE outbox_events (
id uuid NOT NULL,
aggregate_type text NOT NULL,
aggregate_id text NOT NULL,
type text NOT NULL,
payload jsonb NOT NULL,
headers jsonb NOT NULL,
partition_key text NOT NULL,
occurred_at timestamptz NOT NULL
) PARTITION BY RANGE (occurred_at);
CREATE TABLE outbox_events_2026_02 PARTITION OF outbox_events
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
Транзакционный аутбокс и CDC устраняют «щель» между базой и брокером сообщений. Вы получаете атомарность на границе «БД ↔ очередь», предсказуемую доставку событий и упрощённую диагностику. Для бизнеса это означает меньше инцидентов «заказ есть — события нет», меньше ручной разборки с партнёрами и поддержку, которая работает проактивно, а не реагирует на пожары. Стоимость внедрения окупается уже при первых пиках нагрузки и росте числа интеграций.