
Типичный микросервис делает две вещи: записывает бизнес-данные в базу и отправляет событие в брокер (Kafka/Rabbit/Redis Streams), чтобы другие сервисы синхронизировались. Если делать это раздельными вызовами, начинаются гонки и потери:
Двухфазная фиксация (2PC) между БД и брокером в реальных системах редко возможна или экономически оправдана: разная поддержка протоколов, блокировки и высокая сложность. Нужен способ «сшить» запись в базу и публикацию события без тяжёлого распределённого транзакционого протокола.
Решение — паттерн Outbox («журнал исходящих»): мы пишем бизнес-данные и событие в одну и ту же базу в рамках одной транзакции, а отдельный процесс безопасно и с повторами публикует эти события в брокер. Так мы гарантируем, что событие не потеряется, а публикация всегда соответствует записанным данным.
Важно: ретранслятор должен быть устойчив к падениям и дублированию. «Ровно-однажды» в распределённых системах — миф. Стандартная стратегия: допускаем повторную публикацию и обеспечиваем идемпотентность на стороне потребителя (inbox у потребителей, ключ дедупликации в событии).
Ниже — минимально практичная схема с индексами, поддержкой повторов и дедупликации на стороне продьюсера.
-- Таблица исходящих событий
CREATE TABLE IF NOT EXISTS outbox (
id BIGSERIAL PRIMARY KEY,
topic TEXT NOT NULL, -- логическая «тема»/канал события
aggregate_type TEXT NOT NULL, -- тип сущности (например, order)
aggregate_id TEXT NOT NULL, -- идентификатор сущности
payload JSONB NOT NULL, -- тело события
headers JSONB NOT NULL DEFAULT '{}'::jsonb, -- метаданные: ключ дедупликации, трассировка
dedup_key TEXT, -- опционально: уникальный ключ события, чтобы не породить дубль в outbox
status TEXT NOT NULL DEFAULT 'pending', -- pending|processing|done|failed
attempts INT NOT NULL DEFAULT 0,
next_attempt_at TIMESTAMPTZ NOT NULL DEFAULT now(),
error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Уникальная дедупликация на уровне outbox (по желанию)
CREATE UNIQUE INDEX IF NOT EXISTS outbox_dedup_idx
ON outbox(dedup_key) WHERE dedup_key IS NOT NULL;
-- Быстрый выбор только «созревших» к отправке
CREATE INDEX IF NOT EXISTS outbox_pending_idx
ON outbox(status, next_attempt_at, created_at)
WHERE status IN ('pending', 'failed');
-- Триггер для updated_at
CREATE OR REPLACE FUNCTION set_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = now();
RETURN NEW;
END;$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS set_updated_at_trigger ON outbox;
CREATE TRIGGER set_updated_at_trigger
BEFORE UPDATE ON outbox
FOR EACH ROW EXECUTE FUNCTION set_updated_at();
Пример транзакции приложения: записываем заказ и добавляем событие.
BEGIN;
INSERT INTO orders(id, user_id, amount, status)
VALUES ('ord_1001', 'u42', 1250, 'created');
INSERT INTO outbox(topic, aggregate_type, aggregate_id, payload, headers, dedup_key)
VALUES (
'orders.created',
'order',
'ord_1001',
jsonb_build_object(
'order_id','ord_1001',
'user_id','u42',
'amount',1250,
'status','created'
),
jsonb_build_object(
'event_id', 'evt_2f2a8d7b-0b6f-4c13-9a6d-1a2f11b1a111',
'trace_id', 'trc_7b9a...'
),
'evt_2f2a8d7b-0b6f-4c13-9a6d-1a2f11b1a111'
);
COMMIT;
Партиционирование: при больших объёмах удобно делать range-партиции по дате created_at (например, месяц). Это ускорит очистку старых «done» и снизит давление на VACUUM.
-- Пример партиционирования по месяцам (опционально)
ALTER TABLE outbox PARTITION BY RANGE (created_at);
CREATE TABLE IF NOT EXISTS outbox_2026_02 PARTITION OF outbox
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
Ниже — рабочий пример ретранслятора на Go. Он:
FOR UPDATE SKIP LOCKED, чтобы несколько экземпляров могли работать параллельно, не мешая друг другу.attempts и откладывает следующий повтор (экспоненциальная пауза).Для простоты используем Docker Compose с PostgreSQL и Redis.
# docker-compose.yml
version: "3.8"
services:
postgres:
image: postgres:16
environment:
POSTGRES_PASSWORD: postgres
POSTGRES_USER: postgres
POSTGRES_DB: app
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
redis:
image: redis:7
ports:
- "6379:6379"
volumes:
pgdata:
Установим зависимости и запустим ретранслятор.
# 1) Запускаем инфраструктуру
docker compose up -d
# 2) Инициализируем схему (вставьте из раздела со схемой)
psql postgresql://postgres:postgres@localhost:5432/app -f schema.sql
# 3) Подготовим Go-проект
mkdir outbox-relay && cd outbox-relay
go mod init example.com/outbox-relay
go get github.com/jackc/pgx/v5 github.com/redis/go-redis/v9
# 4) Создаём main.go (см. ниже) и запускаем
go run .
Код ретранслятора:
package main
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/jackc/pgx/v5/pgxpool"
redis "github.com/redis/go-redis/v9"
)
const (
batchSize = 100
baseRetryDelay = 2 * time.Second
maxRetryDelay = 5 * time.Minute
streamName = "events"
claimLeaseSeconds = 60 // опционально: окно «обработки»
)
type OutboxItem struct {
ID int64
Topic string
AggregateType string
AggregateID string
Payload string
Headers string
}
func env(key, def string) string {
if v := os.Getenv(key); v != "" {
return v
}
return def
}
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
pgURL := env("PG_URL", "postgresql://postgres:postgres@localhost:5432/app")
rdbAddr := env("REDIS_ADDR", "localhost:6379")
pg, err := pgxpool.New(ctx, pgURL)
if err != nil {
log.Fatalf("pg connect: %v", err)
}
defer pg.Close()
rdb := redis.NewClient(&redis.Options{Addr: rdbAddr})
defer rdb.Close()
log.Println("outbox relay started")
Ticker := time.NewTicker(500 * time.Millisecond)
defer Ticker.Stop()
for {
select {
case <-ctx.Done():
log.Println("stopping...")
return
case <-Ticker.C:
if err := processBatch(ctx, pg, rdb); err != nil {
log.Printf("batch error: %v", err)
// короткая пауза, чтобы не крутить процессор при постоянной ошибке
time.Sleep(time.Second)
}
}
}
}
func nextDelay(attempts int) time.Duration {
d := baseRetryDelay * (1 << (attempts))
if d > maxRetryDelay {
return maxRetryDelay
}
return d
}
func processBatch(ctx context.Context, pg *pgxpool.Pool, rdb *redis.Client) error {
// Транзакция удерживает блокировки строк, чтобы несколько ретрансляторов не взяли один и тот же элемент
tx, err := pg.Begin(ctx)
if err != nil {
return fmt.Errorf("begin: %w", err)
}
defer func() {
_ = tx.Rollback(ctx) // безопасно, если уже коммитнуто
}()
rows, err := tx.Query(ctx, `
SELECT id, topic, aggregate_type, aggregate_id, payload::text, headers::text
FROM outbox
WHERE status IN ('pending','failed')
AND next_attempt_at <= now()
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED
`, batchSize)
if err != nil {
return fmt.Errorf("select: %w", err)
}
defer rows.Close()
items := make([]OutboxItem, 0, batchSize)
for rows.Next() {
var it OutboxItem
if err := rows.Scan(&it.ID, &it.Topic, &it.AggregateType, &it.AggregateID, &it.Payload, &it.Headers); err != nil {
return fmt.Errorf("scan: %w", err)
}
items = append(items, it)
}
if rows.Err() != nil {
return fmt.Errorf("rows: %w", rows.Err())
}
if len(items) == 0 {
return tx.Commit(ctx) // нечего делать
}
for _, it := range items {
// Формируем ключ идемпотентности для потребителя
msgID := messageID(it)
// Публикация в Redis Streams
_, pubErr := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
Values: map[string]any{
"topic": it.Topic,
"aggregate_type": it.AggregateType,
"aggregate_id": it.AggregateID,
"payload": it.Payload,
"headers": it.Headers,
"event_id": msgID,
},
}).Result()
if pubErr != nil {
// запланируем повтор с экспоненциальной паузой
if _, err := tx.Exec(ctx, `
UPDATE outbox
SET attempts = attempts + 1,
status = 'failed',
next_attempt_at = now() + $1::interval,
error = left($2, 4000)
WHERE id = $3
`, durationToInterval(nextDelayIncrement(tx, ctx, it.ID)), pubErr.Error(), it.ID); err != nil {
return fmt.Errorf("update retry: %w", err)
}
continue
}
// успешная публикация — удаляем запись из outbox
if _, err := tx.Exec(ctx, `DELETE FROM outbox WHERE id = $1`, it.ID); err != nil {
return fmt.Errorf("delete: %w", err)
}
}
return tx.Commit(ctx)
}
func messageID(it OutboxItem) string {
h := sha256.New()
h.Write([]byte(fmt.Sprintf("%s|%s|%s|%s", it.Topic, it.AggregateType, it.AggregateID, it.Payload)))
return hex.EncodeToString(h.Sum(nil))
}
// durationToInterval конвертирует duration в строку интервала Postgres, например '5 seconds'
func durationToInterval(d time.Duration) string {
secs := int64(d.Seconds())
if secs < 1 {
secs = 1
}
return fmt.Sprintf("%d seconds", secs)
}
// nextDelayIncrement вычисляет следующее значение backoff на стороне базы, читая attempts
func nextDelayIncrement(tx interface{ QueryRow(context.Context, string, any) }, ctx context.Context, id int64) time.Duration {
var attempts int
if err := tx.QueryRow(ctx, `SELECT attempts FROM outbox WHERE id = $1`, id).Scan(&attempts); err != nil {
return baseRetryDelay
}
d := baseRetryDelay * (1 << attempts)
if d > maxRetryDelay {
return maxRetryDelay
}
return d
}
Примечания по алгоритму:
FOR UPDATE SKIP LOCKED, чтобы параллельные ретрансляторы не мешали друг другу (нужен PostgreSQL 9.5+).event_id (inbox-паттерн на их стороне).topic (разные партиции/таблицы) или запускать ретрансляторы с фильтрами по topic.После N попыток можно отправлять событие в «мёртвую очередь» (DLQ) для ручного анализа:
ALTER TABLE outbox ADD COLUMN dlq BOOLEAN NOT NULL DEFAULT false;
-- пример правила: после 10 попыток считаем событие безнадёжным
UPDATE outbox
SET dlq = true, status = 'failed'
WHERE attempts >= 10 AND status = 'failed';
FOR UPDATE SKIP LOCKED обеспечивает разделение порций.aggregate_id.created_at и публикацией.failed, превышение порога задержки, разрастание таблицы.event_id, aggregate_id, topic, для трассировки — trace_id.done (если вы их оставляете для аудита). С партициями — просто DROP PARTITION.outbox с индексами и триггером updated_at.event_id для дедупликации у потребителя.FOR UPDATE SKIP LOCKED, повторами и backoff.payload и headers.(status, next_attempt_at) — резкий рост CPU и задержек выборки.Паттерн Outbox превращает тонкое место «БД ↔ брокер» из источника случайных потерь в надёжный и наблюдаемый конвейер. Вы сохраняете атомарность на стороне базы, а публикацию в брокер делаете устойчивой к сбоям за счёт повторов и дедупликации у потребителей. Это проще, чем 2PC, и лучше соответствует реальным требованиям бизнеса: никаких потерянных заказов, предсказуемые интеграции и меньше ночных звонков в поддержку.