
TLS шифрует трафик и защищает от «подслушивания». Но он не доказывает, что отправитель — именно ваш партнёр или ваш собственный сервис. Любой, кто знает публичный URL вашего вебхука, может отправить запрос, похожий на настоящий. Это открывает путь к:
Подпись решает задачу подлинности и целостности сообщения: мы доказываем, что запрос сформирован отправителем, знающим секрет (или владеющим закрытым ключом), и что тело запроса не менялось по дороге.
Договоримся о заголовке с подписью. Простой и совместимый вариант — один заголовок Webhook-Signature с параметрами через запятую:
Пример:
Webhook-Signature: t=1719830400,kid=live-01,v1=Zb2w7iZbUq8pN1f1zXH1zHhU8v9q5EPL+o2tX1m7k/Q=
Где:
t — метка времени (в секундах UTC);kid — идентификатор ключа для ротации;v1 — подпись HMAC-SHA256 от строки ${t}.${raw_body} в Base64.Можно расширить протокол, добавив nonce для защиты от повторов и v2 для новой схемы подписи, не ломая старую.
Отправитель:
t (например, Unix time),str(t).encode() + b'.' + raw_body,Webhook-Signature.Получатель:
t, kid, v1,t попадает в окно допустимого времени (например, ±5 минут),kid находит активный секрет, пересчитывает HMAC и сравнивает с v1 константным сравнением (без утечек времени),nonce (если используется) и уникальность события.Любые изменения форматирования (пробелы, переносы строк, порядок полей) меняют байты, из которых делается подпись. Подписываем именно «сырое» тело. Если сериализация на стороне отправителя нестрогая — зафиксируйте правила (например, JSON.stringify без пробелов, сортировка ключей) и следуйте им.
Ниже — полностью рабочий пример. Он:
/webhook,Webhook-Signature,(timestamp, signature).# requirements: flask==3.0.0
# Запуск: python app.py
# Тест: curl -X POST http://127.0.0.1:5000/webhook -H "Content-Type: application/json" \
# -d '{"event":"ping"}' (без подписи вернёт 400)
import base64
import hmac
import hashlib
import time
from collections import OrderedDict
from typing import Dict
from flask import Flask, request, jsonify
app = Flask(__name__)
# Активные ключи (пример). В реальности храните в конфиге/секретах.
SECRETS: Dict[str, bytes] = {
"live-01": b"supersecret_live_01",
}
# Окно допустимой метки времени в секундах
TIMESTAMP_TOLERANCE = 300 # 5 минут
# Простейший LRU для защиты от повторов: сохраняем N последних подписей с TTL
class AntiReplay:
def __init__(self, capacity: int = 10000, ttl_seconds: int = 600):
self.capacity = capacity
self.ttl = ttl_seconds
self._store = OrderedDict()
def _evict(self):
# удаляем просроченные записи
now = time.time()
keys_to_delete = []
for k, (ts, _) in self._store.items():
if now - ts > self.ttl:
keys_to_delete.append(k)
else:
break # так как OrderedDict по времени добавления
for k in keys_to_delete:
self._store.pop(k, None)
# ограничение по размеру
while len(self._store) > self.capacity:
self._store.popitem(last=False)
def seen(self, key: str) -> bool:
self._evict()
if key in self._store:
return True
self._store[key] = (time.time(), True)
return False
anti_replay = AntiReplay()
def parse_signature_header(header: str):
parts = [p.strip() for p in header.split(',') if p.strip()]
data = {}
for p in parts:
if '=' in p:
k, v = p.split('=', 1)
data[k] = v
if 't' not in data or 'v1' not in data or 'kid' not in data:
raise ValueError('signature header missing fields')
try:
t = int(data['t'])
except ValueError:
raise ValueError('invalid t')
return t, data['kid'], data['v1']
def compute_hmac_base64(secret: bytes, timestamp: int, raw_body: bytes) -> str:
msg = str(timestamp).encode('utf-8') + b'.' + raw_body
mac = hmac.new(secret, msg, hashlib.sha256).digest()
return base64.b64encode(mac).decode('ascii')
def secure_compare(a: str, b: str) -> bool:
# сравнение без утечек времени
return hmac.compare_digest(a, b)
@app.post('/webhook')
def webhook():
raw_body = request.get_data(cache=False) # сырые байты
sig_header = request.headers.get('Webhook-Signature', '')
if not sig_header:
return jsonify(error='missing signature'), 400
try:
t, kid, v1 = parse_signature_header(sig_header)
except ValueError as e:
return jsonify(error=str(e)), 400
now = int(time.time())
if abs(now - t) > TIMESTAMP_TOLERANCE:
return jsonify(error='timestamp outside tolerance'), 400
secret = SECRETS.get(kid)
if not secret:
return jsonify(error='unknown key id'), 400
expected = compute_hmac_base64(secret, t, raw_body)
if not secure_compare(expected, v1):
return jsonify(error='invalid signature'), 400
# простая защита от повторов: не принимать одну и ту же (t,v1) дважды
replay_key = f"{t}:{v1}"
if anti_replay.seen(replay_key):
return jsonify(error='replay detected'), 409
# На этом этапе запрос аутентичен. Обработка события:
# Важно: используйте idempotency на уровне события (например, event_id).
# Здесь просто эхо-ответ для демонстрации.
return jsonify(status='ok', received_len=len(raw_body))
if __name__ == '__main__':
app.run(debug=True)
Для локальной проверки подписи удобно написать генератор подписи, имитирующий отправителя:
# Генерация заголовка подписи для теста
import base64, hmac, hashlib, time
secret = b"supersecret_live_01"
body = b'{"event":"ping"}'
t = int(time.time())
msg = str(t).encode() + b'.' + body
sig = base64.b64encode(hmac.new(secret, msg, hashlib.sha256).digest()).decode('ascii')
header = f"Webhook-Signature: t={t},kid=live-01,v1={sig}"
print(header)
Главная ловушка в Node.js — стандартные парсеры превращают тело в объект и теряют исходные байты. Нужен middleware, который сохраняет «сырое» тело.
// package.json: { "type": "module" }
// npm i express raw-body
import express from 'express';
import getRawBody from 'raw-body';
import crypto from 'crypto';
const app = express();
// Middleware: читаем сырые байты и сохраняем на req.rawBody
app.use(async (req, res, next) => {
try {
req.rawBody = await getRawBody(req);
next();
} catch (e) {
res.status(400).json({ error: 'invalid body' });
}
});
const SECRETS = { 'live-01': Buffer.from('supersecret_live_01', 'utf8') };
const TOLERANCE = 300; // 5 минут
function parseSignatureHeader(h) {
if (!h) throw new Error('missing header');
const parts = h.split(',').map(p => p.trim()).filter(Boolean);
const data = {};
for (const p of parts) {
const [k, v] = p.split('=');
data[k] = v;
}
const t = parseInt(data.t, 10);
if (!t || !data.v1 || !data.kid) throw new Error('invalid header');
return { t, v1: data.v1, kid: data.kid };
}
function computeHmacBase64(secret, t, rawBody) {
const msg = Buffer.concat([Buffer.from(String(t)), Buffer.from('.') , rawBody]);
const mac = crypto.createHmac('sha256', secret).update(msg).digest();
return mac.toString('base64');
}
function safeEqual(a, b) {
const ab = Buffer.from(a);
const bb = Buffer.from(b);
if (ab.length !== bb.length) return false;
return crypto.timingSafeEqual(ab, bb);
}
const seen = new Set();
function isReplay(key) {
if (seen.has(key)) return true;
seen.add(key);
if (seen.size > 10000) {
// Простая эвакуация для примера (в проде используйте TTL/Redis)
const it = seen.values().next().value;
seen.delete(it);
}
return false;
}
app.post('/webhook', (req, res) => {
try {
const { t, v1, kid } = parseSignatureHeader(req.headers['webhook-signature']);
const now = Math.floor(Date.now() / 1000);
if (Math.abs(now - t) > TOLERANCE) return res.status(400).json({ error: 'timestamp' });
const secret = SECRETS[kid];
if (!secret) return res.status(400).json({ error: 'unknown kid' });
const expected = computeHmacBase64(secret, t, req.rawBody);
if (!safeEqual(expected, v1)) return res.status(400).json({ error: 'signature' });
const replayKey = `${t}:${v1}`;
if (isReplay(replayKey)) return res.status(409).json({ error: 'replay' });
return res.json({ status: 'ok', len: req.rawBody.length });
} catch (e) {
return res.status(400).json({ error: e.message });
}
});
app.listen(3000, () => console.log('listening on :3000'));
Метка времени ограничивает «срок жизни» подписи. Но если злоумышленник перехватил и тут же повторно отправил запрос — timestamp не поможет. Добавьте одноразовый идентификатор (nonce) или используйте связку (delivery_id, timestamp):
nonce или delivery_id и включает его в тело и/или заголовки (например, Delivery-Id: ...).Не путайте с идемпотентностью: даже уникальный delivery может описывать событие, которое уже обработано. Храните event_id и делайте обработку идемпотентной.
Рано или поздно ключи меняются. План:
kid — идентификатор ключа. Храните несколько секретов на приёме.kid). Старый держите активным, чтобы принимать старые доставки и ретраи.Версионирование подписи (например, v1/v2) пригодится, если изменится схема: иные алгоритмы, каноникализация, дополнительные поля. Пока поддерживайте обе, отдавая приоритет новой.
Content-Type: application/json; charset=utf-8 и реально используйте UTF‑8.nonce/delivery_id и хранилище недавних значений.Продвинутый вариант — вместе с подписью передавать Content-Digest по RFC (хеш тела). Это помогает отладке, но не заменяет подпись.
kid, окно времени, но не сам секрет и не значение подписи целиком. Допустимо логировать первые несколько символов подписи для трассировки.Соберите «набор примеров» (fixtures):
t, kid, секрет,Положите их в репозиторий рядом с кодом проверки и гоняйте как юнит‑тесты в CI. Это спасает от случайных изменений сериализации JSON и регрессий.
Если ко многим получателям надо транслировать вебхуки, или вы не хотите делиться секретом с каждым — рассмотрите асимметричные подписи (Ed25519, ECDSA):
Для большинства B2B‑интеграций стартуйте с HMAC: просто, быстро и достаточно безопасно при аккуратной реализации.
Webhook-Signature с t, kid, v1.${t}.${raw_body}; используем HMAC-SHA256 + Base64.delivery_id + хранилище недавних значений).Content-Type, Content-Length).kid, хранится несколько активных секретов.Подпись и проверка вебхуков — недорогая мера, которая закрывает реальные риски: подделки, непреднамеренные дубли и «шум» от нестабильных интеграций. Простая схема на HMAC, окно по времени и защита от повторов дают быстрый выигрыш: меньше инцидентов, меньше нагрузки на поддержку, выше доверие партнёров и спокойнее аудит. Начните с минимального формата и юнит‑тестов, а по мере роста добавьте ротацию ключей, версионирование и, при необходимости, асимметричную криптографию.