
Missing consumer cb causes librdkafka assertion failure #207

Open
Arcoth opened this issue May 23, 2023 · 1 comment

@Arcoth

Arcoth commented May 23, 2023

Hi,

Consider the following scenario: after subscribing to topic X and seeking within it, we call subscribe on the KafkaConsumer for a further topic Y. During that call we end up inside rd_kafka_q_serve (which polls the queue and executes operations) with an operation whose rko_type == RD_KAFKA_OP_FETCH and cb_type == RD_KAFKA_Q_CB_CALLBACK (presumably belonging to topic X). This eventually leads into rd_kafka_poll_cb, which contains:

                if (!rk->rk_conf.consume_cb ||
                    cb_type == RD_KAFKA_Q_CB_RETURN ||
                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
                        return RD_KAFKA_OP_RES_PASS; /* Dont handle here */

Because the op is passed back unhandled, we then hit the assertion in rd_kafka_q_serve, which requires every op to be handled:

                res = rd_kafka_op_handle(rk, &localq, rko, cb_type, opaque,
                                         callback);
                /* op must have been handled */
                rd_kafka_assert(NULL, res != RD_KAFKA_OP_RES_PASS);
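The failure path can be summarized with a small, hypothetical C++ model. The names `CbType`, `OpRes`, `ConsumeCb`, `poll_cb_model` and `serve_op_would_assert` are illustrative stand-ins, not librdkafka's internal API, and the real functions do considerably more work; the model keeps only the two checks quoted above.

```cpp
#include <cassert>

// Illustrative stand-ins for librdkafka's internal enums (hypothetical names).
enum class CbType { Callback, Return, ForceReturn };
enum class OpRes { Pass, Handled };

// Same shape as librdkafka's consume_cb, void (*)(rd_kafka_message_t*, void*),
// with the message type replaced by void* to keep the model self-contained.
using ConsumeCb = void (*)(void* message, void* opaque);

// Models the early return in rd_kafka_poll_cb(): without a consume_cb,
// a FETCH op is passed back even when the queue is served in CALLBACK mode.
OpRes poll_cb_model(ConsumeCb consume_cb, CbType cb_type) {
    if (consume_cb == nullptr ||
        cb_type == CbType::Return ||
        cb_type == CbType::ForceReturn)
        return OpRes::Pass;   // "Dont handle here"
    return OpRes::Handled;    // simplification: assume the op gets consumed
}

// Models the check in rd_kafka_q_serve(): returns true where the real code
// would trip rd_kafka_assert(NULL, res != RD_KAFKA_OP_RES_PASS).
bool serve_op_would_assert(ConsumeCb consume_cb, CbType cb_type) {
    return poll_cb_model(consume_cb, cb_type) == OpRes::Pass;
}
```

With consume_cb left unset, as KafkaConsumer does, `serve_op_would_assert(nullptr, CbType::Callback)` is true; passing any non-null callback makes it false for CALLBACK-mode serving.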

The workaround I tried was to set the consume_cb checked above, which KafkaConsumer does not set. So I added

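    rd_kafka_conf_set_consume_cb(conf,
        [](rd_kafka_message_t* /*rkmessage*/, void* /*opaque*/) {
            // Placeholder: it only has to be non-null so that
            // rd_kafka_poll_cb() handles FETCH ops instead of passing them back.
            // It is not expected to run; fail loudly if it ever does.
            std::abort();
        });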

into KafkaConsumer::registerConfigCallbacks. Interestingly, because the version of the fetch op (rko) is outdated, the op is discarded anyway; see rd_kafka_consume_cb:

        if (unlikely(rd_kafka_op_version_outdated(rko, 0)) ||
            rko->rko_type == RD_KAFKA_OP_BARRIER) {
                rd_kafka_op_destroy(rko);
                return RD_KAFKA_OP_RES_HANDLED;
        }
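A second hypothetical model shows why a placeholder callback is enough here: an outdated FETCH op (or a BARRIER op) is destroyed and reported as handled before the user callback would run. `OpType`, `Op`, `version_outdated`, `placeholder_consume_cb` and `consume_cb_model` are illustrative names. The real rd_kafka_op_version_outdated() compares the op's version with the partition's current op version; the model reduces that to a single integer comparison and treats version 0 as "unversioned".

```cpp
#include <cassert>

// Hypothetical, simplified op: only the fields the discard check looks at.
enum class OpType { Fetch, Barrier };

struct Op {
    OpType type;
    int version;  // 0 means "unversioned" in this model
};

// Counts how often the placeholder callback actually runs (demo only).
static int g_placeholder_calls = 0;

void placeholder_consume_cb(void* /*message*/, void* /*opaque*/) {
    ++g_placeholder_calls;  // a real placeholder would do nothing or abort
}

// Simplified stand-in for rd_kafka_op_version_outdated(rko, 0): an op is
// stale if it was created under an older partition version.
bool version_outdated(const Op& op, int current_version) {
    return op.version != 0 && op.version < current_version;
}

// Models rd_kafka_consume_cb(): every path reports the op as handled, but
// stale ops and barriers are dropped before the user callback is reached.
bool consume_cb_model(const Op& op, int current_version,
                      void (*consume_cb)(void*, void*)) {
    if (version_outdated(op, current_version) || op.type == OpType::Barrier)
        return true;               // op destroyed, RD_KAFKA_OP_RES_HANDLED
    consume_cb(nullptr, nullptr);  // real code passes the fetched message
    return true;                   // RD_KAFKA_OP_RES_HANDLED
}
```

Note the non-outdated case: if a current FETCH op ever reached this path, a no-op placeholder would silently drop its message, which is one argument for making the placeholder log or fail loudly rather than do nothing.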

So the callback is never actually invoked, yet librdkafka's logic requires it to be set. Could KafkaConsumer register such a vacuous callback (either throwing an exception or simply a no-op) so that this failure mode is averted?

@kenneth-jia
Contributor

Hi, Arcoth,
Could you please share some demo code that triggers the issue?
If I understand correctly, calling consumer.subscribe(...) twice (for two different topics) might reproduce it, right?
