
Отправка писем, пересчёт агрегатов, вебхуки, генерация отчетов — для всего этого обычно тянут отдельный брокер (RabbitMQ, Kafka, SQS). Но во многих продуктах это усложняет инфраструктуру и увеличивает время вывода фич. Если у вас:
то очередь на PostgreSQL — отличный компромисс. Плюсы для бизнеса:
Ниже — полноценная схема и рабочий код. Подход даёт «по крайней мере один раз» доставку (at-least-once). Поэтому обработчики должны быть идемпотентными: повторный запуск не должен ломать данные.
Создадим таблицу задач, индексы и удобные функции — вставка с дедупликацией и retry‑delay.
-- Схема очереди задач
create table if not exists job (
id bigserial primary key,
queue text not null, -- имя очереди/темы
priority int not null default 100, -- ниже число — выше приоритет
status text not null default 'queued' check (status in ('queued','running','done','failed','cancelled')),
run_at timestamptz not null default now(), -- когда можно брать в работу
attempts int not null default 0, -- сколько раз пытались
max_attempts int not null default 20,
unique_key text, -- для дедупликации (по желанию)
last_error text,
payload jsonb not null,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
-- Быстрый выбор очереди к обработке
create index if not exists idx_job_queued on job (queue, priority, run_at, id) where status = 'queued';
-- Дедупликация: активная задача с таким ключом может быть только одна
create unique index if not exists uniq_job_active_key on job (unique_key) where unique_key is not null and status in ('queued','running');
-- Триггер на updated_at
create or replace function set_updated_at() returns trigger as $$
begin
new.updated_at = now();
return new;
end; $$ language plpgsql;
drop trigger if exists trg_job_updated_at on job;
create trigger trg_job_updated_at before update on job
for each row execute function set_updated_at();
-- Экспоненциальная пауза между ретраями (не более часа)
create or replace function retry_delay(p_attempts int) returns interval as $$
declare
-- с 1-й попытки: 10с, 2-я: 20с, 3-я: 40с ... до 1 часа
secs int := least(3600, (power(2, greatest(p_attempts, 1))::int) * 10);
begin
return make_interval(secs => secs);
end; $$ language plpgsql immutable;
-- Удобная функция постановки задачи с опциональной дедупликацией и уведомлением воркеров
create or replace function enqueue_job(
p_queue text,
p_payload jsonb,
p_priority int default 100,
p_run_at timestamptz default now(),
p_unique_key text default null,
p_max_attempts int default 20
) returns bigint as $$
declare
v_id bigint;
begin
if p_unique_key is not null then
insert into job(queue, payload, priority, run_at, unique_key, max_attempts)
values (p_queue, p_payload, p_priority, p_run_at, p_unique_key, p_max_attempts)
on conflict (unique_key) do update
set payload = excluded.payload,
priority = excluded.priority,
run_at = excluded.run_at,
max_attempts = excluded.max_attempts
returning id into v_id;
else
insert into job(queue, payload, priority, run_at, max_attempts)
values (p_queue, p_payload, p_priority, p_run_at, p_max_attempts)
returning id into v_id;
end if;
perform pg_notify('queue_' || p_queue, v_id::text);
return v_id;
end; $$ language plpgsql;
Почему так:
Чтобы десятки воркеров не схватили одну и ту же задачу, используем одновременное блокирующее чтение и пропуск уже занятых записей:
-- Забрать пачку задач в работу атомарно
with cte as (
select id
from job
where status = 'queued' and queue = $1 and run_at <= now()
order by priority asc, run_at asc, id asc
for update skip locked
limit $2
)
update job j
set status = 'running', attempts = attempts + 1, updated_at = now()
from cte
where j.id = cte.id
returning j.id, j.payload, j.attempts, j.max_attempts;
Если обработчик вернул ошибку, возвращаем задачу в очередь с паузой. Когда попыток стало слишком много — ставим статус failed.
-- Успешно
update job set status = 'done' where id = $1;
-- Ошибка: отложить или пометить как failed
update job
set status = case when attempts >= max_attempts then 'failed' else 'queued' end,
run_at = case when attempts >= max_attempts then run_at else now() + retry_delay(attempts) end,
last_error = $2
where id = $1;
Важно: attempts мы увеличили при взятии в работу. Значит, retry_delay(attempts) уже учитывает текущую попытку.
Частый кейс: одна и та же логическая задача может быть создана много раз (например, система дергает пересчёт сегмента). Чтобы не перемалывать одинаковое, используйте unique_key. Функция enqueue_job делает UPSERT — либо создаёт новую запись, либо обновляет существующую queued/running. Это особенно полезно для «последнего состояния» (например, «пересчитать до версии N»).
Постоянный поллинг раз в 100–500 мс — нормальная стратегия, но можно снизить лишние запросы. После вставки делаем NOTIFY, а воркеры делают LISTEN и просыпаются мгновенно.
Канал формируем как queue_<имя_очереди>, чтобы у каждого типа задач был свой сигнал.
Ниже — минимальный, но рабочий пример. Он:
Соберите переменную окружения DATABASE_URL (например, postgres://user
@localhost/app?sslmode=disable).package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
type Job struct {
ID int64
Payload map[string]any
Attempts int32
MaxAttempts int32
}
func main() {
dsn := os.Getenv("DATABASE_URL")
if dsn == "" {
log.Fatal("DATABASE_URL is not set")
}
queue := getenv("QUEUE", "email")
batchSize := getenvInt("BATCH_SIZE", 10)
pollEvery := getenvDuration("POLL_MS", 200*time.Millisecond)
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
pool, err := pgxpool.New(ctx, dsn)
if err != nil {
log.Fatalf("pgxpool: %v", err)
}
defer pool.Close()
// Отдельное соединение для LISTEN
listener, err := pgx.Connect(ctx, dsn)
if err != nil {
log.Fatalf("listener connect: %v", err)
}
defer listener.Close(ctx)
channel := "queue_" + queue
if _, err := listener.Exec(ctx, "listen "+pgx.Identifier{channel}.Sanitize()); err != nil {
log.Fatalf("listen: %v", err)
}
log.Printf("listening on channel=%s queue=%s", channel, queue)
for {
if ctx.Err() != nil {
break
}
jobs, err := fetchAndLockJobs(ctx, pool, queue, batchSize)
if err != nil {
log.Printf("fetch: %v", err)
// Небольшая пауза, чтобы не крутить цикл при ошибке подключения
time.Sleep(500 * time.Millisecond)
continue
}
if len(jobs) > 0 {
for _, j := range jobs {
if err := handleJob(ctx, pool, j); err != nil {
log.Printf("job %d error: %v", j.ID, err)
}
}
continue
}
// Если задач нет — ждём уведомление или таймаут поллинга
waitCtx, cancel := context.WithTimeout(ctx, pollEvery)
_, err = listener.WaitForNotification(waitCtx)
cancel()
// По таймауту просто продолжим цикл (поллинг), по ошибке контекста — тоже
}
log.Println("graceful shutdown")
}
func fetchAndLockJobs(ctx context.Context, pool *pgxpool.Pool, queue string, limit int) ([]Job, error) {
const q = `
with cte as (
select id
from job
where status = 'queued' and queue = $1 and run_at <= now()
order by priority asc, run_at asc, id asc
for update skip locked
limit $2
)
update job j
set status = 'running', attempts = attempts + 1, updated_at = now()
from cte
where j.id = cte.id
returning j.id, j.payload, j.attempts, j.max_attempts;
`
rows, err := pool.Query(ctx, q, queue, limit)
if err != nil {
return nil, err
}
defer rows.Close()
jobs := make([]Job, 0, limit)
for rows.Next() {
var (
id int64
payloadBytes []byte
attempts int32
maxAttempts int32
)
if err := rows.Scan(&id, &payloadBytes, &attempts, &maxAttempts); err != nil {
return nil, err
}
var payload map[string]any
if err := json.Unmarshal(payloadBytes, &payload); err != nil {
return nil, err
}
jobs = append(jobs, Job{ID: id, Payload: payload, Attempts: attempts, MaxAttempts: maxAttempts})
}
return jobs, rows.Err()
}
func handleJob(ctx context.Context, pool *pgxpool.Pool, j Job) error {
// Пример обработчика: имитируем работу 100–300 мс
dur := 100*time.Millisecond + time.Duration(j.ID%200)*time.Millisecond
time.Sleep(dur)
// Можно управлять ошибками через payload, например {"force_error": true}
if b, ok := j.Payload["force_error"].(bool); ok && b {
return failJob(ctx, pool, j.ID, errors.New("forced error"))
}
return completeJob(ctx, pool, j.ID)
}
func completeJob(ctx context.Context, pool *pgxpool.Pool, id int64) error {
cmd, err := pool.Exec(ctx, "update job set status='done' where id=$1", id)
if err != nil {
return err
}
if cmd.RowsAffected() == 0 {
return fmt.Errorf("job %d not found", id)
}
return nil
}
func failJob(ctx context.Context, pool *pgxpool.Pool, id int64, cause error) error {
msg := cause.Error()
const q = `
update job
set status = case when attempts >= max_attempts then 'failed' else 'queued' end,
run_at = case when attempts >= max_attempts then run_at else now() + retry_delay(attempts) end,
last_error = $2,
updated_at = now()
where id = $1;
`
_, err := pool.Exec(ctx, q, id, msg)
return err
}
func getenv(key, def string) string {
v := os.Getenv(key)
if v == "" { return def }
return v
}
func getenvInt(key string, def int) int {
v := os.Getenv(key)
if v == "" { return def }
var x int
_, err := fmt.Sscanf(v, "%d", &x)
if err != nil { return def }
return x
}
func getenvDuration(key string, def time.Duration) time.Duration {
v := os.Getenv(key)
if v == "" { return def }
d, err := time.ParseDuration(v+"ms")
if err != nil { return def }
return d
}
Замечания к коду:
Что стоит мерить сразу:
Простые алерты:
Когда стоит задуматься о миграции на специализированный брокер:
Мигрировать проще, если с первого дня держать обработчики идемпотентными и описать контракт задачи (тип, схема payload). Тогда перенос «транспортного слоя» (Postgres → брокер) будет наименее болезненный.
Очередь на PostgreSQL с FOR UPDATE SKIP LOCKED — практичный способ быстро запустить фоновые задачи без новой инфраструктуры. Вы получаете транзакционную постановку, конкурентных воркеров без дедлоков, ретраи с экспоненциальной паузой, дедупликацию и мгновенные уведомления через LISTEN/NOTIFY. Это закрывает 80% повседневных сценариев и снижает расходы. При росте нагрузки и требований вы сможете эволюционировать к брокеру — без переписывания всей бизнес‑логики.