Readme.md

Простой, но мощный планировщик и панель мониторинга DAG, написанные на Go. На основе goflow - репозиторий.

Используйте его, если:

  • вам нужен планировщик направленного ациклического графа (DAG), такой как Apache Airflow, но без каких-либо сложностей.
  • у вас есть множество кластеров или сервисов, выполняющих тяжелые вычисления, и вам нужно что-то маленькое и легкое для их оркестрации.
  • вам нужна панель мониторинга.
  • вам нужно максимально простое развертывание с помощью одного двоичного файла или контейнера, что сэкономит ваше время. Монтирование томов и т. д. — слишком большая головная боль.
  • вы хотите, чтобы он работал на одной крошечной виртуальной машине, что позволяет сэкономить на облачных расходах.
  • вы предпочитаете определять группы обеспечения доступности баз данных с помощью кода, а не файлов конфигурации. Этот подход может иметь различные преимущества, включая более простое тестирование.

Не используйте его, если:

  • вам нужно поставить в очередь огромное количество задач на одном узле.

Создайте файл main.go с содержимым:

package main

import (
	"log"
	"os"
	"stendml/goflow"
)

var options goflow.Options

func main() {
	dir, err := os.Getwd()
	if err != nil {
		log.Fatal(err)
	}
	options = goflow.Options{
		UIPath:       "ui/",
		Streaming:    true,
		ShowExamples: false,
		Public_dir:   dir, // в этой директории должна находится папка public/, которая будет монтироваться в докер и станет  доступна по http://   по  ссылке  / директория  public  должна  существоать в $dir
	}
	gf := goflow.New(options)
	gf.AddJob(JobDockerBuild)
	gf.AddJob(JobDockerRun)
	gf.AddJob(JobTengo)
	gf.AddJob(JobCommands)
	gf.AddJob(JobGets)
	gf.Use(goflow.DefaultLogger())
	gf.Run(":8181")
}


Запустить приложение можно с помощью go run . и просмотрите его в браузере по адресу localhost:8181. Удаленно можно запустить и используя утилиту типа gotty или мультиплексор(Screen или Tmux), полезно использовать что-то типа smug.

Обзор разработки

Сначала несколько определений.

  • «Job»: рабочий процесс Goflow называется «Задание». Задания можно планировать с использованием синтаксиса cron.
  • «Task»: задача каждое задание состоит из одной или нескольких задач, организованных в виде графа зависимостей. Задача может быть запущена при определенных условиях; по умолчанию задача запускается, когда все ее зависимости завершаются успешно.
  • Параллелизм: задания и задачи выполняются одновременно.
  • «Operator»: «Оператор» определяет работу, выполняемую «Задачей». Goflow поставляется с несколькими базовыми операторами, и реализовать собственный «Оператор» очень просто.
  • Reries: Вполнять задачу «Task» заданное количество попыток. Предусмотрены две стратегии повтора: ConstantDelay и ExponentialBackoff.
  • Sreaming: используются события, отправляемые сервером, для потоковой передачи статуса заданий и задач на панель управления в режиме реального времени.

Задания(Job) и задачи(Task)

Начнем с создания функции, возвращающей задание под названием myJob. В этом задании есть одна задача, которая приостанавливается на одну секунду.

package main

import (
	"errors"

	"stend/goflow"
)

func myJob() *goflow.Job {
	j := &goflow.Job{Name: "myJob", Schedule: "* * * * *", Active: true}
	j.Add(&goflow.Task{
		Name: "sleepForOneSecond",
		Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
	})
	return j
}

Установив Active: true, мы сообщаем Goflow применить предоставленное расписание cron для этого задания при запуске приложения. Планирование заданий можно активировать и деактивировать с панели управления.

Пользовательские операторы

Пользовательский «Operator» должен реализовать метод «Run». Вот пример оператора, который складывает два положительных числа.

type PositiveAddition struct{ a, b int }

func (o PositiveAddition) Run() (interface{}, error) {
	if o.a < 0 || o.b < 0 {
		return 0, errors.New("Can't add negative numbers")
	}
	result := o.a + o.b
	return result, nil
}

Повторение

Let’s add a retry strategy to the sleepForOneSecond task:

func myJob() *goflow.Job {
	j := &goflow.Job{Name: "myJob", Schedule: "* * * * *"}
	j.Add(&goflow.Task{
		Name: "sleepForOneSecond",
		Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
		Retries: 5,
		RetryDelay: goflow.ConstantDelay{Period: 1},
	})
	return j
}

Вместо ConstantDelay мы также могли бы использовать ExponentialBackoff. см. Вики.

Зависимости задач

Задание может использовать направленный ациклический граф (DAG) независимых и зависимых задач. Можно воспользуется методом SetDownstream, чтобы определите две задачи, которые зависят от SleepForOneSecond. Задачи будут использовать оператор PositiveAddition, который мы определили ранее. а также новый оператор Goflow — Get.

func myJob() *goflow.Job {
	j := &goflow.Job{Name: "myJob", Schedule: "* * * * *"}
	j.Add(&goflow.Task{
		Name: "sleepForOneSecond",
		Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
		Retries: 5,
		RetryDelay: goflow.ConstantDelay{Period: 1},
	})
	j.Add(&goflow.Task{
		Name: "getGoogle",
		Operator: goflow.Get{Client: &http.Client{}, URL: "https://www.google.com"},
	})
	j.Add(&goflow.Task{
		Name: "AddTwoPlusThree",
		Operator: PositiveAddition{a: 2, b: 3},
	})
	j.SetDownstream(j.Task("sleepForOneSecond"), j.Task("getGoogle"))
	j.SetDownstream(j.Task("sleepForOneSecond"), j.Task("AddTwoPlusThree"))
	return j
}

Задание myJob нужно вставить в основной файл :

package main

import (
	"log"
	"os"
	"stendml/goflow"
)

var options goflow.Options

func main() {
	dir, err := os.Getwd()
	if err != nil {
		log.Fatal(err)
	}
	options = goflow.Options{
		UIPath:       "ui/",
		Streaming:    true,
		ShowExamples: false,
		Public_dir:   dir, // в этой директории должна находится папка public/, которая будет монтироваться в докер и станет  доступна по http://   по  ссылке  / директория  public  должна  существоать в $dir	}
	gf := goflow.New(options)
	gf.AddJob(JobDockerBuild)
	gf.AddJob(JobDockerRun)
	gf.AddJob(JobTengo)
	gf.AddJob(JobCommands)
	gf.AddJob(JobGets) 
	gf.AddJob(myJob)  //  <----  вот 
	gf.Use(goflow.DefaultLogger())
	gf.Run(":8181")
}

Для докеров DAG задается в файле docker.graph представляющий собой тектовый файл со списком ребер: на одной строке одно ребро, разделитель “;”. Например:

t1;t2

Результат:

alt text

Пример:

from sklearn.cluster import DBSCAN
from sklearn.metrics import ConfusionMatrixDisplay

import numpy as np
from sklearn.metrics import confusion_matrix
X = np.array([[1, 2], [2, 2], [2, 3],
              [8, 7], [8, 8], [20,20],[25, 80], [25, 80],[25, 80]])
clustering = DBSCAN(eps=3, min_samples=2).fit(X)
y_true = [0, 0, 0, 1, 1, 1,2,2,2]


print(clustering.labels_)


y_pred = clustering.labels_
print(confusion_matrix(y_true, y_pred))




IC = type('IdentityClassifier', (), {"predict": lambda i : i, "_estimator_type": "classifier"})

cm=ConfusionMatrixDisplay.from_estimator(IC, y_pred, y_true, normalize='true',  values_format='.2%')

cm.figure_.savefig('/public/confusion_matrix.png')

При старте докеров директория public в хосте монтруется в директорию /public в докере.

Правила триггеров

По умолчанию задача имеет триггерное правило allSuccessful, что означает, что задача начинает выполняться, когда все задачи напрямую восходящий выход успешно завершен. Если какая-либо зависимость завершается с ошибкой, все последующие задачи пропускаются, и задание завершается с ошибкой.

Иногда вам нужно, чтобы нижестоящая задача выполнялась даже при наличии сбоев в восходящем направлении. Часто это ситуации, когда хочется для выполнения какой-либо задачи по очистке, например выключения сервера. В таких случаях вы можете назначить задаче триггерное правило allDone.

Можно изменить SleepForOneSecond, чтобы оно имело триггерное правило allDone.

func myJob() *goflow.Job {
	// other stuff
	j.Add(&goflow.Task{
		Name: "sleepForOneSecond",
		Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
		Retries: 5,
		RetryDelay: goflow.ConstantDelay{Period: 1},
		TriggerRule: "allDone",
	})
	// other stuff
}

Старт приложение с заданиями

Наконец, давайте создадим приложение, зарегистрируем наше задание, подключим регистратор и запустим приложение.

func main() {
	gf := goflow.New(goflow.Options{Streaming: true})
	gf.AddJob(myJob)
	gf.Use(goflow.DefaultLogger())
	gf.Run(":8181")
}

Вы можете передавать различные параметры движку. Варианты, поддерживаемые в настоящее время:

  • Storage: хранилище более подробно описано ниже.
  • UIPath: путь к коду информационной панели. Значением по умолчанию является пустая строка, что означает, что Goflow обслуживает только API, а не панель мониторинга. Рекомендуемое значение, если вам нужна панель мониторинга: ui/
  • Streaming: следует ли транслировать обновления на панель управления. Значение по умолчанию — false, но если вы используете панель мониторинга, рекомендуется изменить это значение.
  • ShowExamples: показывать ли примеры заданий. Значение по умолчанию: false

Приложение использует фреймворк Gin, поэтому вы можете передать любой обработчик Gin в Use.

Стандартные операторы

Goflow предоставляет несколько операторов для общих задач. Примеры в jobs.go

  • Command выполняет команду оболочки.
  • Get выполняет запрос GET.
  • Post выполняет POST-запрос.
  • Tengo исполняемые скрипты репозиторий
  • Dockers Dag основанный на докерах

Хранилища

По умолчанию используетcя база данных в памяти Для сохранения истории выполнения заданий планируется разработать механизм подключения к базам данных «ключ-значение».

API и интеграция

Вы можете использовать API для интеграции Goflow с другими приложениями, например с существующей информационной панелью. Вот обзор доступных конечных точек:

  • GET /api/health: проверьте работоспособность службы.
  • GET /api/jobs: список зарегистрированных заданий.
  • GET /api/jobs/{jobname}: получить подробную информацию о задании.
  • GET /api/jobruns: Запрос и список запусков заданий.
  • POST /api/jobs/{jobname}/submit: отправить задание на выполнение.
  • POST /api/jobs/{jobname}/toggle: включение или выключение расписания заданий.
  • /stream: эта конечная точка возвращает события, отправленные сервером, с полезной нагрузкой data, соответствующей той, которая возвращается /api/jobruns.

Статистику потребления ресурсов машины можно посмотреть: http://localhost:8181/debug/statsviz/

Описание

Типовой стенд для проверки моделей машинного обучения

Конвейеры
0 успешных
0 с ошибкой