readme.md

Схема работы:

1) Собрать образы:

(в соответствующих директориях)

.../vm_source$ docker build --no-cache --squash -t vm_source -f Dockerfile.source .

.../vm_db$ docker build --no-cache --squash -t vm_db -f Dockerfile.db .

.../vm_ml$ docker build --no-cache --squash -t vm_ml -f Dockerfile.ml .

.../vm_ml_cuda$ docker build --no-cache --squash -t vm_ml_cuda -f Dockerfile.ml.cuda .

2) Стартовать compose:

Версия для CPU:

.../diss_1$ nohup docker compose up --force-recreate &

Версия для GPU:

.../diss_1$ nohup docker compose -f docker-compose-cuda.yml up --force-recreate &

Запустятся 3 контейнера:

  • vm_source
    • kafka
    • producer.py (загрузка данных в kafk’у)
    • /data (директория с данными)
  • vm_db:
    • questdb (хранение временных рядов)
    • bytewax_test_postgres.py (потоки из kafk’и в questdb)
  • vm_ml (или vm_ml_cuda):
    • mlflow (порт 8083)
    • библиотеки для ml моделей (sklearn ++)
    • jupyter lab (порт 8888)
    • /model1 - пример модели

3) Скопировать файлы с данными в контейнер vm_source

(предполагается, что эти файлы лежат на локальной машине в директории /data)

.../diss_1$ docker cp data/ <container_id>:/

4) В контейнере vm_source залить данные в kafk’у (топики deal-topic, glass-topic)

(на локальной машине)$ docker exec -it <container_id>: /bin/bash

(в контейнере)$ time python producer.py --data_dir /data --max_n 0 --flush_num 10000

5) В контейнере vm_db перекачать данные из kafk’и в questdb потоком bytewax

(пример для тикера GAZP, под который автоматически создаются приёмные таблицы)

(на локальной машине)$ docker exec -it <container_id>: /bin/bash

(в контейнере)$ export STREAM_JSON_PARAM='{"db_name": "questdb", "ticker": "GAZP", "table_type": "deal", "flush_num": 50}' && python -m bytewax.run bytewax_to_postgres.py

(в контейнере)$ export STREAM_JSON_PARAM='{"db_name": "questdb", "ticker": "GAZP", "table_type": "glass", "flush_num": 50}' && python -m bytewax.run bytewax_to_postgres.py

6) В контейнере vm_ml (или vm_ml_cuda):

- подготовить данные (из questdb) для модели (с записью эксперименты в mlflow)

- обучить модель (с записью результата в mlflow)

- запустить обученную модель как сервис в mlflow serve

- запустить скрипт для получения модельного прогноза из mlflow serve и записи его в kafk’у (топик model-topic)

(пример для тикера GAZP)

(на локальной машине)$ docker exec -it <container_id>: /bin/bash

(в контейнере)
$ cd /model1
$ . /ml-venv/bin/activate
$ python preprocess.py
$ python train.py
$ nohup mlflow models serve -m models:/model11/latest -p 5000 --no-conda &> /log/model11.log 2>&1 &
(для выхода - ctrl+c)$ python evaluate.py

Требования к файлам с данными

Файлы с данными должны быть сжаты в gzip иметь наименования вида:

.../data$ ls
deal_2021-08-03.csv.gz glass_2021-08-03.csv.gz

Пример содержимого файла со сделками:

/data$ gunzip deal_2021-08-03.csv.gz 
/data$ head -2 deal_2021-08-03.csv 
2;RTSIDX;UXAG;1627972988.9886;1858.62;1
3;RTSIDX;UX;1627972988.9886;832.36;1

Пример содержимого файла с биржевым стаканом:

/data$ gunzip glass_2021-08-03.csv.gz 
/data$ head -3 glass_2021-08-03.csv 
CETS;EUR_RUB__TOD;1627972987.3327;86.572500|86.570000|86.565000|86.562500|86.557500|86.555000|86.552500|86.550000|86.547500|86.540000|86.532500|86.530000|86.520000|86.517500|86.512500|86.510000|86.505000|86.500000|86.495000|86.492500;50|50|55|100|500|200|60|181|58|130|500|2|3000|8|8|2|4|347|1000|50;86.582500|86.585000|86.587500|86.590000|86.592500|86.595000|86.597500|86.600000|86.625000|86.630000|86.632500|86.635000|86.655000|86.677500|86.680000|86.685000|86.700000|86.702500|86.717500|86.720000;100|50|50|51|630|200|50|107|3000|50|500|50|80|1000|51|3000|1|5|2000|500
Описание
Конвейеры
0 успешных
0 с ошибкой
Разработчики