diff --git a/src/factory/KafkaTaskImpl.cc b/src/factory/KafkaTaskImpl.cc index 6b87e784af..91e12c4d1a 100644 --- a/src/factory/KafkaTaskImpl.cc +++ b/src/factory/KafkaTaskImpl.cc @@ -521,30 +521,18 @@ bool __ComplexKafkaTask::process_fetch() this->get_resp()->get_toppar_list()->rewind(); while ((toppar = this->get_resp()->get_toppar_list()->get_next()) != NULL) { - if (toppar->get_error() == KAFKA_OFFSET_OUT_OF_RANGE) + int toppar_error = toppar->get_error(); + + if (toppar_error == KAFKA_OFFSET_OUT_OF_RANGE) { toppar->set_offset(KAFKA_OFFSET_OVERFLOW); toppar->set_low_watermark(KAFKA_OFFSET_UNINIT); toppar->set_high_watermark(KAFKA_OFFSET_UNINIT); ret = true; } - - switch (toppar->get_error()) + else if (toppar_error) { - case KAFKA_UNKNOWN_TOPIC_OR_PARTITION: - case KAFKA_LEADER_NOT_AVAILABLE: - case KAFKA_NOT_LEADER_FOR_PARTITION: - case KAFKA_BROKER_NOT_AVAILABLE: - case KAFKA_REPLICA_NOT_AVAILABLE: - case KAFKA_KAFKA_STORAGE_ERROR: - case KAFKA_FENCED_LEADER_EPOCH: - this->get_req()->set_api_type(Kafka_Metadata); - return true; - case 0: - case KAFKA_OFFSET_OUT_OF_RANGE: - break; - default: - ctx_ = toppar->get_error(); + ctx_ = toppar_error; this->error = WFT_ERR_KAFKA_FETCH_FAILED; this->state = WFT_STATE_TASK_ERROR; return false; @@ -580,21 +568,10 @@ bool __ComplexKafkaTask::process_produce() return true; } - switch (toppar->get_error()) + if (toppar->get_error()) { - case KAFKA_UNKNOWN_TOPIC_OR_PARTITION: - case KAFKA_LEADER_NOT_AVAILABLE: - case KAFKA_NOT_LEADER_FOR_PARTITION: - case KAFKA_BROKER_NOT_AVAILABLE: - case KAFKA_REPLICA_NOT_AVAILABLE: - case KAFKA_KAFKA_STORAGE_ERROR: - case KAFKA_FENCED_LEADER_EPOCH: - this->get_req()->set_api_type(Kafka_Metadata); - return true; - case 0: - break; - default: - this->error = toppar->get_error(); + ctx_ = toppar->get_error(); + this->error = WFT_ERR_KAFKA_PRODUCE_FAILED; this->state = WFT_STATE_TASK_ERROR; return false; }