
• Оглавление
Сценарий знаком многим: сервис записал заказ в базу данных, затем попытался отправить событие в брокер/вебхук — и что‑то пошло не так. В итоге партнёр не получил уведомление, склад не отгрузил товар, деньги «зависли». Обратная ситуация не лучше: событие улетело, а запись в БД не сохранилась — получаем расхождения и долгие разборы.
Причина — две независимые операции: запись в БД и отправка в стороннюю систему. Между ними всегда есть риск сбоев, повторов и расхождений. Бизнес платит за это инцидентами, отменами, ручными правками и недополученной выручкой.
Паттерн Outbox решает проблему: мы пишем «факт изменения» (событие) в отдельную таблицу той же базой транзакцией вместе с бизнес‑сущностью. Дальше независимый процесс безопасно публикует эти записи во внешние системы. Если используем CDC (фиксация изменений), то публикация вообще становится «без кода» — события вычитываются из журнала изменений базы.
Ключевая идея — «сшить» бизнес‑данные и событие одной транзакцией, чтобы либо записалось всё, либо ничего. Это устраняет главный источник рассинхронизации.
В PostgreSQL есть удобный приём: выбирать события с блокировкой и пропуском занятых строк:
CDC (Change Data Capture) — это вычитка изменений прямо из журнала транзакций базы. Debezium умеет подключаться к PostgreSQL и стримить изменения из выбранной таблицы (например, outbox_events) в Kafka. Преимущества:
Ниже — полноценные модели и функция создания заказа. Обратите внимание: событие записывается транзакцией вместе с заказом.
# models.py
import uuid
from django.db import models
class Order(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
customer_email = models.EmailField()
amount = models.DecimalField(max_digits=12, decimal_places=2)
status = models.CharField(max_length=32, default="created")
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = "orders"
class OutboxEvent(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
aggregate_type = models.CharField(max_length=64) # например, "order"
aggregate_id = models.UUIDField() # id заказа
event_type = models.CharField(max_length=128) # например, "order.created"
payload = models.JSONField() # полезная нагрузка события
created_at = models.DateTimeField(auto_now_add=True)
published_at = models.DateTimeField(null=True, blank=True)
attempts = models.IntegerField(default=0)
last_error = models.TextField(null=True, blank=True)
class Meta:
db_table = "outbox_events"
indexes = [
models.Index(fields=["created_at"], name="idx_outbox_created_at"),
]
# services.py
from django.db import transaction
from .models import Order, OutboxEvent
def create_order_and_event(customer_email: str, amount) -> Order:
"""Создать заказ и событие в одной транзакции."""
with transaction.atomic():
order = Order.objects.create(customer_email=customer_email, amount=amount)
OutboxEvent.objects.create(
aggregate_type="order",
aggregate_id=order.id,
event_type="order.created",
payload={
"order_id": str(order.id),
"amount": str(order.amount),
"customer_email": order.customer_email,
"status": order.status,
"created_at": order.created_at.isoformat(),
"version": 1, # версия схемы события
},
)
return order
Для ускорения выборки непубликованных событий добавьте частичный индекс (сырое SQL можно выполнить отдельно в миграции):
-- 0002_add_partial_index_outbox.sql
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_outbox_unpublished
ON outbox_events (created_at)
WHERE published_at IS NULL;
Этот скрипт читает события партиями, публикует на внешний вебхук и помечает опубликованные. Используем безопасный шаблон: SELECT ... FOR UPDATE SKIP LOCKED. Если публикация не удалась — увеличиваем счётчик попыток и логируем причину. Все параметры берутся из переменных окружения.
# publisher.py
import os
import time
import json
import signal
import logging
from typing import List
import psycopg
import requests
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://user:pass@localhost:5432/app")
WEBHOOK_URL = os.environ.get("WEBHOOK_URL", "http://localhost:8080/events")
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "100"))
SLEEP_SECONDS = float(os.environ.get("SLEEP_SECONDS", "0.5"))
MAX_ATTEMPTS = int(os.environ.get("MAX_ATTEMPTS", "10"))
REQUEST_TIMEOUT = float(os.environ.get("REQUEST_TIMEOUT", "5"))
stop = False
def handle_signal(signum, frame):
global stop
stop = True
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)
SESSION_HEADERS = {"Content-Type": "application/json"}
def publish_batch(conn: psycopg.Connection) -> int:
published = 0
with conn.transaction():
rows = conn.execute(
f"""
SELECT id, event_type, payload
FROM outbox_events
WHERE published_at IS NULL AND attempts < %s
ORDER BY created_at
FOR UPDATE SKIP LOCKED
LIMIT %s
""",
(MAX_ATTEMPTS, BATCH_SIZE),
).fetchall()
if not rows:
return 0
for row in rows:
event_id = row[0]
event_type = row[1]
payload = row[2]
body = {
"id": str(event_id),
"type": event_type,
"payload": payload,
}
try:
resp = requests.post(WEBHOOK_URL, data=json.dumps(body), headers=SESSION_HEADERS, timeout=REQUEST_TIMEOUT)
if 200 <= resp.status_code < 300:
conn.execute(
"UPDATE outbox_events SET published_at = NOW(), attempts = attempts + 1, last_error = NULL WHERE id = %s",
(event_id,),
)
published += 1
else:
conn.execute(
"UPDATE outbox_events SET attempts = attempts + 1, last_error = %s WHERE id = %s",
(f"HTTP {resp.status_code}: {resp.text[:200]}", event_id),
)
except Exception as e:
conn.execute(
"UPDATE outbox_events SET attempts = attempts + 1, last_error = %s WHERE id = %s",
(str(e)[:500], event_id),
)
return published
def main():
logging.info("publisher starting")
with psycopg.connect(DATABASE_URL) as conn:
while not stop:
try:
n = publish_batch(conn)
if n == 0:
time.sleep(SLEEP_SECONDS)
else:
logging.info("published %d events", n)
except Exception:
logging.exception("publisher loop error")
time.sleep(1.0)
logging.info("publisher stopped")
if __name__ == "__main__":
main()
Такой паблишер можно масштабировать горизонтально — строки распределяются за счёт SKIP LOCKED. Чтобы не застревать на «токсичных» событиях, ограничиваем число попыток и поднимаем алерт.
Даже при аккуратной публикации возможны повторы (например, из‑за таймаута у издателя). Потребитель должен быть идемпотентным: принимать повтор без побочных эффектов. Самый простой способ — таблица обработанных событий с первичным ключом по id события.
# consumer.py — пример FastAPI
import os
import uuid
from fastapi import FastAPI, Request, HTTPException
import psycopg
DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://user:pass@localhost:5432/app")
app = FastAPI()
@app.on_event("startup")
def startup():
# создадим таблицу, если её нет
with psycopg.connect(DATABASE_URL, autocommit=True) as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS processed_events (
event_id UUID PRIMARY KEY,
processed_at TIMESTAMPTZ DEFAULT NOW()
);
"""
)
@app.post("/events")
async def handle_event(request: Request):
data = await request.json()
try:
event_id = uuid.UUID(data["id"]) # валидируем id
event_type = data["type"]
payload = data["payload"]
except Exception:
raise HTTPException(status_code=400, detail="bad event format")
with psycopg.connect(DATABASE_URL) as conn:
with conn.transaction():
# попытаемся зарезервировать событие
cur = conn.execute(
"INSERT INTO processed_events (event_id) VALUES (%s) ON CONFLICT DO NOTHING",
(event_id,)
)
if cur.rowcount == 0:
# событие уже обрабатывали — тихо подтверждаем
return {"status": "duplicate"}
# ваша бизнес‑логика тут — должна быть устойчивой к повторам
if event_type == "order.created":
# пример: создаём задачу на сборку посылки, пишем лог и т.д.
pass
return {"status": "ok"}
Если у вас Kafka/Redpanda и команда SRE, CDC снимает заботу о пуллере. Достаточно настроить коннектор, и записи из outbox_events начнут течь в нужные топики. Пример минимальной конфигурации Debezium (PostgreSQL):
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "db",
"database.port": "5432",
"database.user": "debezium",
"database.password": "secret",
"database.dbname": "app",
"slot.name": "outbox_slot",
"publication.autocreate.mode": "filtered",
"tombstones.on.delete": "false",
"plugin.name": "pgoutput",
"table.include.list": "public.outbox_events",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "event_type",
"transforms.outbox.route.topic.replacement": "events.${routedByValue}",
"transforms.outbox.table.fields.additional.placement": "headers:aggregate_type:aggregate_type,aggregate_id:aggregate_id"
}
}
Примечание: EventRouter берёт из строки outbox_events поля event_type/payload и автоматически формирует сообщения для Kafka топиков вида events.order.created и т.п.
DELETE FROM outbox_events
WHERE published_at IS NOT NULL
AND published_at < NOW() - INTERVAL '30 days';
Пример простых метрик Prometheus в паблишере можно добавить через библиотеку prometheus_client и HTTP‑эндпоинт — это несколько строк, но уже даст видимость очереди и ошибок.
Outbox и CDC закрывают системный класс ошибок интеграций: события больше не теряются между БД и брокером, а повторы не бьют по данным. Это снижает количество инцидентов, ускоряет расследования (всё видно в одной таблице), упрощает масштабирование и уменьшает издержки на ручные правки. Внедрение обычно занимает 1–2 недели, окупается с первого «неслучившегося» инцидента и готовит платформу к надёжным интеграциям на годы вперёд.