TASk MANager. Очередь задач на go с использованием Postgresql
Назначение
Tasman - библиотека для очереди задач на PostgreSQL с исполнением задач в пуле воркеров Go. Задачи и их состояние хранятся в БД, получение новых задач основано на LISTEN/NOTIFY с fallback-поллингом. Постановка задач и наблюдение за состоянием возможны как через Go API/HTTP API, так и напрямую SQL-запросами к БД.
Возможности
- Регистрация типов задач в БД.
- Приоритеты, отложенный запуск, ограничение по числу попыток.
- Heartbeat и автоматическое освобождение зависших задач.
- Получение задач через
LISTEN/NOTIFYили внешний listener. - Пул воркеров и обработчики задач на Go.
Требования
- Go 1.24+.
- PostgreSQL 9.5+ (используются
SKIP LOCKED,LISTEN/NOTIFY,JSONB).
Быстрый старт (код)
pool, err := pgxpool.New(ctx, dsn)
if err != nil {
// обработка ошибки
}
defer pool.Close()
svc, err := tasman.New(tasman.Config{
WorkerCount: 5,
}, pool)
if err != nil {
// обработка ошибки
}
// Ваш обработчик бизнес-логики.
sampleHandler := func(ctx context.Context, payload json.RawMessage) (any, error) {
var req struct {
UserID string `json:"user_id"`
Action string `json:"action"`
}
if err := json.Unmarshal(payload, &req); err != nil {
return nil, err
}
// Здесь можно вызывать ваш сервис/репозиторий/внешний API.
return map[string]any{
"user_id": req.UserID,
"action": req.Action,
"status": "processed",
}, nil
}
// Инициализация схемы БД из embedded SQL.
if err := svc.Init(ctx, staticFS); err != nil {
// обработка ошибки
}
// Регистрация обработчика.
err = svc.RegisterHandler(ctx, "sample", sampleHandler)
if err != nil {
// обработка ошибки
}
// Постановка задач.
payloads := []json.RawMessage{
[]byte(`{"user_id":"u-1","action":"create"}`),
[]byte(`{"user_id":"u-2","action":"delete"}`),
}
_, _ = svc.Enqueue(ctx, "sample", payloads, tasman.WithPriority(10))
// Запуск воркеров с общим контекстом и сбором ошибок.
g, runCtx := errgroup.WithContext(ctx)
// (Опционально) Подключение HTTP API.
// Без этого шага svc.SetupRoutes не используется и HTTP API не поднимается.
mux := http.NewServeMux()
svc.SetupRoutes(mux)
apiSrv := &http.Server{
Addr: ":8080",
Handler: mux,
}
g.Go(func() error {
if err := apiSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil
})
g.Go(func() error {
<-runCtx.Done()
return apiSrv.Shutdown(context.Background())
})
for _, w := range svc.Workers() {
workerFn := w
g.Go(func() error {
return workerFn(runCtx)
})
}
if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
// обработка ошибки выполнения воркеров
}
Схема БД
- SQL-скрипты находятся в
static/sql. - По умолчанию
svc.Initприменяет embedded-скрипты. - Можно использовать внешний каталог со скриптами (см.
STATICв примере) или отключить инициализацию черезSkipInit. - Подробная схема инициализации вынесена в pg/migrate`.
Архитектура сервиса
Ключевые потоки:
HTTP API(опционально):SetupRoutes->Service->pg-> PostgreSQL.Worker runtime(обязательно для исполнения):Service.Workers()-> dequeue/heartbeat/complete.Direct SQL client(опционально): запись задач и чтение состояния напрямую в БД.
flowchart TB
DB[(PostgreSQL)]
SQL[static/sql/*.sql]
subgraph Clients
HTTP[HTTP client]
SQLC[SQL client]
end
subgraph App["tasman application"]
API[HTTP server + SetupRoutes]
SVC[Service]
W[Workers]
M[pg/migrate]
N[pg/notify]
P[pg/producer]
Q[pg/queue]
O[pg/observer]
end
HTTP --> API
API --> SVC
SVC --> W
SVC --> M
SVC --> N
SVC --> P
SVC --> Q
SVC --> O
M --> SQL
M --> DB
N --> DB
P --> DB
Q --> DB
O --> DB
SQLC --> DB
Sequence Enqueue/Dequeue
sequenceDiagram
participant C as Client (HTTP)
participant CS as Client (SQL)
participant H as API Handler
participant S as Service
participant P as pg.Producer
participant DB as PostgreSQL
participant W as Worker
participant Q as pg.Queue
C->>H: POST /api/v1/enqueue
H->>S: Enqueue(taskType, payloads, opts)
S->>P: Enqueue(...)
P->>DB: SELECT task_create(...)
DB-->>P: task ids
P-->>S: []id
S-->>H: EnqueueResponse
H-->>C: 200 OK
DB-->>S: NOTIFY task_type
S->>Q: NextTask(taskTypes, available)
Q->>DB: SELECT task_nextN(...)
DB-->>Q: tasks
Q-->>S: tasks
S->>W: taskChan <- task
W->>Q: HeartbeatStart(taskID, workerID)
W->>Q: Heartbeat(...)
W->>Q: CompleteTask(result/error)
Q->>DB: SELECT complete_task(...)
CS->>DB: INSERT INTO task / SELECT task_create(...)
CS->>DB: SELECT ... FROM task_all / stats
Жизненный цикл задачи
stateDiagram-v2
[*] --> pending
pending --> starting: task_nextN()
starting --> running: heartbeat_start()
running --> done: complete_task(result)
running --> failed: complete_task(error)
failed --> pending: retry if attempts < max_attempts
running --> pending: release_stale_tasks() / heartbeat timeout
done --> [*]
failed --> [*]: attempts >= max_attempts
HTTP API
HTTP API описан в api/api.proto и подключается через SetupRoutes.
Текущие маршруты:
POST /api/v1/enqueue- добавить задачи.GET /api/v1/task/{id}- получить информацию по задаче.GET /api/v1/tasks- получить список задач (с фильтрацией поstatusиlimit).GET /api/v1/stats- получить агрегированную статистику по статусам.GET /health- healthcheck сервиса.GET /metrics- метрики сервиса в text exposition формате.
Прямой SQL доступ
Клиент может работать с очередью напрямую через PostgreSQL.
Примеры:
-- Настроить search_path
SET search_path = tasman, public;
-- 1) Зарегистрировать тип задачи (или получить NULL, если уже есть)
SELECT task_type_create('sample');
-- 2) Добавить задачу через функцию
SELECT task_create('sample', '{"id":1}'::jsonb, 10, 3, now());
-- 3) Добавить задачу напрямую в таблицу
INSERT INTO task(task_type, payload) VALUES ('sample', '{"id":2}'::jsonb) RETURNING id;
-- 4) Смотреть текущее состояние очереди
SELECT id, task_type, status, attempts, scheduled_at, created_at
FROM task_all
ORDER BY created_at DESC
LIMIT 100;
-- 5) Сводка по статусам
SELECT status, COUNT(*) FROM task_all GROUP BY status ORDER BY status;
Кодогенерация API
Все команды и описание кодогенерации вынесены в api/README.md.
Пример приложения
Отдельный пример находится в example/README.md.
Документация
- Обзор проекта и API (этот файл).
- PostgreSQL-слой (queue/producer/observer).
- Инициализация схемы и применение SQL.
- Работа с
LISTEN/NOTIFY. - Запуск и настройки примера.
- Описание SQL-объектов.
- Описание HTTP API из api/api.proto.
- Кодогенерация API и документации.
Go API (godoc)
- Пакет
tasman. - Пакет
pg/migrate. - Пакет
pg/notify. - Пакет
static. - Сгенерированные API-типы/интерфейсы.
- Пакет примера.
- Пакет sample-задач.
См. также
- Pond: Submitting tasks associated with a context
- Очередь задач на Postgres: SKIP LOCKED + lease/heartbeat + backpressure
- How We Replaced Goroutines With a Database Queue
Разработка прототипа
Первичный промпт был отправлен в несколько LLM, результат их работы размещен на ветках:
- gpt-5.3-codex
- sonnet-4.5
- opus-4.6
- gpt-oss:20B
Исходный код
git clone https://gitflic.ru/project/vinteo/tasman.git
cd tasman
Лицензия
Copyright 2026 Vinteo
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.