
Обычный журнал — это просто строки в таблице. Их можно изменить или удалить. Решаем это криптографией: связываем записи в цепочку хешей. Если где‑то «дырка» или подмена — проверка сразу выдаст несоответствие.
Идея простая:
Термины без перегруза:
SHA‑256.Ниже готовая схема и функции. Достаточно выполнить в PostgreSQL. Используем расширение pgcrypto для digest().
-- Подготовка
CREATE SCHEMA IF NOT EXISTS audit;
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- Основной журнал: только добавление
CREATE TABLE IF NOT EXISTS audit.audit_log (
id BIGSERIAL PRIMARY KEY,
stream TEXT NOT NULL, -- имя потока (например, tenant_42)
ts TIMESTAMPTZ NOT NULL DEFAULT now(),
actor TEXT NOT NULL, -- кто
action TEXT NOT NULL, -- что сделал
payload JSONB NOT NULL DEFAULT '{}'::jsonb, -- детали
prev_hash BYTEA NOT NULL,
curr_hash BYTEA NOT NULL
);
CREATE INDEX IF NOT EXISTS ix_audit_log_stream_ts ON audit.audit_log (stream, ts);
CREATE INDEX IF NOT EXISTS ix_audit_log_payload_gin ON audit.audit_log USING GIN (payload);
-- Текущее состояние каждой цепочки (последний хеш)
CREATE TABLE IF NOT EXISTS audit.audit_stream (
stream TEXT PRIMARY KEY,
last_id BIGINT,
last_hash BYTEA NOT NULL
);
-- Запрещаем UPDATE/DELETE по журналу (append-only)
CREATE OR REPLACE FUNCTION audit.block_changes()
RETURNS trigger AS $$
BEGIN
RAISE EXCEPTION 'audit_log is append-only';
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS audit_log_no_update ON audit.audit_log;
CREATE TRIGGER audit_log_no_update
BEFORE UPDATE OR DELETE ON audit.audit_log
FOR EACH ROW EXECUTE FUNCTION audit.block_changes();
-- Роли: писатели и читатели
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'audit_writer') THEN
CREATE ROLE audit_writer NOINHERIT;
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'audit_reader') THEN
CREATE ROLE audit_reader NOINHERIT;
END IF;
END$$;
REVOKE ALL ON audit.audit_log FROM PUBLIC;
REVOKE ALL ON audit.audit_stream FROM PUBLIC;
GRANT SELECT ON audit.audit_log TO audit_reader;
GRANT SELECT ON audit.audit_stream TO audit_reader;
-- Добавление записи в цепочку с защитой от гонок по потоку
CREATE OR REPLACE FUNCTION audit.append_event(
p_stream TEXT,
p_actor TEXT,
p_action TEXT,
p_payload JSONB DEFAULT '{}'::jsonb
) RETURNS audit.audit_log AS $$
DECLARE
v_prev BYTEA;
v_ts TIMESTAMPTZ := clock_timestamp();
v_curr BYTEA;
v_id BIGINT;
v_data BYTEA;
v_exists BOOLEAN := TRUE;
BEGIN
-- Лок по потоку на время транзакции (минимум блокировок между потоками)
PERFORM pg_advisory_xact_lock(hashtext('audit_stream:' || p_stream));
-- Берём последний хеш; если потока ещё нет — создаём генезис
SELECT last_hash INTO v_prev FROM audit.audit_stream WHERE stream = p_stream FOR UPDATE;
IF NOT FOUND THEN
v_prev := digest('GENESIS:' || p_stream, 'sha256');
v_exists := FALSE;
END IF;
-- Собираем данные в байты для стабильного хеширования
-- Важно: фиксируем точный формат времени, чтобы верификация вне БД совпадала
v_data := v_prev
|| convert_to(to_char(v_ts, 'YYYY-MM-DD"T"HH24:MI:SS.MS TZH:TZM'), 'UTF8')
|| convert_to(coalesce(p_actor, ''), 'UTF8')
|| convert_to(coalesce(p_action, ''), 'UTF8')
|| convert_to(coalesce(p_payload::text, '{}'), 'UTF8');
v_curr := digest(v_data, 'sha256');
INSERT INTO audit.audit_log(stream, ts, actor, action, payload, prev_hash, curr_hash)
VALUES (p_stream, v_ts, p_actor, p_action, p_payload, v_prev, v_curr)
RETURNING id INTO v_id;
IF v_exists THEN
UPDATE audit.audit_stream SET last_hash = v_curr, last_id = v_id WHERE stream = p_stream;
ELSE
INSERT INTO audit.audit_stream(stream, last_id, last_hash) VALUES (p_stream, v_id, v_curr);
END IF;
RETURN (SELECT l FROM audit.audit_log l WHERE l.id = v_id);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
-- Ограничим прямую запись в таблицу: только через функцию
REVOKE INSERT, UPDATE, DELETE ON audit.audit_log FROM PUBLIC;
GRANT EXECUTE ON FUNCTION audit.append_event(TEXT, TEXT, TEXT, JSONB) TO audit_writer;
-- Таблица «якорей» — периодическая фиксация кончика цепочки
CREATE TABLE IF NOT EXISTS audit.audit_anchor (
id BIGSERIAL PRIMARY KEY,
stream TEXT NOT NULL,
ts TIMESTAMPTZ NOT NULL DEFAULT now(),
last_id BIGINT NOT NULL,
last_hash BYTEA NOT NULL,
note TEXT
);
-- Функция записи якоря (например, по расписанию)
CREATE OR REPLACE FUNCTION audit.write_anchor(p_stream TEXT, p_note TEXT DEFAULT NULL)
RETURNS audit.audit_anchor AS $$
DECLARE
r audit.audit_anchor;
BEGIN
INSERT INTO audit.audit_anchor(stream, last_id, last_hash, note)
SELECT s.stream, s.last_id, s.last_hash, p_note FROM audit.audit_stream s WHERE s.stream = p_stream
RETURNING * INTO r;
RETURN r;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
GRANT SELECT, INSERT ON audit.audit_anchor TO audit_writer;
GRANT SELECT ON audit.audit_anchor TO audit_reader;
-- Приложению достаточно иметь роль audit_writer и вызывать функцию
SELECT (audit.append_event(
p_stream => 'tenant_42',
p_actor => 'user:834',
p_action => 'order.update_status',
p_payload => '{"order_id":12345, "from":"new", "to":"paid"}'
)).id;
-- Периодически (cron/pg_cron/структура задач) фиксируем якорь
SELECT audit.write_anchor('tenant_42', 'ежечасная фиксация');
pg_advisory_xact_lock по имени потока. Разные потоки пишут параллельно.payload хранит детали события. Не кладите туда килобайты текста — лучше ссылку на объект в хранилище.Периодически прогоняем проверку: пересчитываем хеши и сверяем с сохранёнными. Ниже минимальный скрипт на Python. Он считывает события по потоку в порядке id, повторяет формирование байтов как в функции и сравнивает.
# requirements: psycopg2-binary
import hashlib
import os
import sys
import psycopg2
DSN = os.getenv('PG_DSN', 'dbname=app user=app password=secret host=127.0.0.1')
STREAM = os.getenv('STREAM', 'tenant_42')
with psycopg2.connect(DSN) as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id,
to_char(ts, 'YYYY-MM-DD"T"HH24:MI:SS.MS TZH:TZM') AS ts_text,
actor,
action,
payload::text AS payload_text,
encode(prev_hash,'hex') AS prev_hex,
encode(curr_hash,'hex') AS curr_hex
FROM audit.audit_log
WHERE stream = %s
ORDER BY id
""",
(STREAM,)
)
rows = cur.fetchall()
ok = True
for (id_, ts_text, actor, action, payload_text, prev_hex, curr_hex) in rows:
prev = bytes.fromhex(prev_hex)
data = prev + ts_text.encode('utf-8') + (actor or '').encode('utf-8') \
+ (action or '').encode('utf-8') + (payload_text or '{}').encode('utf-8')
got = hashlib.sha256(data).hexdigest()
if got != curr_hex:
print(f"BROKEN at id={id_}: expected {curr_hex}, got {got}")
ok = False
break
if ok:
print(f"OK: {len(rows)} events verified in stream '{STREAM}'")
Если у вас есть «якоря», можно ускорить проверку: начинать от последнего якоря вперёд, а не с генезиса.
payload — только суть. Большие вложения выносите в объектное хранилище.stream, ts обычно достаточно. Если нужны сложные выборки по payload — добавьте GIN, но следите за нагрузкой на запись.ts (месяц/неделя) — ускорит очистку и резервные копии.Пример партиционирования по месяцам:
ALTER TABLE audit.audit_log
PARTITION BY RANGE (ts);
-- Создаём партицию на март 2026
CREATE TABLE IF NOT EXISTS audit.audit_log_2026_03
PARTITION OF audit.audit_log
FOR VALUES FROM ('2026-03-01') TO ('2026-04-01');
Подсказка: если вы уже создали базовую таблицу без партиций, можно аккуратно мигрировать (через временную таблицу и триггер переезда), но это отдельная операция планирования.
audit.audit_anchor. Это одно действие вставки, дешёвое по ресурсам.Минимальный якорь во внешнее хранилище — это пара (stream, last_id, last_hash_hex) с меткой времени и подписью ответственного.
# Пример выгрузки якоря в текстовый файл с контрольной суммой
psql "$PG_DSN" -Atc \
"SELECT stream, last_id, encode(last_hash,'hex') FROM audit.audit_stream WHERE stream='tenant_42'" \
| tee anchor_tenant_42_$(date +%F_%H%M%S).txt \
| sha256sum
EXECUTE на функцию append_event и INSERT в audit_anchor.SECURITY DEFINER, а сама схема «audit» доступна ограниченному сервисному пользователю.payload::text и jsonb (у которого порядок ключей детерминирован). Если вы меняете формат сериализации, не забудьте совместимость при верификации.audit.append_event(...).audit_writer у сервиса, audit_reader у аналитики/безопасности.audit.write_anchor(...) и якорь дублируется во внешнее хранилище.Итог: за несколько часов работы вы получаете журнал, который нельзя тихо переписать без следов. Это повышает доверие к данным, сокращает время расследований и снижает регуляторные риски — без внедрения дорогих внешних систем и без усложнения инфраструктуры.