diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc index fd768bd5a..868751e92 100644 --- a/docs/modules/ROOT/nav.adoc +++ b/docs/modules/ROOT/nav.adoc @@ -96,7 +96,25 @@ *** Tips and Recipes **** xref:kafka/kafka_tips.adoc[] ** RabbitMQ +*** xref:rabbit/rabbit_overview.adoc[] +*** Configuration Options +***** xref:rabbit/rabbit_overview/binder-properties.adoc[] +***** xref:rabbit/rabbit_overview/rabbitmq-consumer-properties.adoc[] +***** xref:rabbit/rabbit_overview/prod-props.adoc[] +***** xref:rabbit/rabbit_overview/advanced-listener-container-configuration.adoc[] +***** xref:rabbit/rabbit_overview/advanced-binding-configuration.adoc[] +***** xref:rabbit/rabbit_overview/receiving-batch.adoc[] +***** xref:rabbit/rabbit_overview/publisher-confirms.adoc[] +***** xref:rabbit/rabbit_overview/rabbitmq-stream-consumer.adoc[] +***** xref:rabbit/rabbit_overview/rabbitmq-stream-producer.adoc[] +*** xref:rabbit/rabbit_overview/existing-destinations.adoc[] +*** xref:rabbit/rabbit_overview/rabbitmq-retry.adoc[] +**** xref:rabbit/rabbit_overview/putting-it-all-together.adoc[] +*** xref:rabbit/rabbit_overview/error-channels.adoc[] +*** xref:rabbit/rabbit_partitions.adoc[] +*** xref:rabbit/rabbit_overview/health-indicator.adoc[] ** Apache Pulsar +*** xref:pulsar/pulsar_binder.adoc[] ** https://github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#spring-cloud-stream-binder-for-solace-pubsub[Solace] ** https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc[Amazon Kinesis] * xref:schema-registry/spring-cloud-stream-schema-registry.adoc[] diff --git a/docs/modules/ROOT/pages/rabbit/rabbit_overview.adoc b/docs/modules/ROOT/pages/rabbit/rabbit_overview.adoc index 6290ac379..6b35ec5e8 100644 --- a/docs/modules/ROOT/pages/rabbit/rabbit_overview.adoc +++ b/docs/modules/ROOT/pages/rabbit/rabbit_overview.adoc @@ -1,3 +1,6 @@ += RabbitMQ Binder Reference Guide +:page-section-summary-toc: 1 + This guide describes the RabbitMQ implementation of the Spring Cloud Stream Binder. It contains information about its design, usage and configuration options, as well as information on how the Stream Cloud Stream concepts map into RabbitMQ specific constructs. diff --git a/docs/modules/ROOT/pages/rabbit/rabbit_overview/advanced-queue/exchange/binding-configuration.adoc b/docs/modules/ROOT/pages/rabbit/rabbit_overview/advanced-binding-configuration.adoc similarity index 86% rename from docs/modules/ROOT/pages/rabbit/rabbit_overview/advanced-queue/exchange/binding-configuration.adoc rename to docs/modules/ROOT/pages/rabbit/rabbit_overview/advanced-binding-configuration.adoc index 651b75728..6840b0c8b 100644 --- a/docs/modules/ROOT/pages/rabbit/rabbit_overview/advanced-queue/exchange/binding-configuration.adoc +++ b/docs/modules/ROOT/pages/rabbit/rabbit_overview/advanced-binding-configuration.adoc @@ -1,5 +1,5 @@ -[[advanced-queue/exchange/binding-configuration]] -= Advanced Queue/Exchange/Binding Configuration +[[advanced-configuration]] += Advanced Configuration :page-section-summary-toc: 1 From time to time, the RabbitMQ team add new features that are enabled by setting some argument when declaring, for example, a queue. diff --git a/docs/modules/ROOT/pages/rabbit/rabbit_overview/error-channels.adoc b/docs/modules/ROOT/pages/rabbit/rabbit_overview/error-channels.adoc new file mode 100644 index 000000000..a73da54ba --- /dev/null +++ b/docs/modules/ROOT/pages/rabbit/rabbit_overview/error-channels.adoc @@ -0,0 +1,43 @@ +[[rabbit-error-channels]] += Error Channels + +Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel. +See "`xref:spring-cloud-stream/overview-error-handling.adoc[Error Handling]`" for more information. + +RabbitMQ has two types of send failures: + +* Returned messages, +* Negatively acknowledged https://www.rabbitmq.com/confirms.html[Publisher Confirms]. + +The latter is rare. +According to the RabbitMQ documentation "[A nack] will only be delivered if an internal error occurs in the Erlang process responsible for a queue.". +You can also get a negative acknowledgment if you publish to a bounded queue with `reject-publish` queue overflow behavior. + +As well as enabling producer error channels (as described in "`xref:spring-cloud-stream/overview-error-handling.adoc[Error Handling]`"), the RabbitMQ binder only sends messages to the channels if the connection factory is appropriately configured, as follows: + +* `ccf.setPublisherConfirms(true);` +* `ccf.setPublisherReturns(true);` + +When using Spring Boot configuration for the connection factory, set the following properties: + +* `spring.rabbitmq.publisher-confirms` +* `spring.rabbitmq.publisher-returns` + +The payload of the `ErrorMessage` for a returned message is a `ReturnedAmqpMessageException` with the following properties: + +* `failedMessage`: The spring-messaging `Message` that failed to be sent. +* `amqpMessage`: The raw spring-amqp `Message`. +* `replyCode`: An integer value indicating the reason for the failure (for example, 312 - No route). +* `replyText`: A text value indicating the reason for the failure (for example, `NO_ROUTE`). +* `exchange`: The exchange to which the message was published. +* `routingKey`: The routing key used when the message was published. + +Also see xref:rabbit/rabbit_overview/publisher-confirms.adoc[Publisher Confirms] for an alternative mechanism to receive returned messages. + +For negatively acknowledged confirmations, the payload is a `NackedAmqpMessageException` with the following properties: + +* `failedMessage`: The spring-messaging `Message` that failed to be sent. +* `nackReason`: A reason (if available -- you may need to examine the broker logs for more information). + +There is no automatic handling of these exceptions (such as sending to a xref:rabbit/rabbit_dlq.adoc[dead-letter queue]). +You can consume these exceptions with your own Spring Integration flow. diff --git a/docs/modules/ROOT/pages/rabbit/rabbit_overview/existing-destinations.adoc b/docs/modules/ROOT/pages/rabbit/rabbit_overview/existing-destinations.adoc new file mode 100644 index 000000000..3373ba9ce --- /dev/null +++ b/docs/modules/ROOT/pages/rabbit/rabbit_overview/existing-destinations.adoc @@ -0,0 +1,30 @@ +[[using-existing-destinations]] += Using Existing Queues/Exchanges + +By default, the binder will automatically provision a topic exchange with the name being derived from the value of the destination binding property ``. +The destination defaults to the binding name, if not provided. +When binding a consumer, a queue will automatically be provisioned with the name `.` (if a `group` binding property is specified), or an anonymous, auto-delete queue when there is no `group`. +The queue will be bound to the exchange with the "match-all" wildcard routing key (`#`) for a non-partitioned binding or `-` for a partitioned binding. +The prefix is an empty `String` by default. +If an output binding is specified with `requiredGroups`, a queue/binding will be provisioned for each group. + +There are a number of rabbit-specific binding properties that allow you to modify this default behavior. + +If you have an existing exchange/queue that you wish to use, you can completely disable automatic provisioning as follows, assuming the exchange is named `myExchange` and the queue is named `myQueue`: + +* `spring.cloud.stream.bindings..destination=myExchange` +* `spring.cloud.stream.bindings..group=myQueue` +* `spring.cloud.stream.rabbit.bindings..consumer.bindQueue=false` +* `spring.cloud.stream.rabbit.bindings..consumer.declareExchange=false` +* `spring.cloud.stream.rabbit.bindings..consumer.queueNameGroupOnly=true` + +If you want the binder to provision the queue/exchange, but you want to do it using something other than the defaults discussed here, use the following properties. +Refer to the property documentation above for more information. + +* `spring.cloud.stream.rabbit.bindings..consumer.bindingRoutingKey=myRoutingKey` +* `spring.cloud.stream.rabbit.bindings..consumer.exchangeType=` + +* `spring.cloud.stream.rabbit.bindings..producer.routingKeyExpression='myRoutingKey'` + +There are similar properties used when declaring a dead-letter exchange/queue, when `autoBindDlq` is `true`. + diff --git a/docs/modules/ROOT/pages/rabbit/rabbit_overview/health-indicator.adoc b/docs/modules/ROOT/pages/rabbit/rabbit_overview/health-indicator.adoc new file mode 100644 index 000000000..2b64c6274 --- /dev/null +++ b/docs/modules/ROOT/pages/rabbit/rabbit_overview/health-indicator.adoc @@ -0,0 +1,18 @@ +[[rabbit-binder-health-indicator]] += Rabbit Binder Health Indicator + +The health indicator for Rabbit binder delegates to the one provided from Spring Boot. +For more information on this, see https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#actuator.endpoints.health.auto-configured-health-indicators[this]. + +You can disable this health indicator at the binder level by using the property - `management.health.binders.enabled` and set this to `false`. +In the case of multi-binder environments, this has to be set on the binder's environment properties. + +When the health indicator is disabled, you should see something like the below in the health actuator endpoint: + +``` +"rabbit": { + "status": "UNKNOWN" +} +``` + +At the Spring Boot level, if you want to disable the Rabbit health indicator, you need to use the property `management.health.rabbit.enabled` and set to `false`. diff --git a/docs/modules/ROOT/pages/rabbit/rabbit_overview/prod-props.adoc b/docs/modules/ROOT/pages/rabbit/rabbit_overview/prod-props.adoc index 38108d401..7d8b129da 100644 --- a/docs/modules/ROOT/pages/rabbit/rabbit_overview/prod-props.adoc +++ b/docs/modules/ROOT/pages/rabbit/rabbit_overview/prod-props.adoc @@ -1,5 +1,5 @@ [[rabbit-prod-props]] -= Rabbit Producer Properties += RabbitMQ Producer Properties The following properties are available for Rabbit producers only and must be prefixed with `spring.cloud.stream.rabbit.bindings..producer.`. diff --git a/docs/modules/ROOT/pages/rabbit/rabbit_overview/putting-it-all-together.adoc b/docs/modules/ROOT/pages/rabbit/rabbit_overview/putting-it-all-together.adoc index e154a8bbf..3b523d240 100644 --- a/docs/modules/ROOT/pages/rabbit/rabbit_overview/putting-it-all-together.adoc +++ b/docs/modules/ROOT/pages/rabbit/rabbit_overview/putting-it-all-together.adoc @@ -45,66 +45,3 @@ public class XDeathApplication { ---- Notice that the count property in the `x-death` header is a `Long`. - -[[rabbit-error-channels]] -== Error Channels - -Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel. -See "`xref:spring-cloud-stream/overview-error-handling.adoc[Error Handling]`" for more information. - -RabbitMQ has two types of send failures: - -* Returned messages, -* Negatively acknowledged https://www.rabbitmq.com/confirms.html[Publisher Confirms]. - -The latter is rare. -According to the RabbitMQ documentation "[A nack] will only be delivered if an internal error occurs in the Erlang process responsible for a queue.". -You can also get a negative acknowledgment if you publish to a bounded queue with `reject-publish` queue overflow behavior. - -As well as enabling producer error channels (as described in "`xref:spring-cloud-stream/overview-error-handling.adoc[Error Handling]`"), the RabbitMQ binder only sends messages to the channels if the connection factory is appropriately configured, as follows: - -* `ccf.setPublisherConfirms(true);` -* `ccf.setPublisherReturns(true);` - -When using Spring Boot configuration for the connection factory, set the following properties: - -* `spring.rabbitmq.publisher-confirms` -* `spring.rabbitmq.publisher-returns` - -The payload of the `ErrorMessage` for a returned message is a `ReturnedAmqpMessageException` with the following properties: - -* `failedMessage`: The spring-messaging `Message` that failed to be sent. -* `amqpMessage`: The raw spring-amqp `Message`. -* `replyCode`: An integer value indicating the reason for the failure (for example, 312 - No route). -* `replyText`: A text value indicating the reason for the failure (for example, `NO_ROUTE`). -* `exchange`: The exchange to which the message was published. -* `routingKey`: The routing key used when the message was published. - -Also see xref:rabbit/rabbit_overview/publisher-confirms.adoc[Publisher Confirms] for an alternative mechanism to receive returned messages. - -For negatively acknowledged confirmations, the payload is a `NackedAmqpMessageException` with the following properties: - -* `failedMessage`: The spring-messaging `Message` that failed to be sent. -* `nackReason`: A reason (if available -- you may need to examine the broker logs for more information). - -There is no automatic handling of these exceptions (such as sending to a xref:rabbit/rabbit_dlq.adoc[dead-letter queue]). -You can consume these exceptions with your own Spring Integration flow. - -[[rabbit-binder-health-indicator]] -== Rabbit Binder Health Indicator - -The health indicator for Rabbit binder delegates to the one provided from Spring Boot. -For more information on this, see https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#actuator.endpoints.health.auto-configured-health-indicators[this]. - -You can disable this health indicator at the binder level by using the property - `management.health.binders.enabled` and set this to `false`. -In the case of multi-binder environments, this has to be set on the binder's environment properties. - -When the health indicator is disabled, you should see something like the below in the health actuator endpoint: - -``` -"rabbit": { - "status": "UNKNOWN" -} -``` - -At the Spring Boot level, if you want to disable the Rabbit health indicator, you need to use the property `management.health.rabbit.enabled` and set to `false`. diff --git a/docs/modules/ROOT/pages/rabbit/rabbit_overview/rabbitmq-retry.adoc b/docs/modules/ROOT/pages/rabbit/rabbit_overview/rabbitmq-retry.adoc new file mode 100644 index 000000000..65ecdf6db --- /dev/null +++ b/docs/modules/ROOT/pages/rabbit/rabbit_overview/rabbitmq-retry.adoc @@ -0,0 +1,23 @@ +[[retry-with-the-rabbitmq-binder]] += Retry With the RabbitMQ Binder + +When retry is enabled within the binder, the listener container thread is suspended for any back off periods that are configured. +This might be important when strict ordering is required with a single consumer. However, for other use cases, it prevents other messages from being processed on that thread. +An alternative to using binder retry is to set up dead lettering with time to live on the dead-letter queue (DLQ) as well as dead-letter configuration on the DLQ itself. +See "`xref:rabbit/rabbit_overview/binder-properties.adoc[RabbitMQ Binder Properties]`" for more information about the properties discussed here. +You can use the following example configuration to enable this feature: + +* Set `autoBindDlq` to `true`. +The binder create a DLQ. +Optionally, you can specify a name in `deadLetterQueueName`. +* Set `dlqTtl` to the back off time you want to wait between redeliveries. +* Set the `dlqDeadLetterExchange` to the default exchange. +Expired messages from the DLQ are routed to the original queue, because the default `deadLetterRoutingKey` is the queue name (`destination.group`). +Setting to the default exchange is achieved by setting the property with no value, as shown in the next example. + +To force a message to be dead-lettered, either throw an `AmqpRejectAndDontRequeueException` or set `requeueRejected` to `false` (the default) and throw any exception. + +The loop continue without end, which is fine for transient problems, but you may want to give up after some number of attempts. +Fortunately, RabbitMQ provides the `x-death` header, which lets you determine how many cycles have occurred. + +To acknowledge a message after giving up, throw an `ImmediateAcknowledgeAmqpException`. diff --git a/docs/modules/ROOT/pages/rabbit/rabbit_overview/rabbitmq-stream-producer.adoc b/docs/modules/ROOT/pages/rabbit/rabbit_overview/rabbitmq-stream-producer.adoc index f8d41bd4e..0d759d5f6 100644 --- a/docs/modules/ROOT/pages/rabbit/rabbit_overview/rabbitmq-stream-producer.adoc +++ b/docs/modules/ROOT/pages/rabbit/rabbit_overview/rabbitmq-stream-producer.adoc @@ -60,57 +60,3 @@ spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false When using the stream client, if you set a `confirmAckChannel`, a copy of a successfully sent message will be sent to that channel. -[[using-existing-queues/exchanges]] -= Using Existing Queues/Exchanges - -By default, the binder will automatically provision a topic exchange with the name being derived from the value of the destination binding property ``. -The destination defaults to the binding name, if not provided. -When binding a consumer, a queue will automatically be provisioned with the name `.` (if a `group` binding property is specified), or an anonymous, auto-delete queue when there is no `group`. -The queue will be bound to the exchange with the "match-all" wildcard routing key (`#`) for a non-partitioned binding or `-` for a partitioned binding. -The prefix is an empty `String` by default. -If an output binding is specified with `requiredGroups`, a queue/binding will be provisioned for each group. - -There are a number of rabbit-specific binding properties that allow you to modify this default behavior. - -If you have an existing exchange/queue that you wish to use, you can completely disable automatic provisioning as follows, assuming the exchange is named `myExchange` and the queue is named `myQueue`: - -* `spring.cloud.stream.bindings..destination=myExchange` -* `spring.cloud.stream.bindings..group=myQueue` -* `spring.cloud.stream.rabbit.bindings..consumer.bindQueue=false` -* `spring.cloud.stream.rabbit.bindings..consumer.declareExchange=false` -* `spring.cloud.stream.rabbit.bindings..consumer.queueNameGroupOnly=true` - -If you want the binder to provision the queue/exchange, but you want to do it using something other than the defaults discussed here, use the following properties. -Refer to the property documentation above for more information. - -* `spring.cloud.stream.rabbit.bindings..consumer.bindingRoutingKey=myRoutingKey` -* `spring.cloud.stream.rabbit.bindings..consumer.exchangeType=` - -* `spring.cloud.stream.rabbit.bindings..producer.routingKeyExpression='myRoutingKey'` - -There are similar properties used when declaring a dead-letter exchange/queue, when `autoBindDlq` is `true`. - -[[retry-with-the-rabbitmq-binder]] -== Retry With the RabbitMQ Binder - -When retry is enabled within the binder, the listener container thread is suspended for any back off periods that are configured. -This might be important when strict ordering is required with a single consumer. However, for other use cases, it prevents other messages from being processed on that thread. -An alternative to using binder retry is to set up dead lettering with time to live on the dead-letter queue (DLQ) as well as dead-letter configuration on the DLQ itself. -See "`xref:rabbit/rabbit_overview/binder-properties.adoc[RabbitMQ Binder Properties]`" for more information about the properties discussed here. -You can use the following example configuration to enable this feature: - -* Set `autoBindDlq` to `true`. -The binder create a DLQ. -Optionally, you can specify a name in `deadLetterQueueName`. -* Set `dlqTtl` to the back off time you want to wait between redeliveries. -* Set the `dlqDeadLetterExchange` to the default exchange. -Expired messages from the DLQ are routed to the original queue, because the default `deadLetterRoutingKey` is the queue name (`destination.group`). -Setting to the default exchange is achieved by setting the property with no value, as shown in the next example. - -To force a message to be dead-lettered, either throw an `AmqpRejectAndDontRequeueException` or set `requeueRejected` to `false` (the default) and throw any exception. - -The loop continue without end, which is fine for transient problems, but you may want to give up after some number of attempts. -Fortunately, RabbitMQ provides the `x-death` header, which lets you determine how many cycles have occurred. - -To acknowledge a message after giving up, throw an `ImmediateAcknowledgeAmqpException`. -