Простой, но мощный планировщик и панель мониторинга DAG, написанные на Go. На основе goflow - репозиторий.
Используйте его, если:
- вам нужен планировщик направленного ациклического графа (DAG), такой как Apache Airflow, но без каких-либо сложностей.
- у вас есть множество кластеров или сервисов, выполняющих тяжелые вычисления, и вам нужно что-то маленькое и легкое для их оркестрации.
- вам нужна панель мониторинга.
- вам нужно максимально простое развертывание с помощью одного двоичного файла или контейнера, что сэкономит ваше время. Монтирование томов и т. д. — слишком большая головная боль.
- вы хотите, чтобы он работал на одной крошечной виртуальной машине, что позволяет сэкономить на облачных расходах.
- вы предпочитаете определять группы обеспечения доступности баз данных с помощью кода, а не файлов конфигурации. Этот подход может иметь различные преимущества, включая более простое тестирование.
Не используйте его, если:
- вам нужно поставить в очередь огромное количество задач на одном узле.
Создайте файл main.go
с содержимым:
package main
import (
"log"
"os"
"stendml/goflow"
)
var options goflow.Options
func main() {
dir, err := os.Getwd()
if err != nil {
log.Fatal(err)
}
options = goflow.Options{
UIPath: "ui/",
Streaming: true,
ShowExamples: false,
Public_dir: dir, // в этой директории должна находится папка public/, которая будет монтироваться в докер и станет доступна по http:// по ссылке / директория public должна существоать в $dir
}
gf := goflow.New(options)
gf.AddJob(JobDockerBuild)
gf.AddJob(JobDockerRun)
gf.AddJob(JobTengo)
gf.AddJob(JobCommands)
gf.AddJob(JobGets)
gf.Use(goflow.DefaultLogger())
gf.Run(":8181")
}
Запустить приложение можно с помощью go run .
и просмотрите его в браузере по адресу localhost:8181. Удаленно можно запустить и используя утилиту типа gotty или мультиплексор(Screen или Tmux), полезно использовать что-то типа smug.
Обзор разработки
Сначала несколько определений.
- «Job»: рабочий процесс Goflow называется «Задание». Задания можно планировать с использованием синтаксиса cron.
- «Task»: задача каждое задание состоит из одной или нескольких задач, организованных в виде графа зависимостей. Задача может быть запущена при определенных условиях; по умолчанию задача запускается, когда все ее зависимости завершаются успешно.
- Параллелизм: задания и задачи выполняются одновременно.
- «Operator»: «Оператор» определяет работу, выполняемую «Задачей». Goflow поставляется с несколькими базовыми операторами, и реализовать собственный «Оператор» очень просто.
- Reries: Вполнять задачу «Task» заданное количество попыток. Предусмотрены две стратегии повтора: ConstantDelay и ExponentialBackoff.
- Sreaming: используются события, отправляемые сервером, для потоковой передачи статуса заданий и задач на панель управления в режиме реального времени.
Задания(Job) и задачи(Task)
Начнем с создания функции, возвращающей задание под названием myJob. В этом задании есть одна задача, которая приостанавливается на одну секунду.
package main
import (
"errors"
"stend/goflow"
)
func myJob() *goflow.Job {
j := &goflow.Job{Name: "myJob", Schedule: "* * * * *", Active: true}
j.Add(&goflow.Task{
Name: "sleepForOneSecond",
Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
})
return j
}
Установив Active: true
, мы сообщаем Goflow применить предоставленное расписание cron для этого задания при запуске приложения. Планирование заданий можно активировать и деактивировать с панели управления.
Пользовательские операторы
Пользовательский «Operator» должен реализовать метод «Run». Вот пример оператора, который складывает два положительных числа.
type PositiveAddition struct{ a, b int }
func (o PositiveAddition) Run() (interface{}, error) {
if o.a < 0 || o.b < 0 {
return 0, errors.New("Can't add negative numbers")
}
result := o.a + o.b
return result, nil
}
Повторение
Let’s add a retry strategy to the sleepForOneSecond
task:
func myJob() *goflow.Job {
j := &goflow.Job{Name: "myJob", Schedule: "* * * * *"}
j.Add(&goflow.Task{
Name: "sleepForOneSecond",
Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
Retries: 5,
RetryDelay: goflow.ConstantDelay{Period: 1},
})
return j
}
Вместо ConstantDelay мы также могли бы использовать ExponentialBackoff. см. Вики.
Зависимости задач
Задание может использовать направленный ациклический граф (DAG) независимых и зависимых задач. Можно воспользуется методом SetDownstream
, чтобы определите две задачи, которые зависят от SleepForOneSecond
. Задачи будут использовать оператор PositiveAddition, который мы определили ранее. а также новый оператор Goflow — Get
.
func myJob() *goflow.Job {
j := &goflow.Job{Name: "myJob", Schedule: "* * * * *"}
j.Add(&goflow.Task{
Name: "sleepForOneSecond",
Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
Retries: 5,
RetryDelay: goflow.ConstantDelay{Period: 1},
})
j.Add(&goflow.Task{
Name: "getGoogle",
Operator: goflow.Get{Client: &http.Client{}, URL: "https://www.google.com"},
})
j.Add(&goflow.Task{
Name: "AddTwoPlusThree",
Operator: PositiveAddition{a: 2, b: 3},
})
j.SetDownstream(j.Task("sleepForOneSecond"), j.Task("getGoogle"))
j.SetDownstream(j.Task("sleepForOneSecond"), j.Task("AddTwoPlusThree"))
return j
}
Задание myJob нужно вставить в основной файл :
package main
import (
"log"
"os"
"stendml/goflow"
)
var options goflow.Options
func main() {
dir, err := os.Getwd()
if err != nil {
log.Fatal(err)
}
options = goflow.Options{
UIPath: "ui/",
Streaming: true,
ShowExamples: false,
Public_dir: dir, // в этой директории должна находится папка public/, которая будет монтироваться в докер и станет доступна по http:// по ссылке / директория public должна существоать в $dir }
gf := goflow.New(options)
gf.AddJob(JobDockerBuild)
gf.AddJob(JobDockerRun)
gf.AddJob(JobTengo)
gf.AddJob(JobCommands)
gf.AddJob(JobGets)
gf.AddJob(myJob) // <---- вот
gf.Use(goflow.DefaultLogger())
gf.Run(":8181")
}
Для докеров DAG задается в файле docker.graph представляющий собой тектовый файл со списком ребер: на одной строке одно ребро, разделитель “;”. Например:
t1;t2
Результат:
Пример:
from sklearn.cluster import DBSCAN
from sklearn.metrics import ConfusionMatrixDisplay
import numpy as np
from sklearn.metrics import confusion_matrix
X = np.array([[1, 2], [2, 2], [2, 3],
[8, 7], [8, 8], [20,20],[25, 80], [25, 80],[25, 80]])
clustering = DBSCAN(eps=3, min_samples=2).fit(X)
y_true = [0, 0, 0, 1, 1, 1,2,2,2]
print(clustering.labels_)
y_pred = clustering.labels_
print(confusion_matrix(y_true, y_pred))
IC = type('IdentityClassifier', (), {"predict": lambda i : i, "_estimator_type": "classifier"})
cm=ConfusionMatrixDisplay.from_estimator(IC, y_pred, y_true, normalize='true', values_format='.2%')
cm.figure_.savefig('/public/confusion_matrix.png')
При старте докеров директория public в хосте монтруется в директорию /public в докере.
Правила триггеров
По умолчанию задача имеет триггерное правило allSuccessful
, что означает, что задача начинает выполняться, когда все задачи напрямую восходящий выход успешно завершен. Если какая-либо зависимость завершается с ошибкой, все последующие задачи пропускаются, и задание завершается с ошибкой.
Иногда вам нужно, чтобы нижестоящая задача выполнялась даже при наличии сбоев в восходящем направлении. Часто это ситуации, когда хочется для выполнения какой-либо задачи по очистке, например выключения сервера. В таких случаях вы можете назначить задаче триггерное правило allDone
.
Можно изменить SleepForOneSecond, чтобы оно имело триггерное правило allDone
.
func myJob() *goflow.Job {
// other stuff
j.Add(&goflow.Task{
Name: "sleepForOneSecond",
Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
Retries: 5,
RetryDelay: goflow.ConstantDelay{Period: 1},
TriggerRule: "allDone",
})
// other stuff
}
Старт приложение с заданиями
Наконец, давайте создадим приложение, зарегистрируем наше задание, подключим регистратор и запустим приложение.
func main() {
gf := goflow.New(goflow.Options{Streaming: true})
gf.AddJob(myJob)
gf.Use(goflow.DefaultLogger())
gf.Run(":8181")
}
Вы можете передавать различные параметры движку. Варианты, поддерживаемые в настоящее время:
Storage
: хранилище более подробно описано ниже.UIPath
: путь к коду информационной панели. Значением по умолчанию является пустая строка, что означает, что Goflow обслуживает только API, а не панель мониторинга. Рекомендуемое значение, если вам нужна панель мониторинга:ui/
Streaming
: следует ли транслировать обновления на панель управления. Значение по умолчанию — false, но если вы используете панель мониторинга, рекомендуется изменить это значение.ShowExamples
: показывать ли примеры заданий. Значение по умолчанию:false
Приложение использует фреймворк Gin, поэтому вы можете передать любой обработчик Gin в Use
.
Стандартные операторы
Goflow предоставляет несколько операторов для общих задач. Примеры в jobs.go
Command
выполняет команду оболочки.Get
выполняет запрос GET.Post
выполняет POST-запрос.Tengo
исполняемые скрипты репозиторийDockers
Dag основанный на докерах
Хранилища
По умолчанию используетcя база данных в памяти Для сохранения истории выполнения заданий планируется разработать механизм подключения к базам данных «ключ-значение».
API и интеграция
Вы можете использовать API для интеграции Goflow с другими приложениями, например с существующей информационной панелью. Вот обзор доступных конечных точек:
GET /api/health
: проверьте работоспособность службы.GET /api/jobs
: список зарегистрированных заданий.GET /api/jobs/{jobname}
: получить подробную информацию о задании.GET /api/jobruns
: Запрос и список запусков заданий.POST /api/jobs/{jobname}/submit
: отправить задание на выполнение.POST /api/jobs/{jobname}/toggle
: включение или выключение расписания заданий./stream
: эта конечная точка возвращает события, отправленные сервером, с полезной нагрузкойdata
, соответствующей той, которая возвращается/api/jobruns
.
Статистику потребления ресурсов машины можно посмотреть: http://localhost:8181/debug/statsviz/
Описание
Типовой стенд для проверки моделей машинного обучения