Skip to content

Commit

Permalink
[ISSUE #928] Fix C++ simple consumer error code and close function
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhimins committed Jan 22, 2025
1 parent eecb3a6 commit 61c9f00
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 163 deletions.
54 changes: 30 additions & 24 deletions cpp/examples/ExampleSimpleConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <iostream>

#include "gflags/gflags.h"
#include "rocketmq/ErrorCode.h"
#include "rocketmq/Logger.h"
#include "rocketmq/SimpleConsumer.h"

Expand All @@ -42,10 +43,11 @@ int main(int argc, char* argv[]) {

CredentialsProviderPtr credentials_provider;
if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
credentials_provider = std::make_shared<StaticCredentialsProvider>(
FLAGS_access_key, FLAGS_access_secret);
}

// In most case, you don't need to create too many consumers, singletion pattern is recommended.
// In most case, you don't need to create too many consumers, singleton pattern is recommended.
auto simple_consumer = SimpleConsumer::newBuilder()
.withGroup(FLAGS_group)
.withConfiguration(Configuration::newBuilder()
Expand All @@ -54,32 +56,36 @@ int main(int argc, char* argv[]) {
.withSsl(FLAGS_tls)
.build())
.subscribe(FLAGS_topic, tag)
.withAwaitDuration(std::chrono::seconds(10))
.build();
std::vector<MessageConstSharedPtr> messages;
std::error_code ec;
simple_consumer.receive(4, std::chrono::seconds(3), ec, messages);

if (ec) {
std::cerr << "Failed to receive messages. Cause: " << ec.message() << std::endl;
return EXIT_FAILURE;
}
for (int j = 0; j < 30; j++) {
std::vector<MessageConstSharedPtr> messages;
std::error_code ec;
simple_consumer.receive(4, std::chrono::seconds(15), ec, messages);
if (ec) {
std::cerr << "Failed to receive messages. Cause: " << ec.message() << std::endl;
}

std::cout << "Received " << messages.size() << " messages" << std::endl;
std::size_t i = 0;
for (const auto& message : messages) {
std::cout << "Received a message[topic=" << message->topic() << ", message-id=" << message->id()
<< ", receipt-handle='" << message->extension().receipt_handle << "']" << std::endl;
std::cout << "Received " << messages.size() << " messages" << std::endl;
std::size_t i = 0;

std::error_code ec;
if (++i % 2 == 0) {
simple_consumer.ack(*message, ec);
if (ec) {
std::cerr << "Failed to ack message. Cause: " << ec.message() << std::endl;
}
} else {
simple_consumer.changeInvisibleDuration(*message, std::chrono::milliseconds(100), ec);
if (ec) {
std::cerr << "Failed to change invisible duration of message. Cause: " << ec.message() << std::endl;
for (const auto& message : messages) {
std::cout << "Received a message[topic=" << message->topic()
<< ", message-id=" << message->id()
<< ", receipt-handle='" << message->extension().receipt_handle
<< "']" << std::endl;

if (++i % 2 == 0) {
simple_consumer.ack(*message, ec);
if (ec) {
std::cerr << "Failed to ack message. Cause: " << ec.message() << std::endl;
}
} else {
simple_consumer.changeInvisibleDuration(*message, std::chrono::seconds(3), ec);
if (ec) {
std::cerr << "Failed to change invisible duration of message. Cause: " << ec.message() << std::endl;
}
}
}
}
Expand Down
128 changes: 124 additions & 4 deletions cpp/proto/apache/rocketmq/v2/definition.proto
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,6 @@ enum DigestType {
// 1) Standard messages should be negatively acknowledged instantly, causing
// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
// previously acquired messages batch;
//
// Message consumption model also affects how invalid digest are handled. When
// messages are consumed in broadcasting way,
// TODO: define semantics of invalid-digest-when-broadcasting.
message Digest {
DigestType type = 1;
string checksum = 2;
Expand All @@ -189,6 +185,7 @@ enum ClientType {
PRODUCER = 1;
PUSH_CONSUMER = 2;
SIMPLE_CONSUMER = 3;
PULL_CONSUMER = 4;
}

enum Encoding {
Expand Down Expand Up @@ -270,9 +267,20 @@ message SystemProperties {
// orphan. Servers that manages orphan messages would pick up
// a capable publisher to resolve
optional google.protobuf.Duration orphaned_transaction_recovery_duration = 19;

// Information to identify whether this message is from dead letter queue.
optional DeadLetterQueue dead_letter_queue = 20;
}

message DeadLetterQueue {
// Original topic for this DLQ message.
string topic = 1;
// Original message id for this DLQ message.
string message_id = 2;
}

message Message {

Resource topic = 1;

// User defined key-value pairs.
Expand Down Expand Up @@ -336,6 +344,10 @@ enum Code {
MESSAGE_CORRUPTED = 40016;
// Request is rejected due to missing of x-mq-client-id header.
CLIENT_ID_REQUIRED = 40017;
// Polling time is illegal.
ILLEGAL_POLLING_TIME = 40018;
// Offset is illegal.
ILLEGAL_OFFSET = 40019;

// Generic code indicates that the client request lacks valid authentication
// credentials for the requested resource.
Expand All @@ -355,6 +367,8 @@ enum Code {
TOPIC_NOT_FOUND = 40402;
// Consumer group resource does not exist.
CONSUMER_GROUP_NOT_FOUND = 40403;
// Offset not found from server.
OFFSET_NOT_FOUND = 40404;

// Generic code representing client side timeout when connecting to, reading data from, or write data to server.
REQUEST_TIMEOUT = 40800;
Expand All @@ -363,6 +377,8 @@ enum Code {
PAYLOAD_TOO_LARGE = 41300;
// Message body size exceeds the threshold.
MESSAGE_BODY_TOO_LARGE = 41301;
// Message body is empty.
MESSAGE_BODY_EMPTY = 41302;

// Generic code for use cases where pre-conditions are not met.
// For example, if a producer instance is used to publish messages without prior start() invocation,
Expand Down Expand Up @@ -432,6 +448,13 @@ enum Language {
DOT_NET = 3;
GOLANG = 4;
RUST = 5;
PYTHON = 6;
PHP = 7;
NODE_JS = 8;
RUBY = 9;
OBJECTIVE_C = 10;
DART = 11;
KOTLIN = 12;
}

// User Agent
Expand All @@ -447,4 +470,101 @@ message UA {

// Hostname of the node
string hostname = 4;
}

message Settings {
// Configurations for all clients.
optional ClientType client_type = 1;

optional Endpoints access_point = 2;

// If publishing of messages encounters throttling or server internal errors,
// publishers should implement automatic retries after progressive longer
// back-offs for consecutive errors.
//
// When processing message fails, `backoff_policy` describes an interval
// after which the message should be available to consume again.
//
// For FIFO messages, the interval should be relatively small because
// messages of the same message group would not be readily available until
// the prior one depletes its lifecycle.
optional RetryPolicy backoff_policy = 3;

// Request timeout for RPCs excluding long-polling.
optional google.protobuf.Duration request_timeout = 4;

oneof pub_sub {
Publishing publishing = 5;

Subscription subscription = 6;
}

// User agent details
UA user_agent = 7;

Metric metric = 8;
}

message Publishing {
// Publishing settings below here is appointed by client, thus it is
// unnecessary for server to push at present.
//
// List of topics to which messages will publish to.
repeated Resource topics = 1;

// If the message body size exceeds `max_body_size`, broker servers would
// reject the request. As a result, it is advisable that Producer performs
// client-side check validation.
int32 max_body_size = 2;

// When `validate_message_type` flag set `false`, no need to validate message's type
// with messageQueue's `accept_message_types` before publishing.
bool validate_message_type = 3;
}

message Subscription {
// Subscription settings below here is appointed by client, thus it is
// unnecessary for server to push at present.
//
// Consumer group.
optional Resource group = 1;

// Subscription for consumer.
repeated SubscriptionEntry subscriptions = 2;

// Subscription settings below here are from server, it is essential for
// server to push.
//
// When FIFO flag is `true`, messages of the same message group are processed
// in first-in-first-out manner.
//
// Brokers will not deliver further messages of the same group until prior
// ones are completely acknowledged.
optional bool fifo = 3;

// Message receive batch size here is essential for push consumer.
optional int32 receive_batch_size = 4;

// Long-polling timeout for `ReceiveMessageRequest`, which is essential for
// push consumer.
optional google.protobuf.Duration long_polling_timeout = 5;
}

message Metric {
// Indicates that if client should export local metrics to server.
bool on = 1;

// The endpoint that client metrics should be exported to, which is required if the switch is on.
optional Endpoints endpoints = 2;
}

enum QueryOffsetPolicy {
// Use this option if client wishes to playback all existing messages.
BEGINNING = 0;

// Use this option if client wishes to skip all existing messages.
END = 1;

// Use this option if time-based seek is targeted.
TIMESTAMP = 2;
}
Loading

0 comments on commit 61c9f00

Please sign in to comment.