
• Оглавление
Пример из e‑commerce: покупатель оформляет заказ. Чтобы «заказ создан» стало правдой, нужно:
Три независимые системы, три отдельных канала отказов. Бизнесу нужна целостность: либо всё состоялось, либо корректный откат. Но тянуть глобальную транзакцию через несколько сервисов и баз — дорого, медленно и хрупко.
Двухфазная фиксация (2PC) и распределённые транзакции дают «всё или ничего», но:
Сага решает задачу по‑другому: не пытается блокировать мир, а раскладывает длинную операцию на последовательность локальных действий со своими «компенсирующими» шажками на случай отката.
Сага — это сценарий из шагов. Каждый шаг — обычная локальная транзакция в своём сервисе. Если какой‑то шаг не удался, уже выполненные отменяются компенсационными действиями.
Есть два стиля:
Для критичных денег и контрактных SLA чаще удобна оркестрация. Для «лёгких» сценариев интеграции между независимыми доменами — хореография.
Как спроектировать корректную сагу:
Оркестрация требует сохранения состояния между перезапусками. Минимум — таблицы: «саги», «шаги», «аудит» и «аутбокс» для внешних событий. Аутбокс — это буфер событий, который записывается в одной локальной транзакции с изменением состояния, а публикуется асинхронно. Так мы не теряем событие и не публикуем «фантомы».
Для входящих событий в стиле хореографии полезен инбокс — учёт полученных идентификаторов, чтобы не обработать одно и то же дважды.
Ниже — учебный пример оркестрации трёх шагов: склад, оплата, доставка. Скрипт хранит состояние в SQLite, умеет продолжать незавершённые саги, публикует события в локальный «аутбокс». Переменной окружения FAIL_STEP можно принудительно «уронить» любой шаг, чтобы увидеть компенсации.
Как запустить:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import json
import time
import sqlite3
from datetime import datetime
DB_PATH = 'saga.db'
def now_iso():
return datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
def with_conn(fn):
def wrapper(*args, **kwargs):
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
try:
res = fn(conn, *args, **kwargs)
conn.commit()
return res
finally:
conn.close()
return wrapper
@with_conn
def init_db(conn):
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS sagas (
id TEXT PRIMARY KEY,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS saga_steps (
id INTEGER PRIMARY KEY AUTOINCREMENT,
saga_id TEXT NOT NULL,
name TEXT NOT NULL,
status TEXT NOT NULL,
order_num INTEGER NOT NULL,
payload TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(saga_id, name)
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS outbox (
id INTEGER PRIMARY KEY AUTOINCREMENT,
saga_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
published INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL
)
""")
@with_conn
def create_saga(conn, saga_id):
cur = conn.cursor()
now = now_iso()
cur.execute(
'INSERT OR IGNORE INTO sagas (id, status, created_at, updated_at) VALUES (?, ?, ?, ?)',
(saga_id, 'PENDING', now, now)
)
@with_conn
def upsert_step(conn, saga_id, name, order_num, status, payload=None):
cur = conn.cursor()
now = now_iso()
cur.execute('SELECT id FROM saga_steps WHERE saga_id=? AND name=?', (saga_id, name))
row = cur.fetchone()
if row is None:
cur.execute(
'INSERT INTO saga_steps (saga_id, name, status, order_num, payload, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)',
(saga_id, name, status, order_num, json.dumps(payload) if payload else None, now, now)
)
else:
cur.execute(
'UPDATE saga_steps SET status=?, payload=?, updated_at=? WHERE saga_id=? AND name=?',
(status, json.dumps(payload) if payload else None, now, saga_id, name)
)
@with_conn
def update_saga_status(conn, saga_id, status):
cur = conn.cursor()
cur.execute('UPDATE sagas SET status=?, updated_at=? WHERE id=?', (status, now_iso(), saga_id))
@with_conn
def get_saga(conn, saga_id):
cur = conn.cursor()
cur.execute('SELECT * FROM sagas WHERE id=?', (saga_id,))
return cur.fetchone()
@with_conn
def get_saga_steps(conn, saga_id):
cur = conn.cursor()
cur.execute('SELECT * FROM saga_steps WHERE saga_id=? ORDER BY order_num ASC', (saga_id,))
return cur.fetchall()
@with_conn
def list_unfinished_sagas(conn):
cur = conn.cursor()
cur.execute("SELECT * FROM sagas WHERE status IN ('PENDING','COMPENSATING') ORDER BY created_at ASC")
return cur.fetchall()
@with_conn
def add_outbox_event(conn, saga_id, event_type, payload):
cur = conn.cursor()
cur.execute(
'INSERT INTO outbox (saga_id, event_type, payload, published, created_at) VALUES (?, ?, ?, 0, ?)',
(saga_id, event_type, json.dumps(payload), now_iso())
)
@with_conn
def publish_outbox(conn):
cur = conn.cursor()
cur.execute('SELECT * FROM outbox WHERE published=0 ORDER BY id ASC')
rows = cur.fetchall()
for r in rows:
# Имитируем публикацию события во внешний мир
print(f"[EVENT] {r['event_type']} saga={r['saga_id']} payload={r['payload']}")
sys.stdout.flush()
cur.execute('UPDATE outbox SET published=1 WHERE id=?', (r['id'],))
# ----- Имитация сервисов и компенсаций -----
FAIL_STEP = os.getenv('FAIL_STEP', '').upper().strip()
class StepError(Exception):
pass
def ensure_ok(step_name):
if FAIL_STEP == step_name:
raise StepError(f"Принудительный сбой шага {step_name}")
# Склад
def reserve_inventory(ctx):
ensure_ok('INVENTORY')
time.sleep(0.2)
reservation_id = f"res-{ctx['order_id']}"
print(f"[INVENTORY] Зарезервирован товар: {reservation_id}")
return {"reservation_id": reservation_id}
def release_inventory(ctx, step_payload):
time.sleep(0.1)
rid = step_payload['reservation_id']
print(f"[INVENTORY] Отмена резерва: {rid}")
# Оплата
def charge_payment(ctx):
ensure_ok('PAYMENT')
time.sleep(0.3)
payment_id = f"pay-{ctx['order_id']}"
print(f"[PAYMENT] Списано {ctx['amount']} по платежу {payment_id}")
return {"payment_id": payment_id, "amount": ctx['amount']}
def refund_payment(ctx, step_payload):
time.sleep(0.1)
pid = step_payload['payment_id']
amt = step_payload['amount']
print(f"[PAYMENT] Возврат {amt} по платежу {pid}")
# Доставка
def arrange_delivery(ctx):
ensure_ok('DELIVERY')
time.sleep(0.2)
delivery_id = f"del-{ctx['order_id']}"
print(f"[DELIVERY] Оформлена доставка {delivery_id} на адрес: {ctx['address']}")
return {"delivery_id": delivery_id, "address": ctx['address']}
def cancel_delivery(ctx, step_payload):
time.sleep(0.1)
did = step_payload['delivery_id']
print(f"[DELIVERY] Отмена доставки {did}")
STEPS = [
{
'name': 'ReserveInventory',
'order': 1,
'run': reserve_inventory,
'compensate': release_inventory
},
{
'name': 'ChargePayment',
'order': 2,
'run': charge_payment,
'compensate': refund_payment
},
{
'name': 'ArrangeDelivery',
'order': 3,
'run': arrange_delivery,
'compensate': cancel_delivery
}
]
def process_saga(saga_id, ctx):
saga = get_saga(saga_id)
if saga is None:
raise RuntimeError('Сага не найдена')
if saga['status'] not in ('PENDING', 'COMPENSATING'):
print(f"[SAGA] {saga_id} уже в состоянии {saga['status']}")
return
steps = get_saga_steps(saga_id)
steps_by_name = {s['name']: s for s in steps}
# Выполнение вперёд
if saga['status'] == 'PENDING':
for st in STEPS:
sname = st['name']
existing = steps_by_name.get(sname)
if existing and existing['status'] == 'DONE':
continue
try:
print(f"[SAGA] Выполнение шага {sname}")
payload = st['run'](ctx)
upsert_step(saga_id, sname, st['order'], 'DONE', payload)
except Exception as e:
print(f"[SAGA] Шаг {sname} упал: {e}")
upsert_step(saga_id, sname, st['order'], 'FAILED', {"error": str(e)})
update_saga_status(saga_id, 'COMPENSATING')
break
# Компенсации при необходимости
saga = get_saga(saga_id)
if saga['status'] == 'COMPENSATING':
steps = get_saga_steps(saga_id)
# Только те, что успели DONE
done_steps = [s for s in steps if s['status'] == 'DONE']
for s in sorted(done_steps, key=lambda x: x['order_num'], reverse=True):
st_def = next(x for x in STEPS if x['name'] == s['name'])
print(f"[SAGA] Компенсация шага {s['name']}")
pl = json.loads(s['payload']) if s['payload'] else {}
try:
st_def['compensate'](ctx, pl)
upsert_step(saga_id, s['name'], s['order_num'], 'COMPENSATED', pl)
except Exception as e:
# В бою: алерт, парковка, ручная дообработка
print(f"[SAGA] Ошибка компенсации {s['name']}: {e}")
upsert_step(saga_id, s['name'], s['order_num'], 'COMPENSATION_FAILED', {"error": str(e)})
# Оставляем в COMPENSATING для повторов
publish_outbox()
return
update_saga_status(saga_id, 'FAILED')
add_outbox_event(saga_id, 'OrderFailed', {"order_id": ctx['order_id']})
publish_outbox()
print(f"[SAGA] {saga_id} завершена с ошибкой и откатом")
return
# Если дошли сюда, значит все шаги DONE
saga = get_saga(saga_id)
if saga['status'] == 'PENDING':
update_saga_status(saga_id, 'COMPLETED')
add_outbox_event(saga_id, 'OrderCompleted', {"order_id": ctx['order_id']})
publish_outbox()
print(f"[SAGA] {saga_id} успешно завершена")
def start_saga(order_id, amount, address):
saga_id = f"order-{order_id}"
init_db()
create_saga(saga_id)
# Инициализируем записи шагов в PENDING для наглядности
for st in STEPS:
upsert_step(saga_id, st['name'], st['order'], 'PENDING', {})
ctx = {"order_id": str(order_id), "amount": int(amount), "address": address}
process_saga(saga_id, ctx)
def resume_all():
init_db()
rows = list_unfinished_sagas()
if not rows:
print("Незавершённых саг нет")
return
for r in rows:
print(f"[SAGA] Резюме {r['id']} в статусе {r['status']}")
# В минимальном примере контекст хранится в шагах
steps = get_saga_steps(r['id'])
# Восстановим минимальный контекст из данных шагов при наличии
ctx = {"order_id": r['id'].split('order-')[-1], "amount": 0, "address": ''}
for s in steps:
if s['name'] == 'ChargePayment' and s['payload']:
pl = json.loads(s['payload'])
ctx['amount'] = pl.get('amount', 0)
if s['name'] == 'ArrangeDelivery' and s['payload']:
pl = json.loads(s['payload'])
ctx['address'] = pl.get('address', '')
process_saga(r['id'], ctx)
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Использование: python saga.py start --order 1001 --amount 2500 --address 'ул. Пушкина, 1' | resume")
sys.exit(1)
cmd = sys.argv[1]
if cmd == 'start':
# Примитивный парсер аргументов
try:
order = sys.argv[sys.argv.index('--order') + 1]
amount = sys.argv[sys.argv.index('--amount') + 1]
address = sys.argv[sys.argv.index('--address') + 1]
except Exception:
print("Нужно указать --order --amount --address")
sys.exit(1)
start_saga(order, amount, address)
elif cmd == 'resume':
resume_all()
else:
print('Неизвестная команда')
Что демонстрирует этот код:
Если домены независимы и допустима асинхронность, можно связать их событиями. При этом важно не терять и не дублировать сообщения. Минимальная схема «аутбокс/инбокс» в базе сервиса:
-- Outbox: пишется в одной транзакции с изменением бизнес-состояния
CREATE TABLE IF NOT EXISTS outbox (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
aggregate_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE INDEX IF NOT EXISTS idx_outbox_unpub ON outbox(published, id);
-- Inbox: учёт полученных событий, чтобы не обработать дважды
CREATE TABLE IF NOT EXISTS inbox (
source TEXT NOT NULL,
event_id TEXT NOT NULL,
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY(source, event_id)
);
Дальше — процесс‑публикатор читает из outbox, шлёт в шину, помечает как опубликованные. Консьюмеры при обработке события сначала проверяют/пишут в inbox, затем выполняют бизнес‑логику; если запись уже есть — событие пропускается.
Экономика:
Риски:
Сага‑паттерн — практичный способ собрать «целостную» бизнес‑операцию из независимых сервисов без 2PC. Он требует дисциплины: явно описать шаги и компенсации, хранить состояние, обеспечивать доставку событий и трассировку. В обмен вы получаете масштабируемость, независимые релизы и предсказуемые SLA. Начать можно с оркестрации для критичных сценариев, а там, где это уместно, — перейти к хореографии с аутбоксом/инбоксом. Пример из статьи — рабочая основа, от которой легко двинуться к прод‑уровню с метриками, очередями и автоматическими повторами.