Kravchenko

Web Lab

АудитБлогКонтакты

Kravchenko

Web Lab

Разрабатываем сайты и автоматизацию на современных фреймворках под ключ

Услуги
ЛендингМногостраничныйВизитка
E-commerceБронированиеПортфолио
Навигация
БлогКонтактыАудит
Обратная связь
+7 921 567-11-16
info@kravlab.ru
с 09:00 до 18:00

© 2026 Все права защищены

•

ИП Кравченко Никита Владимирович

•

ОГРНИП: 324784700339743

Политика конфиденциальности

Outbox + CDC в PostgreSQL: как гарантировать доставку событий и не чинить интеграции каждую неделю

Разработка и технологии18 декабря 2025 г.
Интеграции ломаются не из‑за «плохих» брокеров, а из‑за разрыва между записью в БД и публикацией события. Разберём паттерн Outbox и фиксацию изменений (CDC): как сшить данные и события одной транзакцией, перестать терять сообщения и снизить операционные риски.
Outbox + CDC в PostgreSQL: как гарантировать доставку событий и не чинить интеграции каждую неделю

• Оглавление

  • Зачем бизнесу Outbox и CDC: где теряются деньги
  • Как работает паттерн Outbox: простыми словами
  • Вариант 1. Пуллер из таблицы с SKIP LOCKED (PostgreSQL)
  • Вариант 2. CDC через Debezium: поток событий без кода
  • Пример: Django + PostgreSQL — создаём заказ и событие в одной транзакции
  • Паблишер: безопасная публикация из Outbox по HTTP (с повторными попытками)
  • Идемпотентный потребитель: как не обрабатывать одно событие дважды
  • Гарантии порядка, рост таблицы и чистка, эволюция схемы событий
  • Мониторинг и оповещения: что мерить
  • Частые ошибки и как их избежать
  • Чеклист внедрения

Зачем бизнесу Outbox и CDC: где теряются деньги

Сценарий знаком многим: сервис записал заказ в базу данных, затем попытался отправить событие в брокер/вебхук — и что‑то пошло не так. В итоге партнёр не получил уведомление, склад не отгрузил товар, деньги «зависли». Обратная ситуация не лучше: событие улетело, а запись в БД не сохранилась — получаем расхождения и долгие разборы.

Причина — две независимые операции: запись в БД и отправка в стороннюю систему. Между ними всегда есть риск сбоев, повторов и расхождений. Бизнес платит за это инцидентами, отменами, ручными правками и недополученной выручкой.

Паттерн Outbox решает проблему: мы пишем «факт изменения» (событие) в отдельную таблицу той же базой транзакцией вместе с бизнес‑сущностью. Дальше независимый процесс безопасно публикует эти записи во внешние системы. Если используем CDC (фиксация изменений), то публикация вообще становится «без кода» — события вычитываются из журнала изменений базы.

Как работает паттерн Outbox: простыми словами

  • При изменении бизнес‑данных (например, создан заказ) мы в той же транзакции пишем в таблицу outbox_events запись с типом события и полезной нагрузкой.
  • Отдельный процесс (пуллер) читает из outbox_events новые записи, публикует их в брокер/вебхук и помечает как доставленные.
  • Потребитель на своей стороне делает обработку идемпотентной: одно и то же событие можно принять и обработать несколько раз, результат не меняется.

Ключевая идея — «сшить» бизнес‑данные и событие одной транзакцией, чтобы либо записалось всё, либо ничего. Это устраняет главный источник рассинхронизации.

Вариант 1. Пуллер из таблицы с SKIP LOCKED (PostgreSQL)

В PostgreSQL есть удобный приём: выбирать события с блокировкой и пропуском занятых строк:

  • SELECT ... FOR UPDATE SKIP LOCKED Это позволяет нескольким воркерам безопасно разбирать очередь без гонок: одну и ту же строку не возьмут два процесса.

Вариант 2. CDC через Debezium: поток событий без кода

CDC (Change Data Capture) — это вычитка изменений прямо из журнала транзакций базы. Debezium умеет подключаться к PostgreSQL и стримить изменения из выбранной таблицы (например, outbox_events) в Kafka. Преимущества:

  • не пишем пуллер и не блокируем таблицу;
  • минимальная нагрузка на приложение;
  • события «летят» почти в реальном времени. Минусы: нужна инфраструктура Kafka/Redpanda + Debezium.

Пример: Django + PostgreSQL — создаём заказ и событие в одной транзакции

Ниже — полноценные модели и функция создания заказа. Обратите внимание: событие записывается транзакцией вместе с заказом.

# models.py
import uuid
from django.db import models

class Order(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    customer_email = models.EmailField()
    amount = models.DecimalField(max_digits=12, decimal_places=2)
    status = models.CharField(max_length=32, default="created")
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = "orders"

class OutboxEvent(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    aggregate_type = models.CharField(max_length=64)  # например, "order"
    aggregate_id = models.UUIDField()                 # id заказа
    event_type = models.CharField(max_length=128)     # например, "order.created"
    payload = models.JSONField()                      # полезная нагрузка события
    created_at = models.DateTimeField(auto_now_add=True)
    published_at = models.DateTimeField(null=True, blank=True)
    attempts = models.IntegerField(default=0)
    last_error = models.TextField(null=True, blank=True)

    class Meta:
        db_table = "outbox_events"
        indexes = [
            models.Index(fields=["created_at"], name="idx_outbox_created_at"),
        ]
# services.py
from django.db import transaction
from .models import Order, OutboxEvent

def create_order_and_event(customer_email: str, amount) -> Order:
    """Создать заказ и событие в одной транзакции."""
    with transaction.atomic():
        order = Order.objects.create(customer_email=customer_email, amount=amount)
        OutboxEvent.objects.create(
            aggregate_type="order",
            aggregate_id=order.id,
            event_type="order.created",
            payload={
                "order_id": str(order.id),
                "amount": str(order.amount),
                "customer_email": order.customer_email,
                "status": order.status,
                "created_at": order.created_at.isoformat(),
                "version": 1,  # версия схемы события
            },
        )
        return order

Для ускорения выборки непубликованных событий добавьте частичный индекс (сырое SQL можно выполнить отдельно в миграции):

-- 0002_add_partial_index_outbox.sql
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_outbox_unpublished
ON outbox_events (created_at)
WHERE published_at IS NULL;

Паблишер: безопасная публикация из Outbox по HTTP (с повторными попытками)

Этот скрипт читает события партиями, публикует на внешний вебхук и помечает опубликованные. Используем безопасный шаблон: SELECT ... FOR UPDATE SKIP LOCKED. Если публикация не удалась — увеличиваем счётчик попыток и логируем причину. Все параметры берутся из переменных окружения.

# publisher.py
import os
import time
import json
import signal
import logging
from typing import List
import psycopg
import requests

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://user:pass@localhost:5432/app")
WEBHOOK_URL = os.environ.get("WEBHOOK_URL", "http://localhost:8080/events")
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "100"))
SLEEP_SECONDS = float(os.environ.get("SLEEP_SECONDS", "0.5"))
MAX_ATTEMPTS = int(os.environ.get("MAX_ATTEMPTS", "10"))
REQUEST_TIMEOUT = float(os.environ.get("REQUEST_TIMEOUT", "5"))

stop = False

def handle_signal(signum, frame):
    global stop
    stop = True

signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)

SESSION_HEADERS = {"Content-Type": "application/json"}

def publish_batch(conn: psycopg.Connection) -> int:
    published = 0
    with conn.transaction():
        rows = conn.execute(
            f"""
            SELECT id, event_type, payload
            FROM outbox_events
            WHERE published_at IS NULL AND attempts < %s
            ORDER BY created_at
            FOR UPDATE SKIP LOCKED
            LIMIT %s
            """,
            (MAX_ATTEMPTS, BATCH_SIZE),
        ).fetchall()

        if not rows:
            return 0

        for row in rows:
            event_id = row[0]
            event_type = row[1]
            payload = row[2]

            body = {
                "id": str(event_id),
                "type": event_type,
                "payload": payload,
            }
            try:
                resp = requests.post(WEBHOOK_URL, data=json.dumps(body), headers=SESSION_HEADERS, timeout=REQUEST_TIMEOUT)
                if 200 <= resp.status_code < 300:
                    conn.execute(
                        "UPDATE outbox_events SET published_at = NOW(), attempts = attempts + 1, last_error = NULL WHERE id = %s",
                        (event_id,),
                    )
                    published += 1
                else:
                    conn.execute(
                        "UPDATE outbox_events SET attempts = attempts + 1, last_error = %s WHERE id = %s",
                        (f"HTTP {resp.status_code}: {resp.text[:200]}", event_id),
                    )
            except Exception as e:
                conn.execute(
                    "UPDATE outbox_events SET attempts = attempts + 1, last_error = %s WHERE id = %s",
                    (str(e)[:500], event_id),
                )
    return published


def main():
    logging.info("publisher starting")
    with psycopg.connect(DATABASE_URL) as conn:
        while not stop:
            try:
                n = publish_batch(conn)
                if n == 0:
                    time.sleep(SLEEP_SECONDS)
                else:
                    logging.info("published %d events", n)
            except Exception:
                logging.exception("publisher loop error")
                time.sleep(1.0)
    logging.info("publisher stopped")

if __name__ == "__main__":
    main()

Такой паблишер можно масштабировать горизонтально — строки распределяются за счёт SKIP LOCKED. Чтобы не застревать на «токсичных» событиях, ограничиваем число попыток и поднимаем алерт.

Идемпотентный потребитель: как не обрабатывать одно событие дважды

Даже при аккуратной публикации возможны повторы (например, из‑за таймаута у издателя). Потребитель должен быть идемпотентным: принимать повтор без побочных эффектов. Самый простой способ — таблица обработанных событий с первичным ключом по id события.

# consumer.py — пример FastAPI
import os
import uuid
from fastapi import FastAPI, Request, HTTPException
import psycopg

DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://user:pass@localhost:5432/app")
app = FastAPI()

@app.on_event("startup")
def startup():
    # создадим таблицу, если её нет
    with psycopg.connect(DATABASE_URL, autocommit=True) as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS processed_events (
                event_id UUID PRIMARY KEY,
                processed_at TIMESTAMPTZ DEFAULT NOW()
            );
            """
        )

@app.post("/events")
async def handle_event(request: Request):
    data = await request.json()
    try:
        event_id = uuid.UUID(data["id"])  # валидируем id
        event_type = data["type"]
        payload = data["payload"]
    except Exception:
        raise HTTPException(status_code=400, detail="bad event format")

    with psycopg.connect(DATABASE_URL) as conn:
        with conn.transaction():
            # попытаемся зарезервировать событие
            cur = conn.execute(
                "INSERT INTO processed_events (event_id) VALUES (%s) ON CONFLICT DO NOTHING",
                (event_id,)
            )
            if cur.rowcount == 0:
                # событие уже обрабатывали — тихо подтверждаем
                return {"status": "duplicate"}

            # ваша бизнес‑логика тут — должна быть устойчивой к повторам
            if event_type == "order.created":
                # пример: создаём задачу на сборку посылки, пишем лог и т.д.
                pass

    return {"status": "ok"}

Вариант 2: CDC через Debezium — минимум кода, максимум надёжности

Если у вас Kafka/Redpanda и команда SRE, CDC снимает заботу о пуллере. Достаточно настроить коннектор, и записи из outbox_events начнут течь в нужные топики. Пример минимальной конфигурации Debezium (PostgreSQL):

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "db",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "app",
    "slot.name": "outbox_slot",
    "publication.autocreate.mode": "filtered",
    "tombstones.on.delete": "false",
    "plugin.name": "pgoutput",
    "table.include.list": "public.outbox_events",

    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "event_type",
    "transforms.outbox.route.topic.replacement": "events.${routedByValue}",
    "transforms.outbox.table.fields.additional.placement": "headers:aggregate_type:aggregate_type,aggregate_id:aggregate_id"
  }
}

Примечание: EventRouter берёт из строки outbox_events поля event_type/payload и автоматически формирует сообщения для Kafka топиков вида events.order.created и т.п.

Гарантии порядка, рост таблицы и чистка, эволюция схемы событий

Порядок

  • Внутри одной сущности (aggregate_id) порядок важен. Обеспечьте, чтобы паблишер отправлял события по одному для конкретного aggregate_id, либо агрегируйте на стороне потребителя.
  • В Kafka можно использовать ключ сообщений = aggregate_id, чтобы все события по сущности шли в один раздел и не меняли порядок.

Рост Outbox и политика чистки

  • Таблица будет расти. Планируйте ежедневную чистку опубликованных событий старше, например, 30 дней:
DELETE FROM outbox_events
WHERE published_at IS NOT NULL
  AND published_at < NOW() - INTERVAL '30 days';
  • Добавьте автовакаум и еженедельный REINDEX для крупных таблиц, если наблюдаются замедления.

Эволюция схемы событий

  • Добавьте поле version в payload, чтобы безопасно менять формат.
  • Держите потребителей обратно совместимыми: новые поля — можно игнорировать, старые — не удаляйте без миграции.

Мониторинг и оповещения: что мерить

  • Лаг публикации: текущий возраст самого старого непубликованного события (цель: < 1–5 минут, в зависимости от SLA).
  • Доля ошибок публикации за 5/15 минут.
  • Размер таблицы outbox_events, средний attempts.
  • Число «токсичных» событий (attempts >= MAX_ATTEMPTS).
  • На стороне потребителя — доля дубликатов и время обработки.

Пример простых метрик Prometheus в паблишере можно добавить через библиотеку prometheus_client и HTTP‑эндпоинт — это несколько строк, но уже даст видимость очереди и ошибок.

Частые ошибки и как их избежать

  • Отправка события до коммита транзакции: потребитель видит факт, которого ещё нет в БД. Лечится только Outbox (или полноценной 2PC — дорого и сложно).
  • «Огромные» payload в событии: выносите тяжёлые поля (например, файлы) по ссылке с подписью доступа.
  • Отсутствие идемпотентности у потребителя: дубли приводят к двойным действиям. Добавьте таблицу processed_events или идемпотентные ключи.
  • Один глобальный порядок: не нужен и тормозит систему. Думайте в терминах порядка «внутри агрегата».
  • Блокировки без SKIP LOCKED: два воркера дерутся за одну строку. Используйте SELECT ... FOR UPDATE SKIP LOCKED.
  • Бесконтрольный рост таблицы: настройте регулярную чистку и индексы.

Чеклист внедрения

  • Добавили таблицу outbox_events и частичный индекс по непубликованным.
  • Пишем событие в outbox в той же транзакции, где меняем бизнес‑данные.
  • Запустили паблишер (или Debezium) и настроили повторные попытки.
  • Потребитель делает обработку идемпотентной (таблица processed_events / ключи).
  • Расписали политику чистки outbox и алерты на лаг/ошибки.
  • Прописали версионирование payload и правила совместимости.
  • Провели нагрузочные и «хаос» тесты: сеть падает, брокер недоступен, таймауты, повторы.

Итоги для бизнеса

Outbox и CDC закрывают системный класс ошибок интеграций: события больше не теряются между БД и брокером, а повторы не бьют по данным. Это снижает количество инцидентов, ускоряет расследования (всё видно в одной таблице), упрощает масштабирование и уменьшает издержки на ручные правки. Внедрение обычно занимает 1–2 недели, окупается с первого «неслучившегося» инцидента и готовит платформу к надёжным интеграциям на годы вперёд.


postgresqloutboxcdc