Kravchenko

Web Lab

АудитБлогКонтакты

Kravchenko

Web Lab

Разрабатываем сайты и автоматизацию на современных фреймворках под ключ

Услуги
ЛендингМногостраничныйВизитка
E-commerceБронированиеПортфолио
Навигация
БлогКонтактыАудит
Обратная связь
+7 921 567-11-16
info@kravlab.ru
с 09:00 до 18:00

© 2026 Все права защищены

•

ИП Кравченко Никита Владимирович

•

ОГРНИП: 324784700339743

Политика конфиденциальности

Сага‑паттерн для распределённых транзакций: быстрее масштабируем бизнес‑процессы без 2PC и простоев

Разработка и технологии17 апреля 2026 г.
Большие операции часто затрагивают несколько сервисов: склад, оплату, доставку. Глобальные транзакции (2PC) тормозят релизы, блокируют базы и ломаются в跨‑региональных сценариях. Сага‑подход собирает такую операцию из независимых шагов с чёткими компенсациями — без общей блокировки, с наблюдаемостью и понятными гарантиями для бизнеса. В статье — разбор плюсов/минусов, дизайн шагов, хранение состояния и готовый мини‑оркестратор на Python + SQLite.
Сага‑паттерн для распределённых транзакций: быстрее масштабируем бизнес‑процессы без 2PC и простоев

• Оглавление

  • Задача бизнеса: одна операция — много сервисов
  • Почему не 2PC и «распределённая транзакция»
  • Сага: идея и варианты — оркестрация и хореография
  • Дизайн саги: шаги, компенсации, дедлайны, повторы
  • Состояние, события и надёжная доставка (Outbox/Inbox)
  • Наблюдаемость и безопасность
  • Пример: мини‑оркестратор саги на Python + SQLite
  • Хореография: когда без центрального оркестратора
  • Чек‑лист на прод
  • Экономика решения и риски
  • Итоги

Задача бизнеса: одна операция — много сервисов

Пример из e‑commerce: покупатель оформляет заказ. Чтобы «заказ создан» стало правдой, нужно:

  • Зарезервировать товар на складе.
  • Списать оплату или поставить её в холд.
  • Создать доставку и получить трек‑номер.

Три независимые системы, три отдельных канала отказов. Бизнесу нужна целостность: либо всё состоялось, либо корректный откат. Но тянуть глобальную транзакцию через несколько сервисов и баз — дорого, медленно и хрупко.

Почему не 2PC и «распределённая транзакция»

Двухфазная фиксация (2PC) и распределённые транзакции дают «всё или ничего», но:

  • Блокируют ресурсы на время координации — под нагрузкой это тормоз и рост таймаутов.
  • Жёстко связывают стеки и базы (часто — один вендор и одна технология).
  • Плохо дружат с несколькими дата‑центрами и сетями с потерями.
  • Усложняют отказоустойчивость: одно слабое звено — и весь процесс встал.

Сага решает задачу по‑другому: не пытается блокировать мир, а раскладывает длинную операцию на последовательность локальных действий со своими «компенсирующими» шажками на случай отката.

Сага: идея и варианты — оркестрация и хореография

Сага — это сценарий из шагов. Каждый шаг — обычная локальная транзакция в своём сервисе. Если какой‑то шаг не удался, уже выполненные отменяются компенсационными действиями.

Есть два стиля:

  • Оркестрация. Есть центральный «дирижёр», который запускает шаги, ждёт ответы, принимает решение об откате. Плюсы — прозрачность, предсказуемость, простые SLA. Минусы — ещё один сервис, который надо поддерживать.
  • Хореография. Сервисы обмениваются событиями напрямую: «склад зарезервировал» → «оплата списала» → «доставка оформила». Плюсы — меньше точек централизации, слабая связанность. Минусы — сложнее наблюдать и гарантировать порядок/дедлайны.

Для критичных денег и контрактных SLA чаще удобна оркестрация. Для «лёгких» сценариев интеграции между независимыми доменами — хореография.

Дизайн саги: шаги, компенсации, дедлайны, повторы

Как спроектировать корректную сагу:

  1. Разложить на минимальные независимые шаги.
  • Склад: Reserve → Release (компенсация).
  • Оплата: Capture или Authorize → Refund/Void.
  • Доставка: Create → Cancel.
  1. Согласовать контракты шагов.
  • Чёткие входы/выходы: что вернёт шаг (например, payment_id), что требуется для компенсации.
  • Операции должны быть безопасны при повторе, чтобы повторный запуск шага не ломал состояние.
  1. Дедлайны и повторы.
  • Каждый шаг имеет таймаут ответа и стратегию повтора (например, 3 попытки с экспоненциальной паузой).
  • Если лимит попыток исчерпан — уходим в откат.
  1. Не смешивать бронь и окончательный шаг.
  • В деньгах — сначала авторизация (холд), списание — ближе к завершению.
  • В доставке — создание черновика и подтверждение после успешной оплаты.
  1. Чёткие статусы и неизменяемые логи.
  • Состояние саги хранится в отдельной таблице/хранилище.
  • Логи шагов неизменяемы: лучше добавить новую запись о «компенсации», чем переписывать историю.

Состояние, события и надёжная доставка (Outbox/Inbox)

Оркестрация требует сохранения состояния между перезапусками. Минимум — таблицы: «саги», «шаги», «аудит» и «аутбокс» для внешних событий. Аутбокс — это буфер событий, который записывается в одной локальной транзакции с изменением состояния, а публикуется асинхронно. Так мы не теряем событие и не публикуем «фантомы».

Для входящих событий в стиле хореографии полезен инбокс — учёт полученных идентификаторов, чтобы не обработать одно и то же дважды.

Наблюдаемость и безопасность

  • Корреляция. Каждый заказ (и сага) имеют общий correlation_id — он проходит через логи и трассировку.
  • Трассировка. Подключите OpenTelemetry: шаги/компенсации видны как спаны, удобно искать узкие места.
  • Метрики. «Доля успешно завершённых саг», «среднее время», «уровень откатов по шагу». Это быстро находит сбойные интеграции.
  • Безопасность. В события не кладём полные данные карт и персональные сведения; используем токены/идентификаторы.

Пример: мини‑оркестратор саги на Python + SQLite

Ниже — учебный пример оркестрации трёх шагов: склад, оплата, доставка. Скрипт хранит состояние в SQLite, умеет продолжать незавершённые саги, публикует события в локальный «аутбокс». Переменной окружения FAIL_STEP можно принудительно «уронить» любой шаг, чтобы увидеть компенсации.

Как запустить:

  1. Установите Python 3.10+.
  2. Сохраните код в файл saga.py и запустите:
  • Старт саги: python saga.py start --order 1001 --amount 2500 --address "ул. Пушкина, д. 1"
  • Резюмировать незавершённые: python saga.py resume
  • Принудительный сбой шага: FAIL_STEP=PAYMENT python saga.py start --order 1002 --amount 500 --address "Невский, 10"
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import json
import time
import sqlite3
from datetime import datetime

DB_PATH = 'saga.db'

def now_iso():
    return datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')

def with_conn(fn):
    def wrapper(*args, **kwargs):
        conn = sqlite3.connect(DB_PATH)
        conn.row_factory = sqlite3.Row
        try:
            res = fn(conn, *args, **kwargs)
            conn.commit()
            return res
        finally:
            conn.close()
    return wrapper

@with_conn
def init_db(conn):
    cur = conn.cursor()
    cur.execute("""
    CREATE TABLE IF NOT EXISTS sagas (
        id TEXT PRIMARY KEY,
        status TEXT NOT NULL,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL
    )
    """)
    cur.execute("""
    CREATE TABLE IF NOT EXISTS saga_steps (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        saga_id TEXT NOT NULL,
        name TEXT NOT NULL,
        status TEXT NOT NULL,
        order_num INTEGER NOT NULL,
        payload TEXT,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        UNIQUE(saga_id, name)
    )
    """)
    cur.execute("""
    CREATE TABLE IF NOT EXISTS outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        saga_id TEXT NOT NULL,
        event_type TEXT NOT NULL,
        payload TEXT NOT NULL,
        published INTEGER NOT NULL DEFAULT 0,
        created_at TEXT NOT NULL
    )
    """)

@with_conn
def create_saga(conn, saga_id):
    cur = conn.cursor()
    now = now_iso()
    cur.execute(
        'INSERT OR IGNORE INTO sagas (id, status, created_at, updated_at) VALUES (?, ?, ?, ?)',
        (saga_id, 'PENDING', now, now)
    )

@with_conn
def upsert_step(conn, saga_id, name, order_num, status, payload=None):
    cur = conn.cursor()
    now = now_iso()
    cur.execute('SELECT id FROM saga_steps WHERE saga_id=? AND name=?', (saga_id, name))
    row = cur.fetchone()
    if row is None:
        cur.execute(
            'INSERT INTO saga_steps (saga_id, name, status, order_num, payload, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)',
            (saga_id, name, status, order_num, json.dumps(payload) if payload else None, now, now)
        )
    else:
        cur.execute(
            'UPDATE saga_steps SET status=?, payload=?, updated_at=? WHERE saga_id=? AND name=?',
            (status, json.dumps(payload) if payload else None, now, saga_id, name)
        )

@with_conn
def update_saga_status(conn, saga_id, status):
    cur = conn.cursor()
    cur.execute('UPDATE sagas SET status=?, updated_at=? WHERE id=?', (status, now_iso(), saga_id))

@with_conn
def get_saga(conn, saga_id):
    cur = conn.cursor()
    cur.execute('SELECT * FROM sagas WHERE id=?', (saga_id,))
    return cur.fetchone()

@with_conn
def get_saga_steps(conn, saga_id):
    cur = conn.cursor()
    cur.execute('SELECT * FROM saga_steps WHERE saga_id=? ORDER BY order_num ASC', (saga_id,))
    return cur.fetchall()

@with_conn
def list_unfinished_sagas(conn):
    cur = conn.cursor()
    cur.execute("SELECT * FROM sagas WHERE status IN ('PENDING','COMPENSATING') ORDER BY created_at ASC")
    return cur.fetchall()

@with_conn
def add_outbox_event(conn, saga_id, event_type, payload):
    cur = conn.cursor()
    cur.execute(
        'INSERT INTO outbox (saga_id, event_type, payload, published, created_at) VALUES (?, ?, ?, 0, ?)',
        (saga_id, event_type, json.dumps(payload), now_iso())
    )

@with_conn
def publish_outbox(conn):
    cur = conn.cursor()
    cur.execute('SELECT * FROM outbox WHERE published=0 ORDER BY id ASC')
    rows = cur.fetchall()
    for r in rows:
        # Имитируем публикацию события во внешний мир
        print(f"[EVENT] {r['event_type']} saga={r['saga_id']} payload={r['payload']}")
        sys.stdout.flush()
        cur.execute('UPDATE outbox SET published=1 WHERE id=?', (r['id'],))

# ----- Имитация сервисов и компенсаций -----
FAIL_STEP = os.getenv('FAIL_STEP', '').upper().strip()

class StepError(Exception):
    pass

def ensure_ok(step_name):
    if FAIL_STEP == step_name:
        raise StepError(f"Принудительный сбой шага {step_name}")

# Склад

def reserve_inventory(ctx):
    ensure_ok('INVENTORY')
    time.sleep(0.2)
    reservation_id = f"res-{ctx['order_id']}"
    print(f"[INVENTORY] Зарезервирован товар: {reservation_id}")
    return {"reservation_id": reservation_id}

def release_inventory(ctx, step_payload):
    time.sleep(0.1)
    rid = step_payload['reservation_id']
    print(f"[INVENTORY] Отмена резерва: {rid}")

# Оплата

def charge_payment(ctx):
    ensure_ok('PAYMENT')
    time.sleep(0.3)
    payment_id = f"pay-{ctx['order_id']}"
    print(f"[PAYMENT] Списано {ctx['amount']} по платежу {payment_id}")
    return {"payment_id": payment_id, "amount": ctx['amount']}

def refund_payment(ctx, step_payload):
    time.sleep(0.1)
    pid = step_payload['payment_id']
    amt = step_payload['amount']
    print(f"[PAYMENT] Возврат {amt} по платежу {pid}")

# Доставка

def arrange_delivery(ctx):
    ensure_ok('DELIVERY')
    time.sleep(0.2)
    delivery_id = f"del-{ctx['order_id']}"
    print(f"[DELIVERY] Оформлена доставка {delivery_id} на адрес: {ctx['address']}")
    return {"delivery_id": delivery_id, "address": ctx['address']}

def cancel_delivery(ctx, step_payload):
    time.sleep(0.1)
    did = step_payload['delivery_id']
    print(f"[DELIVERY] Отмена доставки {did}")

STEPS = [
    {
        'name': 'ReserveInventory',
        'order': 1,
        'run': reserve_inventory,
        'compensate': release_inventory
    },
    {
        'name': 'ChargePayment',
        'order': 2,
        'run': charge_payment,
        'compensate': refund_payment
    },
    {
        'name': 'ArrangeDelivery',
        'order': 3,
        'run': arrange_delivery,
        'compensate': cancel_delivery
    }
]

def process_saga(saga_id, ctx):
    saga = get_saga(saga_id)
    if saga is None:
        raise RuntimeError('Сага не найдена')

    if saga['status'] not in ('PENDING', 'COMPENSATING'):
        print(f"[SAGA] {saga_id} уже в состоянии {saga['status']}")
        return

    steps = get_saga_steps(saga_id)
    steps_by_name = {s['name']: s for s in steps}

    # Выполнение вперёд
    if saga['status'] == 'PENDING':
        for st in STEPS:
            sname = st['name']
            existing = steps_by_name.get(sname)
            if existing and existing['status'] == 'DONE':
                continue
            try:
                print(f"[SAGA] Выполнение шага {sname}")
                payload = st['run'](ctx)
                upsert_step(saga_id, sname, st['order'], 'DONE', payload)
            except Exception as e:
                print(f"[SAGA] Шаг {sname} упал: {e}")
                upsert_step(saga_id, sname, st['order'], 'FAILED', {"error": str(e)})
                update_saga_status(saga_id, 'COMPENSATING')
                break

    # Компенсации при необходимости
    saga = get_saga(saga_id)
    if saga['status'] == 'COMPENSATING':
        steps = get_saga_steps(saga_id)
        # Только те, что успели DONE
        done_steps = [s for s in steps if s['status'] == 'DONE']
        for s in sorted(done_steps, key=lambda x: x['order_num'], reverse=True):
            st_def = next(x for x in STEPS if x['name'] == s['name'])
            print(f"[SAGA] Компенсация шага {s['name']}")
            pl = json.loads(s['payload']) if s['payload'] else {}
            try:
                st_def['compensate'](ctx, pl)
                upsert_step(saga_id, s['name'], s['order_num'], 'COMPENSATED', pl)
            except Exception as e:
                # В бою: алерт, парковка, ручная дообработка
                print(f"[SAGA] Ошибка компенсации {s['name']}: {e}")
                upsert_step(saga_id, s['name'], s['order_num'], 'COMPENSATION_FAILED', {"error": str(e)})
                # Оставляем в COMPENSATING для повторов
                publish_outbox()
                return
        update_saga_status(saga_id, 'FAILED')
        add_outbox_event(saga_id, 'OrderFailed', {"order_id": ctx['order_id']})
        publish_outbox()
        print(f"[SAGA] {saga_id} завершена с ошибкой и откатом")
        return

    # Если дошли сюда, значит все шаги DONE
    saga = get_saga(saga_id)
    if saga['status'] == 'PENDING':
        update_saga_status(saga_id, 'COMPLETED')
        add_outbox_event(saga_id, 'OrderCompleted', {"order_id": ctx['order_id']})
        publish_outbox()
        print(f"[SAGA] {saga_id} успешно завершена")


def start_saga(order_id, amount, address):
    saga_id = f"order-{order_id}"
    init_db()
    create_saga(saga_id)
    # Инициализируем записи шагов в PENDING для наглядности
    for st in STEPS:
        upsert_step(saga_id, st['name'], st['order'], 'PENDING', {})
    ctx = {"order_id": str(order_id), "amount": int(amount), "address": address}
    process_saga(saga_id, ctx)


def resume_all():
    init_db()
    rows = list_unfinished_sagas()
    if not rows:
        print("Незавершённых саг нет")
        return
    for r in rows:
        print(f"[SAGA] Резюме {r['id']} в статусе {r['status']}")
        # В минимальном примере контекст хранится в шагах
        steps = get_saga_steps(r['id'])
        # Восстановим минимальный контекст из данных шагов при наличии
        ctx = {"order_id": r['id'].split('order-')[-1], "amount": 0, "address": ''}
        for s in steps:
            if s['name'] == 'ChargePayment' and s['payload']:
                pl = json.loads(s['payload'])
                ctx['amount'] = pl.get('amount', 0)
            if s['name'] == 'ArrangeDelivery' and s['payload']:
                pl = json.loads(s['payload'])
                ctx['address'] = pl.get('address', '')
        process_saga(r['id'], ctx)


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Использование: python saga.py start --order 1001 --amount 2500 --address 'ул. Пушкина, 1' | resume")
        sys.exit(1)
    cmd = sys.argv[1]
    if cmd == 'start':
        # Примитивный парсер аргументов
        try:
            order = sys.argv[sys.argv.index('--order') + 1]
            amount = sys.argv[sys.argv.index('--amount') + 1]
            address = sys.argv[sys.argv.index('--address') + 1]
        except Exception:
            print("Нужно указать --order --amount --address")
            sys.exit(1)
        start_saga(order, amount, address)
    elif cmd == 'resume':
        resume_all()
    else:
        print('Неизвестная команда')

Что демонстрирует этот код:

  • Состояние шагов хранится и переживает рестарт процесса.
  • Компензирующие действия вызываются в обратном порядке.
  • События о завершении пишутся в «аутбокс» и публикуются отдельно — без потери.

Как это превратить в прод

  • Состояние — в отдельной базе с репликацией.
  • Публикация из аутбокса — в очередь/шину (Kafka, RabbitMQ), отдельным воркером.
  • Повторы шагов — с настраиваемыми интервалами, лимитами, джоб‑очередями.
  • Алерты — при COMPENSATION_FAILED, долгих саг, превышении доли откатов.

Хореография: когда без центрального оркестратора

Если домены независимы и допустима асинхронность, можно связать их событиями. При этом важно не терять и не дублировать сообщения. Минимальная схема «аутбокс/инбокс» в базе сервиса:

-- Outbox: пишется в одной транзакции с изменением бизнес-состояния
CREATE TABLE IF NOT EXISTS outbox (
  id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  aggregate_id TEXT NOT NULL,
  event_type TEXT NOT NULL,
  payload JSONB NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  published BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE INDEX IF NOT EXISTS idx_outbox_unpub ON outbox(published, id);

-- Inbox: учёт полученных событий, чтобы не обработать дважды
CREATE TABLE IF NOT EXISTS inbox (
  source TEXT NOT NULL,
  event_id TEXT NOT NULL,
  received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  PRIMARY KEY(source, event_id)
);

Дальше — процесс‑публикатор читает из outbox, шлёт в шину, помечает как опубликованные. Консьюмеры при обработке события сначала проверяют/пишут в inbox, затем выполняют бизнес‑логику; если запись уже есть — событие пропускается.

Чек‑лист на прод

  • Опишите шаги и компенсации письменно. Без этого будут «задние двери» и неучтённые стейты.
  • Определите дедлайны на уровне саги и каждого шага. Что происходит по таймауту?
  • Продумайте повторы, но не бесконечные. «Три попытки, затем компенсация и алерт» — хороший старт.
  • Состояние саги — в отдельном хранилище, с резервным копированием и мониторингом.
  • Включите трассировку: correlation_id обязателен в логах, метриках и событиях.
  • Компенсации должны быть безопасны при повторе. Проверьте это тестами.
  • Для денежных операций используйте холд и разделение capture/refund.
  • Проведите «chaos‑тесты»: выключите сервисы на шаге 2 и 3, измерьте время отката и полноту компенсаций.
  • Опишите ручные процедуры: как дооканчивать сагу руками, если интеграция внешнего поставщика «лежит» сутки.

Экономика решения и риски

Экономика:

  • Масштабирование без глобальной блокировки: пик заказов не кладёт транзакционный координатор.
  • Независимые релизы сервисов: меньше согласований, быстрее поставка фич.
  • Прозрачные риски: каждый шаг со своими метриками и целями по времени.

Риски:

  • Сложнее проектирование: нужно заранее описать компромиссы и компенсации.
  • Возможны «длинные хвосты» ожидания (доставка отвечает медленно) — спасают дедлайны и парковка проблемных саг.
  • Неправильно спроектированная компенсация превращает откат в новую аварию — тестируйте!

Итоги

Сага‑паттерн — практичный способ собрать «целостную» бизнес‑операцию из независимых сервисов без 2PC. Он требует дисциплины: явно описать шаги и компенсации, хранить состояние, обеспечивать доставку событий и трассировку. В обмен вы получаете масштабируемость, независимые релизы и предсказуемые SLA. Начать можно с оркестрации для критичных сценариев, а там, где это уместно, — перейти к хореографии с аутбоксом/инбоксом. Пример из статьи — рабочая основа, от которой легко двинуться к прод‑уровню с метриками, очередями и автоматическими повторами.


микросервисысагираспределённые транзакции