
С каждым кварталом появляется ещё один внешний сервис: платёжные шлюзы, склад, курьеры, CRM, аналитика. Бизнес хочет видеть целостную картину и автоматические процессы. Проблема в том, что запись в вашу основную БД и отправка события во внешнюю систему — это две разные операции. Если одна из них упадёт, у вас появится рассинхрон: в БД заказ проведён, а в склад событие не пришло; или наоборот — событие ушло дважды.
Транзакционный Outbox решает эту проблему: мы записываем и бизнес‑данные, и событие в одну транзакцию в БД. Дальше отдельный надёжный процесс публикует событие во внешний брокер или шину. Результат — меньше инцидентов, предсказуемая интеграция, быстрее выводим новые каналы.
Типичный анти‑паттерн — «сделали UPDATE, потом отправили в брокер». Между этими шагами всё, что угодно, может пойти не так:
В обратную сторону появляются дубли:
Приложение выполняет бизнес‑операцию (например, подтверждает оплату заказа) и в той же транзакции добавляет запись в outbox со всеми атрибутами события.
Отдельный фоновой процесс (воркер) читает новые записи из outbox, публикует их в брокер (Kafka, RabbitMQ, SQS и т. д.) и помечает как опубликованные.
Если публикация не удалась — воркер увеличивает счётчик попыток, пишет ошибку и попробует позже, соблюдая backoff и лимиты.
Так мы добиваемся атомарности между бизнес‑данными и фактом появления события в очереди публикаций.
Ниже — минимальная, но пригодная к продакшену схема. Используем частичный индекс для быстрых выборок необработанных событий и типизированные поля для маршрутизации.
-- Расширение для генерации UUID (можно заменить генерацией на стороне приложения)
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- Таблица бизнес-домена (пример: заказы)
CREATE TABLE IF NOT EXISTS orders (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
status TEXT NOT NULL CHECK (status IN ('created','paid','shipped','canceled')),
total_amount NUMERIC(12,2) NOT NULL CHECK (total_amount >= 0),
paid_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS orders_status_idx ON orders(status);
-- Таблица outbox
CREATE TABLE IF NOT EXISTS outbox (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type TEXT NOT NULL, -- например, 'order'
aggregate_id TEXT NOT NULL, -- строковый идентификатор агрегата
event_type TEXT NOT NULL, -- например, 'order.paid.v1'
payload JSONB NOT NULL, -- полезная нагрузка события
headers JSONB NOT NULL DEFAULT '{}'::jsonb, -- метаданные, трассировка, корреляция
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
published_at TIMESTAMPTZ,
attempts INT NOT NULL DEFAULT 0,
error TEXT
);
-- Индекс для быстрого чтения непубликованных событий по времени
CREATE INDEX IF NOT EXISTS outbox_unpublished_idx
ON outbox (created_at)
WHERE published_at IS NULL;
-- Триггер: при переводе заказа в paid пишем событие в outbox
CREATE OR REPLACE FUNCTION trg_order_paid_to_outbox()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'UPDATE' AND NEW.status = 'paid' AND OLD.status <> 'paid' THEN
INSERT INTO outbox(aggregate_type, aggregate_id, event_type, payload, headers)
VALUES (
'order',
NEW.id::text,
'order.paid.v1',
jsonb_build_object(
'orderId', NEW.id,
'userId', NEW.user_id,
'total', NEW.total_amount,
'paidAt', COALESCE(NEW.paid_at, now())
),
jsonb_build_object(
'traceId', gen_random_uuid()::text,
'source', 'orders-service'
)
);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS order_paid_to_outbox ON orders;
CREATE TRIGGER order_paid_to_outbox
AFTER UPDATE ON orders
FOR EACH ROW
EXECUTE FUNCTION trg_order_paid_to_outbox();
Так мы гарантируем: если статус заказа стал paid, то событие появится в outbox в той же транзакции. Если транзакция откатится — не изменится ни заказ, ни outbox.
Ниже — пример воркера на Go, который читает события батчами, публикует в Kafka и помечает как опубликованные. Используем SKIP LOCKED, чтобы несколько воркеров могли работать параллельно без гонок.
package main
import (
context "context"
"database/sql"
"errors"
"log"
"os"
"time"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/segmentio/kafka-go"
)
type Event struct {
EventID string
AggregateType string
AggregateID string
EventType string
Payload []byte
Headers []byte
}
type Publisher interface {
Publish(ctx context.Context, topic, key string, value []byte, headers map[string]string) error
}
type KafkaPublisher struct {
Writer *kafka.Writer
}
func (p *KafkaPublisher) Publish(ctx context.Context, topic, key string, value []byte, headers map[string]string) error {
var kh []kafka.Header
for k, v := range headers {
kh = append(kh, kafka.Header{Key: k, Value: []byte(v)})
}
msg := kafka.Message{Key: []byte(key), Value: value, Headers: kh, Time: time.Now()}
return p.Writer.WriteMessages(ctx, msg)
}
func main() {
dsn := envOr("PG_DSN", "postgres://user:pass@localhost:5432/app?sslmode=disable")
broker := envOr("KAFKA_BROKER", "localhost:9092")
topic := envOr("KAFKA_TOPIC", "orders-events")
batchSize := 100
pollInterval := 200 * time.Millisecond
db, err := sql.Open("pgx", dsn)
if err != nil { log.Fatal(err) }
defer db.Close()
pub := &KafkaPublisher{Writer: &kafka.Writer{
Addr: kafka.TCP(broker),
Topic: topic,
RequiredAcks: kafka.RequireAll,
BatchTimeout: 10 * time.Millisecond,
}}
defer pub.Writer.Close()
ctx := context.Background()
for {
processed := processBatch(ctx, db, pub, topic, batchSize)
if processed == 0 { time.Sleep(pollInterval) }
}
}
func processBatch(ctx context.Context, db *sql.DB, pub Publisher, topic string, limit int) int {
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
if err != nil { log.Printf("begin tx: %v", err); return 0 }
defer func() {
if err != nil { _ = tx.Rollback() } else { _ = tx.Commit() }
}()
rows, err := tx.QueryContext(ctx, `
SELECT event_id, aggregate_type, aggregate_id, event_type, payload, headers
FROM outbox
WHERE published_at IS NULL
ORDER BY created_at
FOR UPDATE SKIP LOCKED
LIMIT $1`, limit)
if err != nil { log.Printf("select: %v", err); return 0 }
defer rows.Close()
var events []Event
for rows.Next() {
var e Event
if err = rows.Scan(&e.EventID, &e.AggregateType, &e.AggregateID, &e.EventType, &e.Payload, &e.Headers); err != nil {
log.Printf("scan: %v", err)
return 0
}
events = append(events, e)
}
if err = rows.Err(); err != nil { log.Printf("rows: %v", err); return 0 }
processed := 0
for _, e := range events {
key := e.AggregateID // фиксируем порядок в партиции по ключу
headers := map[string]string{"event_id": e.EventID, "event_type": e.EventType}
if pubErr := pub.Publish(ctx, topic, key, e.Payload, headers); pubErr != nil {
log.Printf("publish %s: %v", e.EventID, pubErr)
// фиксируем ошибку и увеличиваем attempts, чтобы не зациклиться безумно часто
if _, err = tx.ExecContext(ctx, `
UPDATE outbox SET attempts = attempts + 1, error = $2 WHERE event_id = $1`, e.EventID, truncateErr(pubErr)); err != nil {
log.Printf("mark error: %v", err)
}
continue
}
// успешная публикация — помечаем
if _, err = tx.ExecContext(ctx, `
UPDATE outbox SET published_at = now(), attempts = attempts + 1, error = NULL WHERE event_id = $1`, e.EventID); err != nil {
log.Printf("mark published: %v", err)
continue
}
processed++
}
return processed
}
func truncateErr(err error) string {
s := err.Error()
if len(s) > 500 { s = s[:500] }
return s
}
func envOr(k, def string) string {
if v := os.Getenv(k); v != "" { return v }
return def
}
Ключевые моменты:
Пуллинг хорош простотой, но если у вас много событий или нужны минимальные задержки, лучше читать поток изменений БД (CDC). Debezium слушает лог репликации PostgreSQL и публикует события в Kafka.
Пример конфигурации коннектора Debezium для outbox‑таблицы:
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "replica",
"database.password": "secret",
"database.dbname": "app",
"database.server.name": "app-pg",
"plugin.name": "pgoutput",
"publication.autocreate.mode": "filtered",
"table.include.list": "public.outbox",
"heartbeat.interval.ms": "5000",
"tombstones.on.delete": "false",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.fields.additional.placement": "headers:header,aggregate_type:header,aggregate_id:header,event_type:header",
"transforms.outbox.route.by.field": "event_type",
"transforms.outbox.route.topic.replacement": "${r}.events",
"transforms.outbox.table.field.event.id": "event_id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.table.field.event.timestamp": "created_at"
}
}
Debezium сам превратит строки outbox в сообщения в Kafka. Воркеры на вашей стороне не нужны, а задержка — миллисекунды. Минусы: требуется доступ к логам репликации, отдельный кластер Kafka/Connect и DevOps‑экспертиза.
Следите за:
Держите DLQ (dead‑letter queue) для сообщений, которые стабильно не отдаются (например, схема сломалась). Делайте инструмент для перепубликации после фикса.
Такой подход снижает риск и даёт время проверить метрики лога/лаги без влияния на бизнес.
Транзакционный Outbox и CDC — практичный способ сделать интеграции предсказуемыми и надёжными без дорогих распределённых транзакций. Вы закрываете один из самых частых классов инцидентов — потерю или дублирование событий — и одновременно ускоряете выпуск новых интеграций. Бизнес получает стабильные процессы, поддержка — меньше «ручных починок», разработка — понятную и тестируемую архитектуру.