README.md

TASk MANager. Очередь задач на go с использованием Postgresql

Назначение

Tasman - библиотека для очереди задач на PostgreSQL с исполнением задач в пуле воркеров Go. Задачи и их состояние хранятся в БД, получение новых задач основано на LISTEN/NOTIFY с fallback-поллингом. Постановка задач и наблюдение за состоянием возможны как через Go API/HTTP API, так и напрямую SQL-запросами к БД.

Возможности

  1. Регистрация типов задач в БД.
  2. Приоритеты, отложенный запуск, ограничение по числу попыток.
  3. Heartbeat и автоматическое освобождение зависших задач.
  4. Получение задач через LISTEN/NOTIFY или внешний listener.
  5. Пул воркеров и обработчики задач на Go.

Требования

  1. Go 1.24+.
  2. PostgreSQL 9.5+ (используются SKIP LOCKED, LISTEN/NOTIFY, JSONB).

Быстрый старт (код)

pool, err := pgxpool.New(ctx, dsn)
if err != nil {
    // обработка ошибки
}
defer pool.Close()

svc, err := tasman.New(tasman.Config{
    WorkerCount: 5,
}, pool)
if err != nil {
    // обработка ошибки
}

// Ваш обработчик бизнес-логики.
sampleHandler := func(ctx context.Context, payload json.RawMessage) (any, error) {
    var req struct {
        UserID string `json:"user_id"`
        Action string `json:"action"`
    }
    if err := json.Unmarshal(payload, &req); err != nil {
        return nil, err
    }
    // Здесь можно вызывать ваш сервис/репозиторий/внешний API.
    return map[string]any{
        "user_id": req.UserID,
        "action":  req.Action,
        "status":  "processed",
    }, nil
}

// Инициализация схемы БД из embedded SQL.
if err := svc.Init(ctx, staticFS); err != nil {
    // обработка ошибки
}

// Регистрация обработчика.
err = svc.RegisterHandler(ctx, "sample", sampleHandler)
if err != nil {
    // обработка ошибки
}

// Постановка задач.
payloads := []json.RawMessage{
    []byte(`{"user_id":"u-1","action":"create"}`),
    []byte(`{"user_id":"u-2","action":"delete"}`),
}
_, _ = svc.Enqueue(ctx, "sample", payloads, tasman.WithPriority(10))

// Запуск воркеров с общим контекстом и сбором ошибок.
g, runCtx := errgroup.WithContext(ctx)

// (Опционально) Подключение HTTP API.
// Без этого шага svc.SetupRoutes не используется и HTTP API не поднимается.
mux := http.NewServeMux()
svc.SetupRoutes(mux)
apiSrv := &http.Server{
    Addr:    ":8080",
    Handler: mux,
}
g.Go(func() error {
    if err := apiSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
        return err
    }
    return nil
})
g.Go(func() error {
    <-runCtx.Done()
    return apiSrv.Shutdown(context.Background())
})

for _, w := range svc.Workers() {
    workerFn := w
    g.Go(func() error {
        return workerFn(runCtx)
    })
}
if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
    // обработка ошибки выполнения воркеров
}

Схема БД

  1. SQL-скрипты находятся в static/sql.
  2. По умолчанию svc.Init применяет embedded-скрипты.
  3. Можно использовать внешний каталог со скриптами (см. STATIC в примере) или отключить инициализацию через SkipInit.
  4. Подробная схема инициализации вынесена в pg/migrate`.

Архитектура сервиса

Ключевые потоки:

  1. HTTP API (опционально): SetupRoutes -> Service -> pg -> PostgreSQL.
  2. Worker runtime (обязательно для исполнения): Service.Workers() -> dequeue/heartbeat/complete.
  3. Direct SQL client (опционально): запись задач и чтение состояния напрямую в БД.
flowchart TB
    DB[(PostgreSQL)]
    SQL[static/sql/*.sql]

    subgraph Clients
        HTTP[HTTP client]
        SQLC[SQL client]
    end

    subgraph App["tasman application"]
        API[HTTP server + SetupRoutes]
        SVC[Service]
        W[Workers]
        M[pg/migrate]
        N[pg/notify]
        P[pg/producer]
        Q[pg/queue]
        O[pg/observer]
    end

    HTTP --> API
    API --> SVC
    SVC --> W

    SVC --> M
    SVC --> N
    SVC --> P
    SVC --> Q
    SVC --> O

    M --> SQL
    M --> DB
    N --> DB
    P --> DB
    Q --> DB
    O --> DB

    SQLC --> DB

Sequence Enqueue/Dequeue

sequenceDiagram
    participant C as Client (HTTP)
    participant CS as Client (SQL)
    participant H as API Handler
    participant S as Service
    participant P as pg.Producer
    participant DB as PostgreSQL
    participant W as Worker
    participant Q as pg.Queue

    C->>H: POST /api/v1/enqueue
    H->>S: Enqueue(taskType, payloads, opts)
    S->>P: Enqueue(...)
    P->>DB: SELECT task_create(...)
    DB-->>P: task ids
    P-->>S: []id
    S-->>H: EnqueueResponse
    H-->>C: 200 OK

    DB-->>S: NOTIFY task_type
    S->>Q: NextTask(taskTypes, available)
    Q->>DB: SELECT task_nextN(...)
    DB-->>Q: tasks
    Q-->>S: tasks
    S->>W: taskChan <- task
    W->>Q: HeartbeatStart(taskID, workerID)
    W->>Q: Heartbeat(...)
    W->>Q: CompleteTask(result/error)
    Q->>DB: SELECT complete_task(...)

    CS->>DB: INSERT INTO task / SELECT task_create(...)
    CS->>DB: SELECT ... FROM task_all / stats

Жизненный цикл задачи

stateDiagram-v2
    [*] --> pending
    pending --> starting: task_nextN()
    starting --> running: heartbeat_start()
    running --> done: complete_task(result)
    running --> failed: complete_task(error)
    failed --> pending: retry if attempts < max_attempts
    running --> pending: release_stale_tasks() / heartbeat timeout
    done --> [*]
    failed --> [*]: attempts >= max_attempts

HTTP API

HTTP API описан в api/api.proto и подключается через SetupRoutes.

Текущие маршруты:

  1. POST /api/v1/enqueue - добавить задачи.
  2. GET /api/v1/task/{id} - получить информацию по задаче.
  3. GET /api/v1/tasks - получить список задач (с фильтрацией по status и limit).
  4. GET /api/v1/stats - получить агрегированную статистику по статусам.
  5. GET /health - healthcheck сервиса.
  6. GET /metrics - метрики сервиса в text exposition формате.

Прямой SQL доступ

Клиент может работать с очередью напрямую через PostgreSQL.

Примеры:

-- Настроить search_path
SET search_path = tasman, public;

-- 1) Зарегистрировать тип задачи (или получить NULL, если уже есть)
SELECT task_type_create('sample');

-- 2) Добавить задачу через функцию
SELECT task_create('sample', '{"id":1}'::jsonb, 10, 3, now());

-- 3) Добавить задачу напрямую в таблицу
INSERT INTO task(task_type, payload) VALUES ('sample', '{"id":2}'::jsonb) RETURNING id;

-- 4) Смотреть текущее состояние очереди
SELECT id, task_type, status, attempts, scheduled_at, created_at
FROM task_all
ORDER BY created_at DESC
LIMIT 100;

-- 5) Сводка по статусам
SELECT status, COUNT(*) FROM task_all GROUP BY status ORDER BY status;

Кодогенерация API

Все команды и описание кодогенерации вынесены в api/README.md.

Пример приложения

Отдельный пример находится в example/README.md.

Документация

  1. Обзор проекта и API (этот файл).
  2. PostgreSQL-слой (queue/producer/observer).
  3. Инициализация схемы и применение SQL.
  4. Работа с LISTEN/NOTIFY.
  5. Запуск и настройки примера.
  6. Описание SQL-объектов.
  7. Описание HTTP API из api/api.proto.
  8. Кодогенерация API и документации.

Go API (godoc)

  1. Пакет tasman.
  2. Пакет pg/migrate.
  3. Пакет pg/notify.
  4. Пакет static.
  5. Сгенерированные API-типы/интерфейсы.
  6. Пакет примера.
  7. Пакет sample-задач.

См. также

Разработка прототипа

Первичный промпт был отправлен в несколько LLM, результат их работы размещен на ветках:

  • gpt-5.3-codex
  • sonnet-4.5
  • opus-4.6
  • gpt-oss:20B

Исходный код

git clone https://gitflic.ru/project/vinteo/tasman.git
cd tasman

Лицензия

Copyright 2026 Vinteo

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Описание
TASk MANager. Очередь задач на go с использованием Postgresql
Конвейеры
0 успешных
10 с ошибкой
Разработчики