
Любой продукт с заказами, оплатами, доставкой или партнёрскими интеграциями живёт на событиях: «заказ создан», «платёж прошёл», «посылка отгружена». Эти события требуют надёжной доставки наружу (в брокер, в вебхук партнёра) и такой же надёжной обработки на входе. Потерянное или продублированное событие влечёт деньги, SLA и репутацию.
Паттерны outbox и inbox дают две простые вещи:
Итог: меньше инцидентов, предсказуемые релизы, проще аудит.
Наивный подход: в коде сохраняем заказ в базе, затем публикуем событие в очередь или шлём вебхук. Между этими шагами — сеть, драйверы и всё, что может упасть. Возможны гонки:
Двухфазная фиксация (2PC) теоретически решает, но в реальности дорога, сложна и редко доступна.
Outbox — «исходящий журнал событий» в вашей основной базе. Суть:
Гарантии:
Inbox — «входящий журнал обработанных событий» в сервисе-потребителе:
Так достигается идемпотентность без сложных распределённых транзакций.
-- Расширение для генерирования UUID
create extension if not exists pgcrypto;
-- Outbox: исходящие события
create table if not exists outbox_events (
id uuid primary key default gen_random_uuid(),
aggregate_type text not null, -- что изменилось: 'order', 'invoice' и т.п.
aggregate_id text not null, -- идентификатор агрегата
event_type text not null, -- тип события: 'order.created'
payload jsonb not null, -- полезная нагрузка события
dedup_key text, -- необязательная дедупликация на уровне outbox
status text not null default 'pending',-- pending | processing | done | failed
retries int not null default 0,
available_at timestamptz not null default now(),
created_at timestamptz not null default now(),
processed_at timestamptz
);
-- Быстрый выбор по статусу/расписанию
create index if not exists idx_outbox_status_available on outbox_events(status, available_at);
-- Уникальность по опциональному ключу дедупликации (например, один заказ — одно событие создания)
create unique index if not exists uq_outbox_dedup_key on outbox_events(dedup_key) where dedup_key is not null;
-- Inbox: входящие события у потребителя
create table if not exists inbox_events (
id uuid primary key, -- стабильно передаваемый идентификатор события
received_at timestamptz not null default now(),
processed_at timestamptz, -- когда бизнес-логика завершена
handler_status text not null, -- pending | done | failed
payload jsonb not null,
signature text -- для аудита подписи
);
create index if not exists idx_inbox_status on inbox_events(handler_status);
Пример: создаём заказ и фиксируем событие «order.created» за один коммит.
begin;
with new_order as (
insert into orders(id, user_id, amount, status)
values (gen_random_uuid(), 'u_123', 9900, 'created')
returning id, user_id, amount
)
insert into outbox_events(
aggregate_type, aggregate_id, event_type, payload, dedup_key
)
select
'order', id::text, 'order.created',
jsonb_build_object(
'event_id', gen_random_uuid(),
'order_id', id,
'user_id', user_id,
'amount', amount,
'occurred_at', now()
),
'order.created:' || id::text
from new_order;
commit;
Тот же принцип легко реализуется из приложения. Важное: запись в outbox должна происходить в той же транзакции, что и бизнес-изменение.
Ниже — минимально жизнеспособный воркер на Go, который периодически «забирает» пачку событий из outbox, шлёт вебхук с HMAC‑подписью и обновляет статусы. Он безопасен к гонкам: сначала атомарно помечает события как processing, затем обрабатывает.
package main
import (
"context"
"crypto/hmac"
"crypto/sha256"
"database/sql"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"math"
"net/http"
"os"
"time"
_ "github.com/lib/pq"
)
type Event struct {
ID string
Payload []byte
}
func main() {
dsn := mustEnv("DB_DSN") // пример: postgres://user:pass@localhost:5432/app?sslmode=disable
webhookURL := mustEnv("WEBHOOK_URL") // куда шлём
secret := mustEnv("WEBHOOK_SECRET") // общий секрет для подписи
db, err := sql.Open("postgres", dsn)
if err != nil { log.Fatal(err) }
defer db.Close()
ctx := context.Background()
batch := 50
interval := 2 * time.Second
for {
if err := publishBatch(ctx, db, webhookURL, secret, batch); err != nil {
log.Printf("publish error: %v", err)
}
time.Sleep(interval)
}
}
func publishBatch(ctx context.Context, db *sql.DB, url, secret string, limit int) error {
// 1) Забираем и помечаем события как processing атомарно
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
if err != nil { return err }
rows, err := tx.QueryContext(ctx, `
with locked as (
select id from outbox_events
where status = 'pending' and available_at <= now()
order by created_at
for update skip locked
limit $1
), upd as (
update outbox_events o
set status = 'processing'
from locked
where o.id = locked.id
returning o.id, o.payload
)
select id, payload from upd;
`, limit)
if err != nil { tx.Rollback(); return err }
defer rows.Close()
var events []Event
for rows.Next() {
var id string
var payload []byte
if err := rows.Scan(&id, &payload); err != nil { tx.Rollback(); return err }
events = append(events, Event{ID: id, Payload: payload})
}
if err := rows.Err(); err != nil { tx.Rollback(); return err }
if err := tx.Commit(); err != nil { return err }
// 2) Отправляем каждый и фиксируем результат
for _, e := range events {
if err := sendOne(ctx, db, url, secret, e); err != nil {
log.Printf("event %s failed: %v", e.ID, err)
}
}
return nil
}
func sendOne(ctx context.Context, db *sql.DB, url, secret string, e Event) error {
// HMAC подпись тела + метка времени
ts := fmt.Sprintf("%d", time.Now().Unix())
mac := hmac.New(sha256.New, []byte(secret))
mac.Write([]byte(ts))
mac.Write(e.Payload)
sig := hex.EncodeToString(mac.Sum(nil))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytesReader(e.Payload))
if err != nil { return err }
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Event-Id", e.ID)
req.Header.Set("X-Timestamp", ts)
req.Header.Set("X-Signature", "sha256="+sig)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return markRetry(ctx, db, e.ID, err)
}
defer resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
_, uerr := db.ExecContext(ctx, `update outbox_events set status='done', processed_at=now() where id=$1`, e.ID)
return uerr
}
return markRetry(ctx, db, e.ID, fmt.Errorf("status %d", resp.StatusCode))
}
func markRetry(ctx context.Context, db *sql.DB, id string, cause error) error {
// экспоненциальный бэкофф с верхней границей 10 минут
var retries int
if err := db.QueryRowContext(ctx, `update outbox_events
set retries = retries + 1
where id=$1
returning retries`, id).Scan(&retries); err != nil {
return err
}
sec := math.Min(600, math.Pow(2, float64(retries)))
_, err := db.ExecContext(ctx, `update outbox_events
set status='pending', available_at = now() + ($2 || ' seconds')::interval
where id=$1`, id, int(sec))
if err != nil { return err }
return cause
}
// bytesReader — маленький помощник без лишних аллокаций
func bytesReader(b []byte) *bytes.Reader { return bytes.NewReader(b) }
// need bytes for bytes.NewReader
import "bytes"
func mustEnv(key string) string {
v := os.Getenv(key)
if v == "" { log.Fatalf("missing env %s", key) }
return v
}
Пояснения:
Можно временно поставить WEBHOOK_URL на httpbin.org/post, чтобы быстро увидеть, что отправка идёт. В бою, конечно, используйте свой приёмник с валидацией подписи.
Ниже — обработчик вебхука на Go. Он:
package main
import (
"crypto/hmac"
"crypto/sha256"
"database/sql"
"encoding/hex"
"encoding/json"
"io"
"log"
"net/http"
"os"
"time"
_ "github.com/lib/pq"
)
type InboxPayload struct {
EventID string `json:"event_id"`
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
Amount int `json:"amount"`
OccurredAt time.Time `json:"occurred_at"`
}
func main() {
dsn := mustEnv("DB_DSN")
secret := mustEnv("WEBHOOK_SECRET")
db, err := sql.Open("postgres", dsn)
if err != nil { log.Fatal(err) }
defer db.Close()
http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil { http.Error(w, "read", 400); return }
defer r.Body.Close()
id := r.Header.Get("X-Event-Id")
ts := r.Header.Get("X-Timestamp")
sig := r.Header.Get("X-Signature")
if !verifyHMAC(secret, ts, body, sig) {
http.Error(w, "bad signature", 401); return
}
// идемпотентная вставка
var payload InboxPayload
if err := json.Unmarshal(body, &payload); err != nil { http.Error(w, "json", 400); return }
// Вставляем только если нет дубля
_, err = db.Exec(`insert into inbox_events(id, handler_status, payload, signature)
values ($1, 'pending', $2, $3)`, id, string(body), sig)
if err != nil {
// конфликт — уже обрабатывали
if pqErr, ok := err.(*sql.Error); ok { _ = pqErr } // заглушка компилятора, pq не даёт тип ошибки; используем текст
if isUniqueViolation(err) {
w.WriteHeader(200); w.Write([]byte("duplicate")); return
}
http.Error(w, "db", 500); return
}
// Здесь — бизнес-логика: например, резервирование на складе.
// Имитация быстрого действия
time.Sleep(10 * time.Millisecond)
_, err = db.Exec(`update inbox_events set handler_status='done', processed_at=now() where id=$1`, id)
if err != nil { http.Error(w, "update", 500); return }
w.WriteHeader(200)
w.Write([]byte("ok"))
})
log.Println("listening on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
func verifyHMAC(secret, ts string, body []byte, header string) bool {
const prefix = "sha256="
if len(header) <= len(prefix) || header[:len(prefix)] != prefix { return false }
expected := hmac.New(sha256.New, []byte(secret))
expected.Write([]byte(ts))
expected.Write(body)
want := hex.EncodeToString(expected.Sum(nil))
got := header[len(prefix):]
return hmac.Equal([]byte(want), []byte(got))
}
func isUniqueViolation(err error) bool {
// Упрощённо: проверим по тексту. В проде используйте errors.As к *pq.Error и код "23505".
return err != nil && (contains(err.Error(), "unique") || contains(err.Error(), "duplicate key"))
}
func contains(s, sub string) bool { return len(s) >= len(sub) && (index(s, sub) >= 0) }
func index(s, sep string) int { return len([]rune(s[:])) - len([]rune(s[:])) + len(s) - len(s) + int64Index(s, sep) }
// простой индекс под строки без регистронезависимости
func int64Index(s, sep string) int { return len([]byte(s[:])) - len([]byte(s[:])) + bytesIndex([]byte(s), []byte(sep)) }
// bytesIndex — используем стандартную реализацию
import "bytes"
func bytesIndex(b, sub []byte) int { return bytes.Index(b, sub) }
func mustEnv(k string) string { v := os.Getenv(k); if v == "" { log.Fatalf("missing %s", k) }; return v }
Примечания к примеру потребителя:
At‑least‑once гарантирует доставку, но не порядок. Если порядок критичен для одного агрегата (например, «order.created» → «order.paid»), используйте:
Практичный минимум: включите в payload поле sequence и обрабатывайте с отложкой, если пришло «будущее» событие без «прошлого».
Эти паттерны не навязывают конкретный брокер или протокол: сегодня это вебхуки партнёрам, завтра Kafka — схема и воркеры останутся теми же. Главное — атомарность рождения события и идемпотентность его потребления.