Wake up all broker threads when Idempotent PID is assigned

Wake up all broker threads when Idempotent PID is assigned Previously only UP brokers were awoken, but with sparse connections we need to cover them all.

АвторMagnus Edenhill
КоммитерMagnus Edenhill
7 лет назад
Файлов изменено: 1
+13
–5
223e213
src/rdkafka_idempotence.c
@@ -278,9 +278,9 @@
        rd_kafka_wrunlock(rk);
        /* Wake up all broker threads that may have messages to send
         * that were waiting for a Producer ID. */
        rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_UP);
        /* Wake up all broker threads (that may have messages to send
         * that were waiting for a Producer ID). */
        rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT);
}
@@ -333,13 +333,14 @@
 * @locks none
 */
static void rd_kafka_idemp_drain_done (rd_kafka_t *rk) {
        int restart_tmr = 0;
        rd_bool_t restart_tmr = rd_false;
        rd_bool_t wakeup_brokers = rd_false;
        rd_kafka_wrlock(rk);
        if (rk->rk_eos.idemp_state == RD_KAFKA_IDEMP_STATE_DRAIN_RESET) {
                rd_kafka_dbg(rk, EOS"DRAIN""All partitions drained");
                rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_REQ_PID);
                restart_tmr = 1;
                restart_tmr = rd_true;
        } else if (rk->rk_eos.idemp_state == RD_KAFKA_IDEMP_STATE_DRAIN_BUMP &&
                   rd_kafka_pid_valid(rk->rk_eos.pid)) {
@@ -348,12 +349,19 @@
                             "All partitions drained, bumped epoch to %s",
                             rd_kafka_pid2str(rk->rk_eos.pid));
                rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_ASSIGNED);
                wakeup_brokers = rd_true;
        }
        rd_kafka_wrunlock(rk);
        /* Restart timer to eventually trigger a re-request */
        if (restart_tmr)
                rd_kafka_idemp_restart_request_pid_tmr(rk, rd_true);
        /* Wake up all broker threads (that may have messages to send
         * that were waiting for a Producer ID). */
        if (wakeup_brokers)
                rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT);
}

Cherry-pick

Команда cherry-pick позволяет выбрать отдельные коммиты из одной ветки и применить их к другой.