Skip to content

Commit

Permalink
Migrate RabbitMQ docs to antora
Browse files Browse the repository at this point in the history
  • Loading branch information
olegz committed Sep 14, 2023
1 parent 6d26ab7 commit 6a2ba83
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 120 deletions.
18 changes: 18 additions & 0 deletions docs/modules/ROOT/nav.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,25 @@
*** Tips and Recipes
**** xref:kafka/kafka_tips.adoc[]
** RabbitMQ
*** xref:rabbit/rabbit_overview.adoc[]
*** Configuration Options
***** xref:rabbit/rabbit_overview/binder-properties.adoc[]
***** xref:rabbit/rabbit_overview/rabbitmq-consumer-properties.adoc[]
***** xref:rabbit/rabbit_overview/prod-props.adoc[]
***** xref:rabbit/rabbit_overview/advanced-listener-container-configuration.adoc[]
***** xref:rabbit/rabbit_overview/advanced-binding-configuration.adoc[]
***** xref:rabbit/rabbit_overview/receiving-batch.adoc[]
***** xref:rabbit/rabbit_overview/publisher-confirms.adoc[]
***** xref:rabbit/rabbit_overview/rabbitmq-stream-consumer.adoc[]
***** xref:rabbit/rabbit_overview/rabbitmq-stream-producer.adoc[]
*** xref:rabbit/rabbit_overview/existing-destinations.adoc[]
*** xref:rabbit/rabbit_overview/rabbitmq-retry.adoc[]
**** xref:rabbit/rabbit_overview/putting-it-all-together.adoc[]
*** xref:rabbit/rabbit_overview/error-channels.adoc[]
*** xref:rabbit/rabbit_partitions.adoc[]
*** xref:rabbit/rabbit_overview/health-indicator.adoc[]
** Apache Pulsar
*** xref:pulsar/pulsar_binder.adoc[]
** https://github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#spring-cloud-stream-binder-for-solace-pubsub[Solace]
** https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc[Amazon Kinesis]
* xref:schema-registry/spring-cloud-stream-schema-registry.adoc[]
Expand Down
3 changes: 3 additions & 0 deletions docs/modules/ROOT/pages/rabbit/rabbit_overview.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
= RabbitMQ Binder Reference Guide
:page-section-summary-toc: 1

This guide describes the RabbitMQ implementation of the Spring Cloud Stream Binder.
It contains information about its design, usage and configuration options, as well as information on how the Stream Cloud Stream concepts map into RabbitMQ specific constructs.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[[advanced-queue/exchange/binding-configuration]]
= Advanced Queue/Exchange/Binding Configuration
[[advanced-configuration]]
= Advanced Configuration
:page-section-summary-toc: 1

From time to time, the RabbitMQ team add new features that are enabled by setting some argument when declaring, for example, a queue.
Expand Down
43 changes: 43 additions & 0 deletions docs/modules/ROOT/pages/rabbit/rabbit_overview/error-channels.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
[[rabbit-error-channels]]
= Error Channels

Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel.
See "`xref:spring-cloud-stream/overview-error-handling.adoc[Error Handling]`" for more information.

RabbitMQ has two types of send failures:

* Returned messages,
* Negatively acknowledged https://www.rabbitmq.com/confirms.html[Publisher Confirms].

The latter is rare.
According to the RabbitMQ documentation "[A nack] will only be delivered if an internal error occurs in the Erlang process responsible for a queue.".
You can also get a negative acknowledgment if you publish to a bounded queue with `reject-publish` queue overflow behavior.

As well as enabling producer error channels (as described in "`xref:spring-cloud-stream/overview-error-handling.adoc[Error Handling]`"), the RabbitMQ binder only sends messages to the channels if the connection factory is appropriately configured, as follows:

* `ccf.setPublisherConfirms(true);`
* `ccf.setPublisherReturns(true);`

When using Spring Boot configuration for the connection factory, set the following properties:

* `spring.rabbitmq.publisher-confirms`
* `spring.rabbitmq.publisher-returns`

The payload of the `ErrorMessage` for a returned message is a `ReturnedAmqpMessageException` with the following properties:

* `failedMessage`: The spring-messaging `Message<?>` that failed to be sent.
* `amqpMessage`: The raw spring-amqp `Message`.
* `replyCode`: An integer value indicating the reason for the failure (for example, 312 - No route).
* `replyText`: A text value indicating the reason for the failure (for example, `NO_ROUTE`).
* `exchange`: The exchange to which the message was published.
* `routingKey`: The routing key used when the message was published.

Also see xref:rabbit/rabbit_overview/publisher-confirms.adoc[Publisher Confirms] for an alternative mechanism to receive returned messages.

For negatively acknowledged confirmations, the payload is a `NackedAmqpMessageException` with the following properties:

* `failedMessage`: The spring-messaging `Message<?>` that failed to be sent.
* `nackReason`: A reason (if available -- you may need to examine the broker logs for more information).

There is no automatic handling of these exceptions (such as sending to a xref:rabbit/rabbit_dlq.adoc[dead-letter queue]).
You can consume these exceptions with your own Spring Integration flow.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[[using-existing-destinations]]
= Using Existing Queues/Exchanges

By default, the binder will automatically provision a topic exchange with the name being derived from the value of the destination binding property `<prefix><destination>`.
The destination defaults to the binding name, if not provided.
When binding a consumer, a queue will automatically be provisioned with the name `<prefix><destination>.<group>` (if a `group` binding property is specified), or an anonymous, auto-delete queue when there is no `group`.
The queue will be bound to the exchange with the "match-all" wildcard routing key (`#`) for a non-partitioned binding or `<destination>-<instanceIndex>` for a partitioned binding.
The prefix is an empty `String` by default.
If an output binding is specified with `requiredGroups`, a queue/binding will be provisioned for each group.

There are a number of rabbit-specific binding properties that allow you to modify this default behavior.

If you have an existing exchange/queue that you wish to use, you can completely disable automatic provisioning as follows, assuming the exchange is named `myExchange` and the queue is named `myQueue`:

* `spring.cloud.stream.bindings.<binding name>.destination=myExchange`
* `spring.cloud.stream.bindings.<binding name>.group=myQueue`
* `spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false`
* `spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false`
* `spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true`

If you want the binder to provision the queue/exchange, but you want to do it using something other than the defaults discussed here, use the following properties.
Refer to the property documentation above for more information.

* `spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey`
* `spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type>`

* `spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'`

There are similar properties used when declaring a dead-letter exchange/queue, when `autoBindDlq` is `true`.

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[[rabbit-binder-health-indicator]]
= Rabbit Binder Health Indicator

The health indicator for Rabbit binder delegates to the one provided from Spring Boot.
For more information on this, see https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#actuator.endpoints.health.auto-configured-health-indicators[this].

You can disable this health indicator at the binder level by using the property - `management.health.binders.enabled` and set this to `false`.
In the case of multi-binder environments, this has to be set on the binder's environment properties.

When the health indicator is disabled, you should see something like the below in the health actuator endpoint:

```
"rabbit": {
"status": "UNKNOWN"
}
```

At the Spring Boot level, if you want to disable the Rabbit health indicator, you need to use the property `management.health.rabbit.enabled` and set to `false`.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[[rabbit-prod-props]]
= Rabbit Producer Properties
= RabbitMQ Producer Properties

The following properties are available for Rabbit producers only and must be prefixed with `spring.cloud.stream.rabbit.bindings.<channelName>.producer.`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,66 +45,3 @@ public class XDeathApplication {
----

Notice that the count property in the `x-death` header is a `Long`.

[[rabbit-error-channels]]
== Error Channels

Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel.
See "`xref:spring-cloud-stream/overview-error-handling.adoc[Error Handling]`" for more information.

RabbitMQ has two types of send failures:

* Returned messages,
* Negatively acknowledged https://www.rabbitmq.com/confirms.html[Publisher Confirms].

The latter is rare.
According to the RabbitMQ documentation "[A nack] will only be delivered if an internal error occurs in the Erlang process responsible for a queue.".
You can also get a negative acknowledgment if you publish to a bounded queue with `reject-publish` queue overflow behavior.

As well as enabling producer error channels (as described in "`xref:spring-cloud-stream/overview-error-handling.adoc[Error Handling]`"), the RabbitMQ binder only sends messages to the channels if the connection factory is appropriately configured, as follows:

* `ccf.setPublisherConfirms(true);`
* `ccf.setPublisherReturns(true);`

When using Spring Boot configuration for the connection factory, set the following properties:

* `spring.rabbitmq.publisher-confirms`
* `spring.rabbitmq.publisher-returns`

The payload of the `ErrorMessage` for a returned message is a `ReturnedAmqpMessageException` with the following properties:

* `failedMessage`: The spring-messaging `Message<?>` that failed to be sent.
* `amqpMessage`: The raw spring-amqp `Message`.
* `replyCode`: An integer value indicating the reason for the failure (for example, 312 - No route).
* `replyText`: A text value indicating the reason for the failure (for example, `NO_ROUTE`).
* `exchange`: The exchange to which the message was published.
* `routingKey`: The routing key used when the message was published.

Also see xref:rabbit/rabbit_overview/publisher-confirms.adoc[Publisher Confirms] for an alternative mechanism to receive returned messages.

For negatively acknowledged confirmations, the payload is a `NackedAmqpMessageException` with the following properties:

* `failedMessage`: The spring-messaging `Message<?>` that failed to be sent.
* `nackReason`: A reason (if available -- you may need to examine the broker logs for more information).

There is no automatic handling of these exceptions (such as sending to a xref:rabbit/rabbit_dlq.adoc[dead-letter queue]).
You can consume these exceptions with your own Spring Integration flow.

[[rabbit-binder-health-indicator]]
== Rabbit Binder Health Indicator

The health indicator for Rabbit binder delegates to the one provided from Spring Boot.
For more information on this, see https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#actuator.endpoints.health.auto-configured-health-indicators[this].

You can disable this health indicator at the binder level by using the property - `management.health.binders.enabled` and set this to `false`.
In the case of multi-binder environments, this has to be set on the binder's environment properties.

When the health indicator is disabled, you should see something like the below in the health actuator endpoint:

```
"rabbit": {
"status": "UNKNOWN"
}
```

At the Spring Boot level, if you want to disable the Rabbit health indicator, you need to use the property `management.health.rabbit.enabled` and set to `false`.
23 changes: 23 additions & 0 deletions docs/modules/ROOT/pages/rabbit/rabbit_overview/rabbitmq-retry.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[[retry-with-the-rabbitmq-binder]]
= Retry With the RabbitMQ Binder

When retry is enabled within the binder, the listener container thread is suspended for any back off periods that are configured.
This might be important when strict ordering is required with a single consumer. However, for other use cases, it prevents other messages from being processed on that thread.
An alternative to using binder retry is to set up dead lettering with time to live on the dead-letter queue (DLQ) as well as dead-letter configuration on the DLQ itself.
See "`xref:rabbit/rabbit_overview/binder-properties.adoc[RabbitMQ Binder Properties]`" for more information about the properties discussed here.
You can use the following example configuration to enable this feature:

* Set `autoBindDlq` to `true`.
The binder create a DLQ.
Optionally, you can specify a name in `deadLetterQueueName`.
* Set `dlqTtl` to the back off time you want to wait between redeliveries.
* Set the `dlqDeadLetterExchange` to the default exchange.
Expired messages from the DLQ are routed to the original queue, because the default `deadLetterRoutingKey` is the queue name (`destination.group`).
Setting to the default exchange is achieved by setting the property with no value, as shown in the next example.

To force a message to be dead-lettered, either throw an `AmqpRejectAndDontRequeueException` or set `requeueRejected` to `false` (the default) and throw any exception.

The loop continue without end, which is fine for transient problems, but you may want to give up after some number of attempts.
Fortunately, RabbitMQ provides the `x-death` header, which lets you determine how many cycles have occurred.

To acknowledge a message after giving up, throw an `ImmediateAcknowledgeAmqpException`.
Original file line number Diff line number Diff line change
Expand Up @@ -60,57 +60,3 @@ spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

When using the stream client, if you set a `confirmAckChannel`, a copy of a successfully sent message will be sent to that channel.

[[using-existing-queues/exchanges]]
= Using Existing Queues/Exchanges

By default, the binder will automatically provision a topic exchange with the name being derived from the value of the destination binding property `<prefix><destination>`.
The destination defaults to the binding name, if not provided.
When binding a consumer, a queue will automatically be provisioned with the name `<prefix><destination>.<group>` (if a `group` binding property is specified), or an anonymous, auto-delete queue when there is no `group`.
The queue will be bound to the exchange with the "match-all" wildcard routing key (`#`) for a non-partitioned binding or `<destination>-<instanceIndex>` for a partitioned binding.
The prefix is an empty `String` by default.
If an output binding is specified with `requiredGroups`, a queue/binding will be provisioned for each group.

There are a number of rabbit-specific binding properties that allow you to modify this default behavior.

If you have an existing exchange/queue that you wish to use, you can completely disable automatic provisioning as follows, assuming the exchange is named `myExchange` and the queue is named `myQueue`:

* `spring.cloud.stream.bindings.<binding name>.destination=myExchange`
* `spring.cloud.stream.bindings.<binding name>.group=myQueue`
* `spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false`
* `spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false`
* `spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true`

If you want the binder to provision the queue/exchange, but you want to do it using something other than the defaults discussed here, use the following properties.
Refer to the property documentation above for more information.

* `spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey`
* `spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type>`

* `spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'`

There are similar properties used when declaring a dead-letter exchange/queue, when `autoBindDlq` is `true`.

[[retry-with-the-rabbitmq-binder]]
== Retry With the RabbitMQ Binder

When retry is enabled within the binder, the listener container thread is suspended for any back off periods that are configured.
This might be important when strict ordering is required with a single consumer. However, for other use cases, it prevents other messages from being processed on that thread.
An alternative to using binder retry is to set up dead lettering with time to live on the dead-letter queue (DLQ) as well as dead-letter configuration on the DLQ itself.
See "`xref:rabbit/rabbit_overview/binder-properties.adoc[RabbitMQ Binder Properties]`" for more information about the properties discussed here.
You can use the following example configuration to enable this feature:

* Set `autoBindDlq` to `true`.
The binder create a DLQ.
Optionally, you can specify a name in `deadLetterQueueName`.
* Set `dlqTtl` to the back off time you want to wait between redeliveries.
* Set the `dlqDeadLetterExchange` to the default exchange.
Expired messages from the DLQ are routed to the original queue, because the default `deadLetterRoutingKey` is the queue name (`destination.group`).
Setting to the default exchange is achieved by setting the property with no value, as shown in the next example.

To force a message to be dead-lettered, either throw an `AmqpRejectAndDontRequeueException` or set `requeueRejected` to `false` (the default) and throw any exception.

The loop continue without end, which is fine for transient problems, but you may want to give up after some number of attempts.
Fortunately, RabbitMQ provides the `x-death` header, which lets you determine how many cycles have occurred.

To acknowledge a message after giving up, throw an `ImmediateAcknowledgeAmqpException`.

0 comments on commit 6a2ba83

Please sign in to comment.