
Событийная шина (Kafka/Redpanda/Pulsar) ускоряет интеграции: одна команда публикует события, другие подписываются. Но без чёткого формата и правил изменений любое «мелкое» поле может уронить аналитику, расчёты и обновления балансов.
Схемы событий и реестр схем решают три реальные боли:
Итог — меньше инцидентов, быстрее релизы, ниже стоимость владения интеграциями.
Под «схемой» понимаем формальное описание структуры события: типы полей, обязательность, значения по умолчанию, версии. Популярные варианты:
Реестры схем (Schema Registry) хранят версии схем и проверяют совместимость:
Практика: для высоконагруженных топиков — Avro/Protobuf; для «аналитических» и низкой нагрузки — допустим JSON Schema, если важна читаемость.
Рекомендуем разделять «конверт» (метаданные) и payload (бизнес‑данные). Конверт одинаков для всех событий домена — упрощает трассировку и поддержку.
Пример конверта (JSON‑идея, но так же делаем и в Avro/Protobuf):
{
"event_name": "order.created",
"event_version": 2,
"event_id": "b4a7e0b0-4e1d-4cc1-9d9d-21e11a9c3f7a",
"occurred_at": "2026-03-10T12:34:56Z",
"producer": "orders-service",
"trace_id": "b0410f3a2a3b4c75",
"partition_key": "customer-123",
"payload": { /* бизнес-данные */ }
}
Минимум конверта:
Реестры поддерживают режимы совместимости:
Практика выбора:
Безопасные изменения (совместимы backward):
Опасные изменения (ломают совместимость):
Стратегия выката для Backward:
Версия 1 (payload для order.created):
{
"type": "record",
"name": "OrderCreatedV1",
"namespace": "com.example.orders",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
Версия 2 — добавили валюту и сделали её необязательной c дефолтом:
{
"type": "record",
"name": "OrderCreatedV2",
"namespace": "com.example.orders",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": ["null", "string"], "default": null}
]
}
Это backward‑совместимо: старые потребители игнорируют незнакомое поле; новые читают, когда оно появится.
Цель — не допустить в мастер/основную ветку схему, которая ломает совместимость с последней зарегистрированной в реестре.
Базовый процесс:
Пример скрипта совместимости (Python + REST API Schema Registry):
#!/usr/bin/env python3
import json
import sys
import requests
# Использование: python check_compat.py http://localhost:8081 com.example.orders.OrderCreated-value avro/orders/order_created_v2.avsc
if len(sys.argv) != 4:
print("Usage: check_compat.py <schema_registry_url> <subject> <schema_file>")
sys.exit(2)
base, subject, path = sys.argv[1], sys.argv[2], sys.argv[3]
with open(path, 'r', encoding='utf-8') as f:
schema_str = f.read()
payload = {"schema": schema_str}
url = f"{base}/compatibility/subjects/{subject}/versions/latest"
resp = requests.post(url, headers={"Content-Type": "application/vnd.schemaregistry.v1+json"}, data=json.dumps(payload))
if resp.status_code != 200:
print(f"Schema Registry error: {resp.status_code} {resp.text}")
sys.exit(3)
result = resp.json()
if result.get("is_compatible"):
print("OK: compatible with latest")
sys.exit(0)
else:
print("FAIL: incompatible with latest")
print(json.dumps(result, indent=2, ensure_ascii=False))
sys.exit(1)
В GitHub Actions добавьте шаг:
- name: Check Avro compatibility
run: |
python3 check_compat.py ${{ secrets.SCHEMA_REGISTRY_URL }} com.example.orders.OrderCreated-value avro/orders/order_created_v2.avsc
Где subject — это имя в реестре. Распространённые стратегии именования subject:
Ниже — минимально‑жизнеспособный пример производителя и потребителя с Avro и реестром схем (Confluent/Redpanda). Требуется пакет confluent-kafka>=2.1.
Сохраняем как avro/orders/order_created_v2.avsc:
{
"type": "record",
"name": "OrderCreatedV2",
"namespace": "com.example.orders",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": ["null", "string"], "default": null}
]
}
from datetime import datetime, timezone
import uuid
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
SCHEMA_REGISTRY_URL = "http://localhost:8081"
BOOTSTRAP_SERVERS = "localhost:9092"
TOPIC = "orders.events"
SUBJECT = "com.example.orders.OrderCreated-value"
schema_registry_conf = {"url": SCHEMA_REGISTRY_URL}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
# Загрузим схему из файла
with open("avro/orders/order_created_v2.avsc", "r", encoding="utf-8") as f:
avro_schema_str = f.read()
avro_serializer = AvroSerializer(schema_registry_client, avro_schema_str)
producer_conf = {
"bootstrap.servers": BOOTSTRAP_SERVERS,
"key.serializer": StringSerializer("utf_8"),
"value.serializer": avro_serializer,
"enable.idempotence": True,
}
producer = SerializingProducer(producer_conf)
payload = {
"order_id": str(uuid.uuid4()),
"customer_id": "customer-123",
"amount": 1499.0,
"currency": "RUB"
}
envelope = {
"event_name": "order.created",
"event_version": 2,
"event_id": str(uuid.uuid4()),
"occurred_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"producer": "orders-service",
"trace_id": uuid.uuid4().hex[:16],
"partition_key": payload["customer_id"],
"payload": payload
}
producer.produce(topic=TOPIC, key=envelope["partition_key"], value=envelope["payload"], on_delivery=lambda err, msg: print(
f"delivered to {msg.topic()}[{msg.partition()}]@{msg.offset()}" if err is None else f"delivery error: {err}"
))
producer.flush(5)
Здесь мы сериализуем только payload Avro‑схемой. Конверт можно сериализовать как JSON (ключом) или вынести в отдельную схему — выбирайте единый подход в компании.
from confluent_kafka import DeserializingConsumer
from confluent_kafka.serialization import StringDeserializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
SCHEMA_REGISTRY_URL = "http://localhost:8081"
BOOTSTRAP_SERVERS = "localhost:9092"
TOPIC = "orders.events"
schema_registry_client = SchemaRegistryClient({"url": SCHEMA_REGISTRY_URL})
avro_deserializer = AvroDeserializer(schema_registry_client)
conf = {
"bootstrap.servers": BOOTSTRAP_SERVERS,
"group.id": "billing-service-v1",
"auto.offset.reset": "earliest",
"key.deserializer": StringDeserializer("utf_8"),
"value.deserializer": avro_deserializer,
}
consumer = DeserializingConsumer(conf)
consumer.subscribe([TOPIC])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"error: {msg.error()}")
continue
payload = msg.value() # dict после Avro десериализации
# Минимальная валидация бизнес‑правил
if not payload.get("order_id"):
print("skip: empty order_id")
consumer.commit(msg)
continue
amount = payload.get("amount", 0)
if amount < 0:
print("alert: negative amount")
# Обработка...
print(f"order {payload['order_id']} amount={amount}")
consumer.commit(msg)
finally:
consumer.close()
Чтобы быстро ловить проблемы совместимости и данных, добавьте:
Логируйте ключевые поля конверта: event_name, version, trace_id, producer. Это сильно ускоряет поиск виновника.
Примерные затраты:
Экономия: меньше инцидентов на интеграциях, короче MTTR, быстрее онбординг новых потребителей. Для компаний с десятками топиков окупается за 1–2 квартала за счёт снижения времени согласований и откатов.
Итог: схемы событий и реестр — это контракт между командами. Они позволяют добавлять поля без паники, ловить несовместимость заранее и выпускать фичи быстрее. Начните с одного домена, включите проверку в CI — и вы почувствуете разницу уже через первый релиз.