README.md

    Приветствую, сообщество!

    Меня зовут Александр, я java разработчик в компании БФТ. Тружусь я на проекте СМЭВ-адаптера, где мы занимаемся транзитивной обработкой сообщений. В нашу зону ответственности входит несколько микросервисов, которые обрабатывают очень много сообщений, почти ничего не пишут в БД, но часто обращаются в сторонние прикладные сервисы.

    Для отслеживания пути сообщения через наши микросервисы мы используем Zipkin. Помимо этого в проекте задействован Apache Camel, с помощью которого мы выстраиваем цепочку обработки сообщения в одном конкретном микросервисе. Стандартные средства для работы с Zipkin обычно позволяют легко добавить к трассе вход, выход в сервис и запись в БД, но, т.к. к нас не совсем стандартное поведение у сервисов, нам хотелось выделять в Zipkin и обращение в сторонние сервисы.

    Хотелось эту логику как-то элегантно встроить в роут Camel, но существующие средства такой возможности не предоставляли.

    Было принято решение написать свой компонент для Apache Camel. Делали мы это впервые и, к сожалению, полноценного гайда в интернетах найти не удалось…

    Встречайте! Гайд по написанию собственного Camel-компонента!

    Основные составляющие camel-компонента:

    • Component - отвечает за создание Endpointa и является входной точкой в ваш компонент
    • Endpoint - отвечает за создание Producer и Consumer. Также хранит в себе параметры из урла.
    • Producer - принимает запросы к вашему компоненту (to("ref"))
    • Consumer - отправляет сообщения для слушателей вашего компонента (from("ref"))

    Теперь подробнее про каждого.

    Component

    Класс компонента необходимо аннотировать @Component и передать имя вашего camel-компонента. Также отнаследовать его от абстрактного класса DefaultComponent и переопределить метод createEndpoint. Как можно было догадаться этот метод отвечает за создание Endpoint и важно позаботиться о том, чтобы все необходимые зависимости попали в него, если вы не желаете их получать потом обходными путями. Кроме этого, в этом методе определяются параметры из урла.

    
    @Component("zipkintrace")
    public class ZipkinTraceComponent extends DefaultComponent {
    
        // Зависимости
        private final ZipkinTraceProperties zipkinTraceProperties;
        private final ZipkinTraceCache zipkinTraceCache;
    
    
        public ZipkinTraceComponent(
            CamelContext context, ZipkinService zipkinService, ZipkinTraceProperties zipkinTraceProperties,
            ZipkinTraceCache zipkinTraceCache
        ) {
            super(context);
            this.zipkinService = zipkinService;
            this.zipkinTraceProperties = zipkinTraceProperties;
            this.zipkinTraceCache = zipkinTraceCache;
        }
    
        @Override
        protected ZipkinTraceEndpoint createEndpoint(
            String uri, String remaining, Map<String, Object> parameters
        ) throws Exception {
    
            ZipkinTraceEndpoint endpoint = new ZipkinTraceEndpoint(
                uri, this, zipkinTraceProperties, zipkinTraceCache
            );
    
            // Сохранения параметров из урла
            setProperties(endpoint, parameters);
            endpoint.setAction(remaining);
    
            return endpoint;
        }
    }
    

    Endpoint

    Этот класс поинтереснее. Тут описывается вся необходимая информация для Apache Camel.

    Прежде всего аннотируем его @UriEndpoint. Аннотация принимает множество параметров, описание которых вы найдёте в javaDoc её файла.

    Если вы не хотите полностью настраивать Endpoint для сamel, наследуемся от DefaultEndpoint и имплементируем AsyncEndpoint, чтобы дать понять фреймворку, что Endpoint поддерживает асинхронную обработку сообщений.

    В полях класса определяем все возможные параметры, которые можно передать в урле и помечаем их соответствующими аннотациями.

    Важно! У каждого такого поля должен быть геттер и сеттер с описанным JavaDoc для них. Иначе camel-компонент не собрать.

    В этом же классе переопределяем методы создания Producer и Consumer

    @UriEndpoint(
        firstVersion = "3.21.0",
        scheme = "zipkintrace",
        syntax = "zipkintrace:action",
        title = "zipkintrace",
        category = Category.LOG,
        producerOnly = true,
        headersClass = ZipkinTraceConstants.class
    )
    public class ZipkinTraceEndpoint extends DefaultEndpoint implements AsyncEndpoint {
    
        private final ZipkinTraceProperties zipkinTraceProperties;
        private final ZipkinTraceCache zipkinTraceCache;
    
        @UriPath
        @Metadata(required = true)
        private String action;
        @UriParam
        private String route;
        @UriParam
        private String processor;
        @UriParam
        private String messageId;
        @UriParam
        private String originalMessageId;
        @UriParam
        private String iisId;
        @UriParam
        private boolean buildTraceContext;
        @UriParam(description = "Трасса, которую необходимо продолжить")
        private String traceContext;
    
        public ZipkinTraceEndpoint(String endpointUri, Component component,
                                   ZipkinTraceProperties zipkinTraceProperties, ZipkinTraceCache zipkinTraceCache) {
            super(endpointUri, component);
            this.zipkinTraceProperties = zipkinTraceProperties;
            this.zipkinTraceCache = zipkinTraceCache;
        }
    
        @Override
        public Producer createProducer() throws Exception {
            return new ZipkinTraceProduces(this, zipkinTraceProperties, zipkinTraceCache);
        }
    
        @Override
        public Consumer createConsumer(Processor processor) throws Exception {
            throw new IllegalArgumentException("zipkintraser has no consumer, so you cannot use get any data from him");
        }
    
        /**
         * Действие относительно трассы zipkin.
         * Перечень в ZipkinTraceAction
         */
        public String getAction() {
            return action;
        }
    
        /**
         * Действие относительно трассы zipkin.
         * Перечень в ZipkinTraceAction
         */
        public void setAction(String action) {
            this.action = action;
        }
    
      // остальные геттеры и сеттеры
    }
    

    Producer

    Тут всё проще. Как и раньше, если не хотим полностью настраивать Producer, используем стандартный абстрактный класс - DefaultProducer или DefaultAsyncProducer. Переопределяем getEndpoint, чтобы не получать стандартный интерфейс, и метод полезный работы process. В асинхронном варианте последний метод будет иметь в параметрах callback для завершения потока.

    public class ZipkinTraceProduces extends DefaultAsyncProducer {
        
        private final ZipkinTraceProperties zipkinTraceProperties;
    
        private final ZipkinTraceCache zipkinTraceCache;
    
        public ZipkinTraceProduces(ZipkinTraceEndpoint endpoint,
                                   ZipkinTraceProperties zipkinTraceProperties, ZipkinTraceCache zipkinTraceCache
        ) {
            super(endpoint);
            this.zipkinTraceProperties = zipkinTraceProperties;
            this.zipkinTraceCache = zipkinTraceCache;
        }
    
        @Override
        public ZipkinTraceEndpoint getEndpoint() {
            return (ZipkinTraceEndpoint) super.getEndpoint();
        }
    
        @Override
        public boolean process(Exchange exchange, AsyncCallback callback) {
    
            if (!isRunAllowed()) {
                return shutDownWithException(exchange, callback);
            }
    
            try {
    
                // полезная работа
                
                callback.done(true);
                return true;
            } catch (Throwable e) {
                exchange.setException(e);
                callback.done(true);
    
                return true;
            }
        }
    
        private boolean shutDownWithException(Exchange exchange, AsyncCallback callback) {
    
            if (isNull(exchange.getException())) {
                exchange.setException(new RejectedExecutionException());
            }
            callback.done(true);
            return true;
        }
    }
    

    Consumer

    В этом классе определяется логика, которая будет отправлять сообщения слушателям. Например, может запускаться слушатель очереди или какая-то крон-задача.

    Для этого используем класс DefaultConsumer в качестве родительского и переопределяем методы doStart, doStop.

    Если поток сообщений может быть приостановлен (не полное отключение), нужно пометить класс интерфейсом Suspendable. Методы для обработки этого поведения doSuspend и doResume

    public class ZipkinTraceConsumer extends DefaultConsumer {
    
        public ZipkinTraceConsumer(Endpoint endpoint, Processor processor) {
            super(endpoint, processor);
        }
    
        @Override
        protected void doStart() throws Exception {
            super.doStart();
        }
    
        @Override
        protected void doStop() throws Exception {
            super.doStop();
        }
    
        @Override
        protected void doSuspend() {
        }
    
        @Override
        protected void doResume() throws Exception {
        }
    }
    

    Теперь неочевидное

    К сожалению, для того чтобы фреймворк заметил ваш компонент и позволил его использовать действий описанных выше недостаточно. Помимо всего этого нужно добавить плагин, который сгенерирует метаинформацию по вашему компоненту во время компиляции кода. И вот тогда camel признает все ваши труды.

    <plugins>
        <plugin>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-component-maven-plugin</artifactId>
            <version>${camel-version}</version>
            <executions>
                <execution>
                    <id>generate</id>
                    <goals>
                        <goal>generate</goal>
                    </goals>
                    <phase>process-classes</phase>
                </execution>
            </executions>
        </plugin>
    </plugins>
    

    И создать файл мета информации в пакете resources/META-INF/services/org/apache/camel/component/

    Файл назвать по имени компонента.

    img_1.png

    Содержимое файла

    class=ru.gov.pfr.ecp.iis.smev.adapter.zipkin.camel.component.ZipkinTraceComponent
    

    Что получилось у нас

     from("direct:" + FSSP_REPORT_ARREST_PROCESSING_ROUTE)
        .routeId(FSSP_REPORT_ARREST_PROCESSING_ROUTE)
        .log(LoggingLevel.INFO, FSSP_REPORT_ARREST_PROCESSING_ROUTE + ".start")
        .to("zipkintrace:scoped?processor=ReportArrestXmlEACreateProcessor") <-- Обращение в условный S3
        .process(reportArrestXmlEACreateProcessor)
        .to("zipkintrace:scoped?processor=ReportArrestXmlSignProcessor") <-- Обращение в прикладной сервис
        .process(reportArrestXmlSignProcessor)
        .to("zipkintrace:scoped?processor=ReportArrestArchiveCreateProcessor") <-- Сохранение результата в S3
        .process(reportArrestArchiveCreateProcessor)
        .to("zipkintrace:end")
        .process(convertProcessor);
    

    img.png

    Надеюсь, эта статья будет полезна и убережёт вас от подводных каменей Apache Camel.

    Полезные материалы

    1. Код примера
    2. Документация фреймворка
    3. Другие компоненты в открытом доступе
    4. Stackoverflow
    Конвейеры
    0 успешных
    0 с ошибкой