Сервис перекладывающий поток логической репликаци pg в kafka
Плюсы
- Invented Here!
- православный golang
- используется jackc/pglogrepl в сочетании с decoderbufs
- очень гибкая обработка сообщений через Lua
- используется lua-jit, собирается статически
- есть возможность использования нескольких функций хеширования
- возможность распараллелить запись в несколько топиков
Минусы
- В lua ограниченное количество типов, поэтому не всё из получаемой строки репликации может быть передано в скрипт. Так же возвращаемые дополнительные поля могут быть только простых типов. Возвращаемая таблица перезапишет существующие поля, надо помнить об ограничениях и не использовать скрипт для модификации сложных типов полей.
- lua-jit остановился на реализации lua 5.1 (с заимствованиями из 5.3), в котором все числа - float64 (LUA_TNUMBER), и в возвращаемой таблице дополнительных полей будет всегда float; впрочем при кодировании в json это значения не имеет
TODO
- добавить удаление полей из записи
- переработать конфигурацию
Использование
Exactly Once
Postgres в поток репликации отправляет только закоммиченную транзакцию, а реплика (gobezium) при обновлении своего состояния должна вернуть LSN последнего COMMIT
. Если реплика отвалилась без подтверждения последнего COMMIT
(в gobezium - при ошибке обработки записи и отправки в Kafka), то в следующий раз Postgres начнет передавать записи с начала последней необработанной транзакции (restart_lsn
), что может привести к повторам.
Если указан хост Redis, то после каждой отправки в Kafka текущий LSN будет запоминаться в Redis, и при старте будут пропущены все LSN, которые будут <= сохраненного.
Если указан параметр syncstatus
, то gobezium будет обновлять статус на сервере сразу после получения COMMIT
, иначе - раз в 10 секунд. Не рекомендуется включать на больших потоках репликации, на малых уменьшит количество дублей, если не используется Redis
Замечание о партицировании
Номер партиции при отправке в Kafka всегда выбирает клиент, брокер не участвует в выборе партиции. В gobezium выбор партиции перекладывается однозначно на скрипт.
Конфигурирование
Используется Viper. Параметры могут быть заданы:
- в YAML файле, указанном параметром
-config
или в/etc/gobezium/gobezium.yaml
. Возможно включение конфигурационных файлов без рекурсии (см. пример) - в переменных окружения с префиксом
GBZ_
, в этом случае точки меняются на подчеркивания, а символы капитализируются, напримерGBZ_DB_DBNAME
Скрипт
Может быть задан как файл параметром script.name
или непосредственно как строка параметром script.body
. Должен содержать функцию Process
, принимающий 4 параметра и возвращающий 2 значения. Остальной код может содержать разнообразные инициализационные конструкции, которые будут вычислены (скомпилированы) один раз при создании машины LUA. Для отладки можно использовать Print()
, вывод будет в консоль.
Скрипту доступны следующие функции хеширования:
PgHashBytes
- основная функция хеширования postgres, используется вPARTITION BY HASH
Murmur3
- 32-битный вариант Murmur3Crc32
- блочный (без финальногоXOR
) вариант с табличным полиномом0x04c11db7
из библиотекиlibiberty
Параметры функции Process
- Операция, число
- 0 - INSERT
- 1 - UPDATE
- 2 - DELETE
- Имя таблицы, включая имя схемы, строка
- Поля записи, таблица
- Дополнительные значения из файла конфигурации, таблица
Возвращаемые значения функции Process
- Топик и партиция для записи, таблица, где ключ - имя топика (строка), значение - номер партиции (число). Если возвращается nil - запись пропускается.
- Дополнительные поля, добавляемые к записи, таблица или nil
Пример
Большой пример - в файле example.lua