This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.1.4!
Error Handling
In this section, we explain the general idea behind the error handling mechanisms provided by the framework. We use the Rabbit binder as an example, since individual binders (such as the Kafka binder) define different sets of properties for certain supported mechanisms, specific to the capabilities of the underlying broker.
Errors happen, and Spring Cloud Stream provides several flexible mechanisms to deal with them. Note that the techniques depend on the binder implementation and the capabilities of the underlying messaging middleware, as well as on the programming model (more on this later).
Whenever a Message handler (function) throws an exception, it is propagated back to the binder, at which point the binder makes several attempts at retrying the same message (3 by default) using the RetryTemplate provided by the Spring Retry library.
If the retries are unsuccessful, the outcome is up to the error handling mechanism, which may drop the message, re-queue the message for re-processing, or send the failed message to a DLQ.
Both Rabbit and Kafka support these concepts (especially DLQ). However, other binders may not, so refer to your individual binder’s documentation for details on supported error-handling options.
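The flow described above (retry a fixed number of times, then hand the failed message to a recovery step) can be sketched in plain Java. This is only a conceptual illustration, not the binder's implementation: the class RetryThenRecoverSketch, the method handle, and the recoverer callback are hypothetical names introduced here, and the back-off between attempts is left out.

```java
import java.util.function.Consumer;
import java.util.function.Function;

/** Conceptual sketch only; all names are hypothetical, not framework API. */
final class RetryThenRecoverSketch {

    /**
     * Invokes the handler up to maxAttempts times. If every attempt throws,
     * the original message is passed to the recoverer (which could log and
     * drop it, re-queue it, or publish it to a DLQ) and null is returned.
     */
    static <I, O> O handle(I message, Function<I, O> handler,
                           int maxAttempts, Consumer<I> recoverer) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return handler.apply(message);
            } catch (RuntimeException e) {
                // Attempt failed; a real retry mechanism would back off here
                // before trying again.
            }
        }
        // Retries exhausted: delegate to the error handling step.
        recoverer.accept(message);
        return null;
    }
}
```

With a handler that always throws and maxAttempts set to 3, the handler is invoked three times and the message is then passed to the recoverer exactly once.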
Keep in mind, however, that a reactive function does NOT qualify as a Message handler, since it does not handle individual messages and instead provides a way to connect the stream (i.e., Flux) provided by the framework with the one provided by the user. Why is this important? Because anything you read later in this section with regard to the Retry Template, dropping failed messages, retrying, DLQ, and the configuration properties that assist with all of it applies only to Message handlers (i.e., imperative functions).
The Reactive API provides a very rich library of its own operators and mechanisms to assist you with error handling specific to a variety of reactive use cases, which are far more complex than simple Message handler cases. So use them, such as public final Flux<T> retryWhen(Retry retrySpec); which you can find in reactor.core.publisher.Flux.
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
    return flux -> flux
            .retryWhen(Retry.backoff(3, Duration.ofMillis(1000)))
            .map(v -> v.toUpperCase());
}
Drop Failed Messages
By default, the system provides two error handlers. The first simply logs the error message. The second is a binder-specific error handler, which is responsible for handling the error message in the context of a specific messaging system (e.g., sending it to a DLQ). But since no additional error handling configuration was provided (in this scenario), this handler does nothing. So, essentially, after being logged, the message is dropped.
While acceptable in some cases, in most cases it is not, and we need some recovery mechanism to avoid message loss.
Handle Error Messages
In the previous section we mentioned that, by default, messages that result in an error are effectively logged and dropped. The framework also exposes a mechanism for you to provide a custom error handler (e.g., to send a notification or write to a database). You can do so by adding a Consumer that is specifically designed to accept an ErrorMessage, which, aside from all the information about the error (e.g., the stack trace), contains the original message (the one that triggered the error).
NOTE: A custom error handler is mutually exclusive with the framework-provided error handlers (i.e., logging and the binder error handler; see the previous section) to ensure that they do not interfere.
@Bean
public Consumer<ErrorMessage> myErrorHandler() {
    return v -> {
        // send SMS notification code
    };
}
To identify such a consumer as an error handler, all you need to do is provide the error-handler-definition property pointing to the function name:
spring.cloud.stream.bindings.<binding-name>.error-handler-definition=myErrorHandler
For example, for the binding name uppercase-in-0, the property would look like this:
spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandler
And if you used a special mapping instruction to map the binding to a more readable name (spring.cloud.stream.function.bindings.uppercase-in-0=upper), then this property would look like this:
spring.cloud.stream.bindings.upper.error-handler-definition=myErrorHandler
If by accident you declare such a handler as a Function, it will still work, except that nothing is done with its output. However, given that such a handler still relies on functionality provided by Spring Cloud Function, you can also benefit from function composition in the event your handler has some complexity that you would like to address through function composition (however unlikely).
Default Error Handler
If you want to have a single error handler for all function beans, you can use the standard spring-cloud-stream mechanism for defining default properties:
spring.cloud.stream.default.error-handler-definition=myErrorHandler
DLQ - Dead Letter Queue
Perhaps the most common mechanism, DLQ allows failed messages to be sent to a special destination: the Dead Letter Queue.
When configured, failed messages are sent to this destination for subsequent re-processing or auditing and reconciliation.
Consider the following example:
@SpringBootApplication
public class SimpleStreamApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SimpleStreamApplication.class,
            "--spring.cloud.function.definition=uppercase",
            "--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
            "--spring.cloud.stream.bindings.uppercase-in-0.group=myGroup",
            "--spring.cloud.stream.rabbit.bindings.uppercase-in-0.consumer.auto-bind-dlq=true"
        );
    }

    @Bean
    public Function<Person, Person> uppercase() {
        return personIn -> {
            // Always fail, so that every message is treated as a failed message
            throw new RuntimeException("intentional");
        };
    }
}
As a reminder, in this example the uppercase-in-0 segment of the property corresponds to the name of the input destination binding. The consumer segment indicates that it is a consumer property.
When using DLQ, at least the group property must be provided for proper naming of the DLQ destination. However, group is often used together with the destination property, as in our example.
Aside from some standard properties, we also set auto-bind-dlq to instruct the binder to create and configure a DLQ destination for the uppercase-in-0 binding, which corresponds to the uppercase destination (see the corresponding property). This results in an additional Rabbit queue named uppercase.myGroup.dlq (see the Kafka documentation for Kafka-specific DLQ properties).
Once configured, all failed messages are routed to this destination preserving the original message for further actions.
And you can see that the error message contains more information relevant to the original error, as follows:
. . . .
x-exception-stacktrace: org.springframework.messaging.MessageHandlingException: nested exception is
org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
at. . . . .
Payload: blah
You can also facilitate immediate dispatch to the DLQ (without retries) by setting max-attempts to '1'. For example:
--spring.cloud.stream.bindings.uppercase-in-0.consumer.max-attempts=1
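For reference, the DLQ settings used in this section could also be kept together in a configuration file rather than passed as command-line arguments. The following is a sketch, assuming the application reads a standard Spring Boot application.properties file; the property names and values are the ones used in the example above.

```properties
# Function and binding set-up from the DLQ example
spring.cloud.function.definition=uppercase
spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase
spring.cloud.stream.bindings.uppercase-in-0.group=myGroup

# Skip retries so that a failed message goes to the DLQ immediately
spring.cloud.stream.bindings.uppercase-in-0.consumer.max-attempts=1

# Rabbit-specific: create and bind the DLQ (uppercase.myGroup.dlq)
spring.cloud.stream.rabbit.bindings.uppercase-in-0.consumer.auto-bind-dlq=true
```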
Retry Template
In this section we cover the configuration properties relevant to retry capabilities.
The RetryTemplate is part of the Spring Retry library.
While it is out of the scope of this document to cover all of the capabilities of the RetryTemplate, we mention the following consumer properties that are specifically related to it:
- maxAttempts
The number of attempts to process the message. Default: 3.
- backOffInitialInterval
The backoff initial interval on retry. Default: 1000 milliseconds.
- backOffMaxInterval
The maximum backoff interval. Default: 10000 milliseconds.
- backOffMultiplier
The backoff multiplier. Default: 2.0.
- defaultRetryable
Whether exceptions thrown by the listener that are not listed in retryableExceptions are retryable. Default: true.
- retryableExceptions
A map of Throwable class names in the key and a boolean in the value. Specify those exceptions (and subclasses) that will or won’t be retried. Also see defaultRetryable. Example: spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false. Default: empty.
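To make the interplay of these properties more concrete, here is a small plain-Java sketch. It is not Spring Retry code: the class RetrySettingsSketch and its methods are hypothetical, and it assumes the usual exponential back-off scheme (the first wait equals the initial interval, each following wait is multiplied by the multiplier and capped at the maximum interval) and that an exception is matched against retryableExceptions by walking up its class hierarchy, falling back to defaultRetryable when nothing matches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustration only; hypothetical names, not part of Spring Retry. */
final class RetrySettingsSketch {

    /**
     * Returns the waits (in milliseconds) between consecutive attempts.
     * maxAttempts counts the first attempt too, so there are
     * maxAttempts - 1 waits.
     */
    static List<Long> backOffIntervals(int maxAttempts, long initialInterval,
                                       double multiplier, long maxInterval) {
        List<Long> waits = new ArrayList<>();
        double next = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add((long) Math.min(next, maxInterval)); // cap at the maximum
            next *= multiplier;
        }
        return waits;
    }

    /**
     * Decides whether an exception is retried: the closest listed class
     * (the exception's own class or a superclass) wins; otherwise the
     * default applies.
     */
    static boolean isRetryable(Throwable t,
                               Map<Class<? extends Throwable>, Boolean> retryableExceptions,
                               boolean defaultRetryable) {
        for (Class<?> c = t.getClass(); c != null; c = c.getSuperclass()) {
            Boolean listed = retryableExceptions.get(c);
            if (listed != null) {
                return listed;
            }
        }
        return defaultRetryable;
    }
}
```

Under these assumptions, the default settings (3 attempts, 1000 ms initial interval, multiplier 2.0, 10000 ms maximum) give waits of 1000 ms and 2000 ms. Raising maxAttempts to 6 gives 1000, 2000, 4000, 8000 and 10000 ms, the last value being capped by the maximum interval.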
While the preceding settings are sufficient for the majority of customization requirements, they may not satisfy certain complex requirements, at which point you may want to provide your own instance of the RetryTemplate. To do so, configure it as a bean in your application configuration. The application-provided instance overrides the one provided by the framework. Also, to avoid conflicts, you must qualify the instance of the RetryTemplate you want to be used by the binder as @StreamRetryTemplate. For example:
@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
    return new RetryTemplate();
}
As you can see from the above example, you don’t need to annotate it with @Bean, since @StreamRetryTemplate is a qualified @Bean.
If you need to be more precise with your RetryTemplate, you can specify the bean by name in your ConsumerProperties to associate a specific retry bean with each binding:
spring.cloud.stream.bindings.<foo>.consumer.retry-template-name=<your-retry-template-bean-name>