
Сценарий знаком многим: заказ создан в БД, но письмо клиенту не ушло; платёж проведён, а вебхук партнёру «затерялся»; склад не получил событие о резервировании — начались ручные допроведения и человеческий фактор. Корень проблемы — разделение двух операций: запись в базу и отправка сообщения во внешний мир (очередь, вебхук, e‑mail). Если между ними что‑то пойдёт не так, система теряет согласованность.
Транзакционный аутбокс (outbox) решает это: мы фиксируем событие в таблице в той же базе и в той же транзакции, где меняем бизнес‑данные. А отдельный надёжный воркер дочитывает эту таблицу и доставляет события дальше с ретраями. В итоге «сначала сохранили в БД — потом попробовали отправить» превращается в «сохранили и поставили в локальную очередь атомарно; доставим хоть из офлайна».
Для бизнеса это значит меньше инцидентов, меньше ручной работы, чище отчёты и прогнозируемые интеграции с партнёрами.
Коротко:
Что получаем по гарантиям:
Для большинства B2B‑сервисов со стабильным трафиком достаточно поллинга, а к CDC можно прийти позже.
-- PostgreSQL
CREATE TYPE outbox_status AS ENUM ('pending', 'in_progress', 'delivered', 'failed');
CREATE TABLE outbox (
id BIGSERIAL PRIMARY KEY,
aggregate_type TEXT NOT NULL, -- тип сущности (order, invoice, user)
aggregate_id TEXT NOT NULL, -- идентификатор сущности (строка на случай UUID)
event_type TEXT NOT NULL, -- тип события (order.created)
idempotency_key UUID NOT NULL, -- уникальный ключ события для потребителей
payload JSONB NOT NULL, -- полезная нагрузка (минимум PII)
status outbox_status NOT NULL DEFAULT 'pending',
attempts INT NOT NULL DEFAULT 0,
available_at TIMESTAMPTZ NOT NULL DEFAULT now(), -- отложенная доставка
next_attempt_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE UNIQUE INDEX ux_outbox_idempotency ON outbox (idempotency_key);
CREATE INDEX ix_outbox_pick ON outbox (status, next_attempt_at, available_at, id);
CREATE INDEX ix_outbox_created ON outbox (created_at);
-- Триггер обновляет updated_at
CREATE OR REPLACE FUNCTION set_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = now();
RETURN NEW;
END; $$ LANGUAGE plpgsql;
CREATE TRIGGER trg_outbox_updated
BEFORE UPDATE ON outbox
FOR EACH ROW EXECUTE PROCEDURE set_updated_at();
Рекомендации:
Ниже — безопасный паттерн: бизнес‑операция и запись в outbox фиксируются одним коммитом.
-- Создание заказа и запись события в outbox в одной транзакции
BEGIN;
INSERT INTO orders (id, customer_id, amount_cents, currency, status, created_at)
VALUES ($1, $2, $3, $4, 'created', now());
INSERT INTO outbox (
aggregate_type, aggregate_id, event_type, idempotency_key, payload, status, available_at
) VALUES (
'order', $1, 'order.created', gen_random_uuid(),
jsonb_build_object(
'order_id', $1,
'customer_id', $2,
'amount_cents', $3,
'currency', $4
),
'pending', now()
);
COMMIT;
Важно: никогда не отправляйте событие во внешний мир до фиксации транзакции. Только после успешного коммита воркер увидит запись в outbox и возьмёт её в работу.
Воркеры забирают пачку записей, помечают «в работе», отправляют HTTP‑вебхуки на ваш интеграционный шлюз или партнёрам, и обновляют статус. Используем Postgres‑специфику: SELECT … FOR UPDATE SKIP LOCKED. Это позволяет безопасно запускать несколько воркеров параллельно.
Ниже полноценный пример на Go: без внешних библиотек, только стандартная библиотека. Ворк отправляет вебхуки по HTTPS, подписывает их HMAC‑подписью и реализует экспоненциальный backoff.
package main
import (
"context"
"crypto/hmac"
"crypto/sha256"
"database/sql"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"strings"
"time"
_ "github.com/lib/pq"
)
// Конфигурация воркера
type Config struct {
DBURL string
WebhookURL string
HMACSecret string
BatchSize int
MaxAttempts int
PollInterval time.Duration
RequestTimeout time.Duration
ParallelSends int
}
type OutboxEvent struct {
ID int64 `json:"id"`
AggregateType string `json:"aggregate_type"`
AggregateID string `json:"aggregate_id"`
EventType string `json:"event_type"`
IdempotencyKey string `json:"idempotency_key"`
Payload json.RawMessage `json:"payload"`
}
func main() {
cfg := Config{
DBURL: env("DB_URL", "postgres://user:pass@localhost:5432/app?sslmode=disable"),
WebhookURL: mustEnv("WEBHOOK_URL"),
HMACSecret: mustEnv("HMAC_SECRET"),
BatchSize: envInt("BATCH_SIZE", 100),
MaxAttempts: envInt("MAX_ATTEMPTS", 10),
PollInterval: envDuration("POLL_INTERVAL", 2*time.Second),
RequestTimeout: envDuration("REQUEST_TIMEOUT", 5*time.Second),
ParallelSends: envInt("PARALLEL_SENDS", 10),
}
db, err := sql.Open("postgres", cfg.DBURL)
if err != nil { log.Fatalf("db open: %v", err) }
defer db.Close()
httpClient := &http.Client{ Timeout: cfg.RequestTimeout }
ticker := time.NewTicker(cfg.PollInterval)
defer ticker.Stop()
for {
if n := dispatchOnce(db, httpClient, cfg); n == 0 {
<-ticker.C
}
}
}
func dispatchOnce(db *sql.DB, httpClient *http.Client, cfg Config) int {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
if err != nil { log.Printf("begin tx: %v", err); return 0 }
// Забираем пачку событий и помечаем их in_progress атомарно
rows, err := tx.QueryContext(ctx, `
WITH cte AS (
SELECT id
FROM outbox
WHERE status = 'pending'
AND coalesce(next_attempt_at, now()) <= now()
AND available_at <= now()
ORDER BY created_at
FOR UPDATE SKIP LOCKED
LIMIT $1
)
UPDATE outbox o
SET status = 'in_progress'
FROM cte
WHERE o.id = cte.id
RETURNING o.id, o.aggregate_type, o.aggregate_id, o.event_type, o.idempotency_key, o.payload
`, cfg.BatchSize)
if err != nil { _ = tx.Rollback(); log.Printf("select: %v", err); return 0 }
var batch []OutboxEvent
for rows.Next() {
var e OutboxEvent
if err := rows.Scan(&e.ID, &e.AggregateType, &e.AggregateID, &e.EventType, &e.IdempotencyKey, &e.Payload); err != nil {
_ = tx.Rollback(); log.Printf("scan: %v", err); return 0
}
batch = append(batch, e)
}
rows.Close()
if err := tx.Commit(); err != nil {
log.Printf("commit: %v", err)
return 0
}
if len(batch) == 0 { return 0 }
// Параллельная отправка
sem := make(chan struct{}, cfg.ParallelSends)
done := make(chan struct{})
for _, ev := range batch {
sem <- struct{}{}
go func(ev OutboxEvent) {
defer func() { <-sem; done <- struct{}{} }()
deliverEvent(db, httpClient, cfg, ev)
}(ev)
}
for i := 0; i < len(batch); i++ { <-done }
return len(batch)
}
func deliverEvent(db *sql.DB, httpClient *http.Client, cfg Config, ev OutboxEvent) {
// Формируем HTTP‑запрос с подписью: X-Signature: sha256=...
body := map[string]any{
"event_type": ev.EventType,
"idempotency_key": ev.IdempotencyKey,
"aggregate_type": ev.AggregateType,
"aggregate_id": ev.AggregateID,
"payload": json.RawMessage(ev.Payload),
"sent_at": time.Now().UTC().Format(time.RFC3339Nano),
}
b, _ := json.Marshal(body)
sig := signHMAC(cfg.HMACSecret, b)
req, _ := http.NewRequest(http.MethodPost, cfg.WebhookURL, strings.NewReader(string(b)))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Signature", "sha256="+sig)
req.Header.Set("X-Idempotency-Key", ev.IdempotencyKey)
resp, err := httpClient.Do(req)
if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 300 {
_ = resp.Body.Close()
markDelivered(db, ev.ID)
return
}
if resp != nil { _ = resp.Body.Close() }
// Ошибка: увеличиваем счётчик и планируем retry
scheduleRetry(db, ev.ID, cfg.MaxAttempts)
}
func markDelivered(db *sql.DB, id int64) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if _, err := db.ExecContext(ctx, `UPDATE outbox SET status='delivered' WHERE id=$1`, id); err != nil {
log.Printf("mark delivered %d: %v", id, err)
}
}
func scheduleRetry(db *sql.DB, id int64, maxAttempts int) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
// Экспоненциальный backoff: min(2^attempts * 1s, 5m)
_, err := db.ExecContext(ctx, `
UPDATE outbox
SET attempts = attempts + 1,
status = CASE WHEN attempts + 1 >= $2 THEN 'failed' ELSE 'pending' END,
next_attempt_at = CASE
WHEN attempts + 1 >= $2 THEN null
ELSE now() + LEAST(((2 ^ (attempts + 1))::int)::text::interval, '5 minutes'::interval)
END
WHERE id = $1
`, id, maxAttempts)
if err != nil {
log.Printf("schedule retry %d: %v", id, err)
}
}
func signHMAC(secret string, body []byte) string {
mac := hmac.New(sha256.New, []byte(secret))
mac.Write(body)
return hex.EncodeToString(mac.Sum(nil))
}
// Утилиты окружения
func env(k, def string) string {
if v := os.Getenv(k); v != "" { return v }
return def
}
func mustEnv(k string) string {
v := os.Getenv(k)
if v == "" { log.Fatalf("env %s required", k) }
return v
}
func envInt(k string, def int) int {
if v := os.Getenv(k); v != "" { var i int; fmt.Sscanf(v, "%d", &i); return i }
return def
}
func envDuration(k string, def time.Duration) time.Duration {
if v := os.Getenv(k); v != "" { d, err := time.ParseDuration(v); if err == nil { return d } }
return def
}
Примечания к воркеру:
Пример блокировки по сущности через advisory lock, если нужно сохранять порядок для одного заказа:
-- Внутри транзакции доставки конкретной записи можно взять advisory lock:
SELECT pg_advisory_xact_lock(hashtext(aggregate_type || ':' || aggregate_id));
Отслеживайте:
Алерты:
Дэд‑леттер (dead‑letter): при достижении MaxAttempts переводите событие в failed и складывайте его в отдельную таблицу/очередь для ручного или автоматического повторного разбора.
Если же у вас есть вебхуки, брокеры, e‑mail, платёжные статусы, складские или бухгалтерские интеграции — аутбокс резко снижает риск потерь и стоимость инцидентов. Это тот случай, когда простая таблица и аккуратный воркер экономят часы инженеров и нервы поддержки.