引言
TIPS
- 本文基于Spring Cloud Stream 2.2.0.RC1,包含其新特性。
- 内容稍微有点乱,但这毕竟是个人学习笔记分享,不是从0到1的手把手系列博客,望知悉。
Spring Cloud Stream比较优秀,它屏蔽了各种MQ的差异,统一了编程模型(可以类比成基于MQ通信圈的”Spring Data”);其次个人实体书《Spring Cloud 与 Docker 微服务架构实战》没有包含这部分内容也是一大遗憾;更重要的是,这货细节其实挺多,而且上手是稍微有一点曲线的。
概念
group
By default, when a group is not specified, Spring Cloud Stream assigns the application to an anonymous and independent single-member consumer group that is in a publish-subscribe relationship with all other consumer groups.
组内只有1个实例消费。如果不设置group,则stream会自动为每个实例创建匿名且独立的group——于是每个实例都会消费。
组内单次只有1个实例消费,并且会轮询负载均衡。
In general, it is preferable to always specify a consumer group when binding an application to a given destination.
通常,在将应用程序绑定到给定目标时,最好始终指定使用者组。
partition
One or more producer application instances send data to multiple consumer application instances and ensure that data identified by common characteristics are processed by the same consumer instance.
一个或多个生产者将数据发送到多个消费者,并确保有共同特征标识的数据由同一个消费者处理。默认是对消息进行hashCode,然后根据分区个数取余,所以对于相同的消息,总会落到同一个消费者上。
destination binder
与外部消息系统通信的组件,为构造 Binding
提供了 2 个方法,分别是 bindConsumer
和 bindProducer
,它们分别用于构造生产者和消费者。Binder使Spring Cloud Stream应用程序可以灵活地连接到中间件,目前spring为kafka、rabbitmq提供binder。
destination binding
Binding
是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建。
Applying the @EnableBinding annotation to one of the application’s configuration classes defines a destination binding.
使用@EnableBinding即可定义destination binding
注解
Aside from generating channels for each binding and registering them as Spring beans, for each bound interface, Spring Cloud Stream generates a bean that implements the interface.
1
|
public interface Barista { @Input("inboundOrders") SubscribableChannel orders();}
|
Input注解作用
- 为每个binding生成channel实例
- 指定channel名称
- 在spring容器中生成一个名为inboundOrders,类型为SubscribableChannel的bean
- 在spring容器中生成一个类,实现Barista接口。
@InboundChannelAdapter
示例:
1
2
3
4
5
|
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
public MessageSource<String> test() {
return () -> new GenericMessage<>("Hello Spring Cloud Stream");
}
|
作用:表示定义的方法能产生消息。用InboundChannelAdapter注解的方法上即使有参数也没用。
- fixedDelay:多少毫秒发送1次
- maxMessagesPerPoll:一次发送几条消息。
ServiceActivator
1
2
3
4
|
@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
public String transform(String payload) {
return payload.toUpperCase();
}
|
表示方法能够处理消息或消息有效内容,监听input消息,用方法体的代码处理,然后输出到output中。
和ServiceActivator差不多,表示方法能够转换消息,消息头,或消息有效内容
In order to be eligible to support conditional dispatching, a method must satisfy the follow conditions:
- It must not return a value.
- It must be an individual message handling method (reactive API methods are not supported).
condition的作用:符合条件,才走处理方法。
condition起作用的两个条件:
- 注解的方法没有返回值
- 方法是一个独立方法,不支持Reactive API
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'")
public void handle(String body) {
System.out.println("Received: " + body);
}
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "2"))
public MessageSource<String> test() {
return () -> {
Map<String, Object> map = new HashMap<>(1);
map.put("type", "dog");
return new GenericMessage<>("abcdef", map);
};
}
|
PollableMessageSource
作用:允许消费者控制消费速率。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
@SpringBootApplication
@EnableBinding({ConsumerApplication.PolledProcessor.class})
@EnableSchedulingpublic
class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@Autowired
private PolledProcessor polledProcessor;
@Scheduled(fixedDelay = 5_000)
public void poll() {
polledProcessor.input().poll(message -> {
byte[] bytes = (byte[]) message.getPayload();
String payload = new String(bytes);
System.out.println(payload);
});
}
public interface PolledProcessor {
@Input
PollableMessageSource input();
@Output
MessageChannel output();
}
@Bean
@InboundChannelAdapter(value = "output", poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<String> test() {
return () -> {
Map<String, Object> map = new HashMap<>(1);
map.put("type", "dog");
return new GenericMessage<>("adfdfdsafdsfa", map);
};
}
}
|
如果不想自己进行byte数组转换,可以添加配置:
1
2
3
4
5
6
7
|
spring:
cloud:
stream:
bindings:
output:
# 指定content-type
content-type: text/plain
|
相关文章
错误处理
应用处理
方式1:处理指定channel
配置:
1
2
3
4
5
6
7
8
9
|
spring:
cloud:
stream:
bindings:
input:
destination: my-destination
group: my-group
output:
destination: my-destination
|
代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
@Slf4j
@SpringBootApplication
@EnableBinding({Processor.class})
@EnableSchedulingpublic
class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException("x");
}
@ServiceActivator(inputChannel = "my-destination.my-group.errors")
public void handleError(ErrorMessage message) {
Throwable throwable = message.getPayload();
log.error("截获异常", throwable);
Message<?> originalMessage = message.getOriginalMessage();
assert originalMessage != null;
log.info("原始消息体 = {}", new String((byte[]) originalMessage.getPayload()));
}
@Bean
@InboundChannelAdapter(value = Processor.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<String> test() {
return () -> new GenericMessage<>("adfdfdsafdsfa");
}
}
|
方式2:处理所有channel
1
2
3
4
5
6
7
8
9
10
|
@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException("x");
}
@StreamListener("errorChannel")
public void error(Message<?> message) {
ErrorMessage errorMessage = (ErrorMessage) message;
System.out.println("Handling ERROR: " + errorMessage);
}
|
系统处理
系统处理方式,因消息中间件不同而异。如果应用没有配置错误处理器,那么error将会被传播给binder,binder将error回传给消息中间件。消息中间件可以丢弃消息、requeue(重新排队,从而重新处理)或将失败的消息发送给DLQ(死信队列)。
丢弃
默认情况下,错误消息将被丢弃。虽然在某些情况下可以接受,但这种方式一般不适用于生产。
DLQ
配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
spring:
cloud:
stream:
bindings:
input:
destination: my-destination
group: my-group
output:
destination: my-destination
rabbit:
bindings:
input:
consumer:
auto-bind-dlq: true
|
代码:
1
2
3
4
5
6
7
8
9
10
|
@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException("x");
}
@Bean
@InboundChannelAdapter(value = Processor.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<String> test() {
return () -> new GenericMessage<>("adfdfdsafdsfa");
}
|
这样,消息消费失败后,就会放入死信队列。在控制台操作一下,即可将这些消息放回消息队列。客户端就可以重新处理。
如果想获取原始错误的异常堆栈,可添加如下配置:
1
2
3
4
5
6
7
8
|
spring:
cloud:
stream:
rabbit:
bindings:
input:
consumer:
republish-to-dlq: true
|
requeue
Rabbit/Kafka的binder依赖RetryTemplate实现重试,从而提升消息处理的成功率。然而,如果设置了spring.cloud.stream.bindings.input.consumer.max-attempts=1
,那么RetryTemplate则不再重试。此时可通过requeue方式处理异常。
添加如下配置:
1
2
3
4
|
# 默认是3,设为1则禁用重试
spring.cloud.stream.bindings.<input channel名称>.consumer.max-attempts=1
# 表示是否要requeue被拒绝的消息(即:requeue处理失败的消息)
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true
|
这样,失败的消息将会被重新提交到同一个handler进行处理,直到handler抛出 AmqpRejectAndDontRequeueException
异常为止。
RetryTemplate
配置方式
RetryTemplate重试也是错误处理的一种手段。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
spring:
cloud:
stream:
bindings:
<input channel名称>:
consumer:
# 最多尝试处理几次,默认3
maxAttempts: 3
# 重试时初始避退间隔,单位毫秒,默认1000
backOffInitialInterval: 1000
# 重试时最大避退间隔,单位毫秒,默认10000
backOffMaxInterval: 10000
# 避退乘数,默认2.0
backOffMultiplier: 2.0
# 当listen抛出retryableExceptions未列出的异常时,是否要重试
defaultRetryable: true
# 异常是否允许重试的map映射
retryableExceptions:
java.lang.RuntimeException: true
java.lang.IllegalStateException: false
|
测试代码:
1
2
3
4
5
6
7
8
9
10
11
12
|
@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException(body);
}
private AtomicInteger count = new AtomicInteger(0);
@Bean
@InboundChannelAdapter(value = Processor.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<String> test() {
return () -> new GenericMessage<>(count.getAndAdd(1) + "");
}
|
编码方式
多数场景下,使用配置方式定制重试行为都是可以满足需求的,但配置方式可能无法满足一些复杂需求。此时可使用编码方式配置RetryTemplate:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@Configurationclass
RetryConfiguration {
@StreamRetryTemplate public RetryTemplate sinkConsumerRetryTemplate () {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy());
retryTemplate.setBackOffPolicy(backOffPolicy());
return retryTemplate;
}
private ExceptionClassifierRetryPolicy retryPolicy () {
BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(Collections.singletonList(IllegalAccessException.class));
keepRetryingClassifier.setTraverseCauses(true);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();
ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
retryPolicy.setExceptionClassifier(classifiable -> keepRetryingClassifier.classify(classifiable) ? alwaysRetryPolicy : simpleRetryPolicy);
return retryPolicy;
}
private FixedBackOffPolicy backOffPolicy () {
final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(2);
return backOffPolicy;
}
}
|
然后添加配置:
1
|
spring.cloud.stream.bindings.<input channel名称>.consumer.retry-template-name=myRetryTemplate
|
注: Spring Cloud Stream 2.2才支持设置retry-template-name
动态绑定目标
这是Spring Integration原生的API,建议有时间了解下Spring Integration相关文档。
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
@EnableBinding
@Controllerpublic
class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@Autowired
@Qualifier("sourceChannel")
private MessageChannel localChannel;
@RequestMapping(path = "/", method = POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
localChannel.send(MessageBuilder.createMessage(body, new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
@Bean(name = "sourceChannel")
public MessageChannel localChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "sourceChannel")
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.id"));
router.setDefaultOutputChannelName("default-output");
router.setChannelResolver(resolver);
return router;
}
//Following sink is used as test consumer. It logs the data received through the consumer. @EnableBinding(Sink.class) static class TestSink {
private final Log logger = LogFactory.getLog(getClass());
@StreamListener(Sink.INPUT1)
public void receive(String data) {
logger.info("Data received from customer-1..." + data);
}
@StreamListener(Sink.INPUT2)
public void receiveX(String data) {
logger.info("Data received from customer-2..." + data);
}
}
interface Sink {
String INPUT1 = "input1";
String INPUT2 = "input2";
@Input(INPUT1)
SubscribableChannel input1();
@Input(INPUT2)
SubscribableChannel input2();
}
|