
Вебхуки — это канал, через который ваш продукт «сообщает миру» о событиях: оплата прошла, счёт выставлен, файл обработан, статус заказа изменился. Когда вебхуки ненадёжны, ваш партнёр теряет транзакции, ломается автоматизация, срываются SLA и растут издержки саппорта. Надёжные вебхуки приносят прямую выгоду:
Типовые причины сбоев: временные сетевые ошибки, проблемы на стороне получателя, долгие таймауты, отсутствие подписи и аутентификации, потеря событий из-за незафиксированной модели хранения.
Надёжная доставка — это не «один POST-запрос». Это отдельный сервис (или чётко выделенный модуль) со своими данными и процессами:
Главные правила:
Советуем зафиксировать компактный, но стабильный формат. Минимум полей:
Пример полезной нагрузки:
{
"id": "f1a7b214-2a4f-4d0e-8c1f-4d2c80a1f7a9",
"type": "invoice.paid",
"version": 1,
"occurred_at": "2026-04-15T10:12:34Z",
"data": {
"invoice_id": "inv_12345",
"amount": 9900,
"currency": "RUB",
"customer_id": "cust_6789"
}
}
Версионирование:
TLS — обязательно. Плюс подпись тела запроса и защита от повторного воспроизведения.
Рекомендуемый формат заголовка подписи:
${timestamp}.${rawBody}))>Преимущества:
Пример проверки подписи на Node.js:
// npm i crypto-safe-compare (или используйте тайминг-безопасное сравнение самостоятельно)
import crypto from 'crypto';
function timingSafeEqual(a, b) {
const bufA = Buffer.from(a, 'utf8');
const bufB = Buffer.from(b, 'utf8');
if (bufA.length !== bufB.length) return false;
return crypto.timingSafeEqual(bufA, bufB);
}
export function verifyWebhookSignature(headers, rawBody, secret, toleranceSec = 300) {
const sigHeader = headers['x-webhook-signature'] || headers['X-Webhook-Signature'];
const tsHeader = headers['x-webhook-timestamp'] || headers['X-Webhook-Timestamp'];
if (!sigHeader || !tsHeader) return false;
const timestamp = parseInt(tsHeader, 10);
if (!Number.isFinite(timestamp)) return false;
const now = Math.floor(Date.now() / 1000);
if (Math.abs(now - timestamp) > toleranceSec) return false; // защита от повторов
const expected = crypto
.createHmac('sha256', secret)
.update(`${timestamp}.${rawBody}`)
.digest('hex');
// Поддерживаем формат t=...,v1=...
const parts = sigHeader.split(',').reduce((acc, part) => {
const [k, v] = part.split('=');
acc[k.trim()] = (v || '').trim();
return acc;
}, {});
const provided = parts.v1 || sigHeader.trim();
return timingSafeEqual(expected, provided);
}
Ротация секретов:
Повторы неизбежны. Важно:
Пример расчёта задержки: delay = min(base * 2^(attempt-1) + random(0..base), maxBackoff). Базу выбирайте 5–10 секунд. Максимум — 15–30 минут. Общее окно повторов — не дольше 24–48 часов (зависит от сценария).
Чтобы повтор не привёл к повторной обработке, получатель должен хранить «уже виденные» X-Webhook-Id хотя бы несколько дней. Минимальная логика:
Отправитель помогает:
Сервис не должен бесконечно бить в мёртвую интеграцию:
Реплей нужен для интеграционных отладок и восстановлений после инцидента:
Что мерить:
Логи:
Трассировка:
Ниже — простая схема на PostgreSQL и асинхронный воркер на Python. Это базовый скелет, который можно расширять.
-- События
CREATE TABLE events (
id UUID PRIMARY KEY,
type TEXT NOT NULL,
version INT NOT NULL,
occurred_at TIMESTAMPTZ NOT NULL,
payload JSONB NOT NULL
);
-- Конечные точки (подписки)
CREATE TABLE endpoints (
id UUID PRIMARY KEY,
url TEXT NOT NULL,
secret TEXT NOT NULL,
active BOOLEAN NOT NULL DEFAULT TRUE,
max_attempts INT NOT NULL DEFAULT 12,
base_backoff_seconds INT NOT NULL DEFAULT 10,
max_backoff_seconds INT NOT NULL DEFAULT 900,
suspended BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Задания доставки
CREATE TABLE deliveries (
id BIGSERIAL PRIMARY KEY,
event_id UUID NOT NULL REFERENCES events(id) ON DELETE CASCADE,
endpoint_id UUID NOT NULL REFERENCES endpoints(id) ON DELETE CASCADE,
attempt INT NOT NULL DEFAULT 0,
status TEXT NOT NULL CHECK (status IN ('pending','in_progress','sent','failed','deadletter')),
next_attempt_at TIMESTAMPTZ NOT NULL DEFAULT now(),
last_error TEXT,
response_status INT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_deliveries_pending ON deliveries (status, next_attempt_at);
CREATE INDEX idx_deliveries_endpoint ON deliveries (endpoint_id);
-- Не даём создать вторую «успешную» доставку для той же пары
CREATE UNIQUE INDEX uniq_delivered_once ON deliveries (event_id, endpoint_id)
WHERE status = 'sent';
# Python 3.11+
# pip install aiohttp asyncpg
import asyncio
import json
import os
import random
import time
import hmac
import hashlib
from typing import Any
import asyncpg
import aiohttp
DATABASE_URL = os.getenv("DATABASE_URL", "postgres://user:pass@localhost:5432/webhooks")
CONCURRENCY = int(os.getenv("CONCURRENCY", "20"))
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "100"))
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "10")) # сек
SIGNATURE_HEADER = "X-Webhook-Signature"
TIMESTAMP_HEADER = "X-Webhook-Timestamp"
ID_HEADER = "X-Webhook-Id"
REPLAY_HEADER = "X-Webhook-Replay"
async def compute_signature(secret: str, timestamp: int, raw_body: bytes) -> str:
mac = hmac.new(secret.encode(), f"{timestamp}.".encode() + raw_body, hashlib.sha256)
return mac.hexdigest()
def next_backoff(base: int, attempt: int, max_backoff: int) -> int:
# экспонента + равномерный джиттер в пределах base
delay = min(base * (2 ** max(0, attempt - 1)) + random.randint(0, base), max_backoff)
return delay
async def fetch_jobs(conn: asyncpg.Connection, limit: int):
# Блокируем задачи, которые пора выполнять. SKIP LOCKED — без гонок между воркерами.
rows = await conn.fetch(
"""
UPDATE deliveries d
SET status = 'in_progress', updated_at = now()
WHERE d.id IN (
SELECT id FROM deliveries
WHERE status = 'pending' AND next_attempt_at <= now()
ORDER BY next_attempt_at
FOR UPDATE SKIP LOCKED
LIMIT $1
)
RETURNING d.*
""",
limit,
)
return rows
async def send_one(session: aiohttp.ClientSession, conn: asyncpg.Connection, job: asyncpg.Record):
# Загружаем событие и подписку
event = await conn.fetchrow("SELECT * FROM events WHERE id=$1", job["event_id"]) # type: ignore
endpoint = await conn.fetchrow("SELECT * FROM endpoints WHERE id=$1", job["endpoint_id"]) # type: ignore
if not endpoint["active"] or endpoint["suspended"]:
await conn.execute(
"UPDATE deliveries SET status='failed', last_error=$1, updated_at=now() WHERE id=$2",
"endpoint inactive or suspended",
job["id"],
)
return
raw_body = json.dumps({
"id": str(event["id"]),
"type": event["type"],
"version": event["version"],
"occurred_at": event["occurred_at"].isoformat().replace("+00:00", "Z"),
"data": event["payload"],
}, separators=(",", ":")).encode()
ts = int(time.time())
sig = await compute_signature(endpoint["secret"], ts, raw_body)
url = endpoint["url"]
try:
timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
async with session.post(
url,
data=raw_body,
headers={
"Content-Type": "application/json",
ID_HEADER: str(event["id"]),
TIMESTAMP_HEADER: str(ts),
SIGNATURE_HEADER: f"t={ts},v1={sig}",
REPLAY_HEADER: "false",
},
timeout=timeout,
) as resp:
status = resp.status
text = await resp.text()
except Exception as e: # сетевые/таймауты
await schedule_retry(conn, job, endpoint, f"network_error: {e}")
return
# Обработка кода ответа
if 200 <= status < 300:
await conn.execute(
"UPDATE deliveries SET status='sent', response_status=$1, updated_at=now() WHERE id=$2",
status,
job["id"],
)
return
if status in (401, 403, 410):
# Не авторизовано/секрет не совпадает/интеграция удалена — не мучаем повторно
await conn.execute(
"UPDATE deliveries SET status='deadletter', response_status=$1, last_error=$2, updated_at=now() WHERE id=$3",
status,
f"fatal_status_{status}",
job["id"],
)
# Можно также пометить endpoint.suspended = true для 410
if status == 410:
await conn.execute("UPDATE endpoints SET suspended=true WHERE id=$1", endpoint["id"]) # type: ignore
return
await schedule_retry(conn, job, endpoint, f"http_{status}: {text[:512]}", status)
async def schedule_retry(conn: asyncpg.Connection, job: asyncpg.Record, endpoint: asyncpg.Record, error_msg: str, status: int | None = None):
attempt = job["attempt"] + 1
max_attempts = endpoint["max_attempts"]
base = endpoint["base_backoff_seconds"]
max_b = endpoint["max_backoff_seconds"]
if attempt >= max_attempts:
await conn.execute(
"UPDATE deliveries SET status='deadletter', attempt=$1, response_status=$2, last_error=$3, updated_at=now() WHERE id=$4",
attempt,
status,
error_msg,
job["id"],
)
return
delay = next_backoff(base, attempt, max_b)
await conn.execute(
"UPDATE deliveries SET status='pending', attempt=$1, next_attempt_at=now() + make_interval(secs => $2), last_error=$3, response_status=$4, updated_at=now() WHERE id=$5",
attempt,
delay,
error_msg,
status,
job["id"],
)
async def worker(pool: asyncpg.Pool):
async with aiohttp.ClientSession() as session:
while True:
async with pool.acquire() as conn:
jobs = await fetch_jobs(conn, BATCH_SIZE)
if not jobs:
await asyncio.sleep(0.5)
continue
# Параллельно обрабатываем пачку
async with pool.acquire() as conn:
await asyncio.gather(*(send_one(session, conn, j) for j in jobs))
async def main():
pool = await asyncpg.create_pool(DATABASE_URL, min_size=1, max_size=max(2, CONCURRENCY//2))
tasks = [asyncio.create_task(worker(pool)) for _ in range(CONCURRENCY)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Что осталось добавить в проде:
Итог: когда вебхуки становятся «первоклассным сервисом», интеграции ускоряются, саппорт дышит свободнее, а доверие партнёров растёт. Это одна из тех инвестиций в платформу, которые быстро отбиваются снижением издержек и ускорением продаж.