一、what
首先,什么是spring-integration?研究之初,对这根管道有些迷惑,这是队列?这个activeMQ有啥区别?待研究了一段时间之后,才发现,spring-integration越来越像曾经做过的esb组件。那么spring-integration到底是什么呢?
官网给出的解释是,spring-integration是一个功能强大的EIP(Enterprise Integration Patterns),即企业集成模式。对,spring-integration是一个集大成者。就我自己的理解,集成了众多功能的它,是一种便捷的事件驱动消息框架用来在系统之间做消息传递的。
二、why
那么,我们为什么用它呢?spring-integration的官网上,给出了以下说法
spring-integration的目标
-
提供一个简单的模型来实现复杂的企业集成解决方案
-
为基于spring的应用添加异步的、消息驱动的行为
-
让更多的Spring用户来使用他
看这种解释,我的直观感觉是:啥玩意?不懂啊!接着看到spring-integration的原则
-
组件之间应该是松散的,模块性的易测的
-
应用框架应该强迫分离业务逻辑和集成逻辑
-
扩展节点应该有更好的抽象和可以再使用的能力
感觉,这个应该说的是解耦吧。另外看了下其他人的理解,如果你的系统处在各个系统的中间,需要JMS交互,又需要Database/Redis/MongoDB,还需要监听Tcp/UDP等,还有固定的文件转移,分析。还面对着时不时的更改需求的风险。那么,它再适合不过了。
三、how
那么,重点来了,如何使用呢?在介绍之前,先简单的介绍几个名词。
1.Message
Message是它的基础构件和核心,所有的流程都围绕着Message运转,如图所示
Message,就是所说的消息体,用来承载传输的信息用的。Message分为两部分,header和payload。header是头部信息,用来存储传输的一些特性属性参数。payload是用来装载数据的,他可以携带的任何Object对象,放什么都行,随你 。
2.MessageChannel
消息管道,生产者生产一个消息到channel,消费者从channel消费一个消息,所以channel可以对消息组件解耦,并且提供一个方便的拦截功能和监控功能。
对于MessageChannel,有以下几种
(1).PublishSubscribeChannel
发布订阅式通道形式,多用于消息广播形式,发送给所有已经订阅了的用户。在3.x版本之前,订阅者如果是0,启动会报错或者发送的时候报错。在4.x版本后,订阅者是0,则仍然会返回true。当然,可以配置最小订阅者数量(min-subscribers)
(2).QueueChannel
队列模式通道,最常用的形式。与发布订阅通道不同,此通道实现点对点式的传输方式,管道内部是队列方式,可以设置管道的容量,如果内部的消息已经达到了最大容量,则会阻塞住,直到队列有时间,或者发送的消息被超时处理。
(3).PriorityChannel
优先级队列通道,我的理解为QueueChannel的升级版,可以无视排队,根据设置的优先级直接插队。(壕无人性)
(4).RendezvousChannel
前方施工,禁止通行!这个是一个强行阻塞的通道,当消息进入通道后,通道禁止通行,直到消息在对方通道receive()后,才能继续使用。
(5).DirectChannel
最简单的点对点通道方式,一个简单的单线程通道。是spring-integration的默认通道类型
(6).ExecutorChannel
多线程通道模式,开启多线程执行点对点通道形式。这个通道博主还未研究,不敢多说……..
3.Message Endpoint
消息的终点,或者我称他为消息节点,在channel你不能操作消息,只能在endpoint操作。对于常用的消息节点,有以下几种
解释者,转换者,翻译者,怎么理解都可以。作用是可以将消息转为你想要的类型。可以将xml形式转换成string类型。
1
2
3
4
5
|
<!-- Filter过滤器 -->
<int:channel id="filterAChannel"/>
<int:filter input-channel="filterAChannel" output-channel="filterBChannel" expression="payload.name.equals('haha')"/>
<int:channel id="filterBChannel"/>
<int:service-activator input-channel="filterBChannel" expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)"/>
|
(2).Filter
过滤器,顾名思义,过滤用的,用来判断一个消息是否应该被传输。用我的理解看,他就是spring-integration里面的if语句。
1
2
3
4
5
6
7
8
9
10
|
<!-- transformer转换器 -->
<int:channel id="transformerInChannel"/>
<int:transformer input-channel="transformerInChannel" output-channel="transformerOutChannel"
expression="payload.name.toUpperCase() + '- [' + T(java.lang.System).currentTimeMillis() + ']'"/>
<int:channel id="transformerOutChannel">
<int:queue/>
</int:channel>
<int:outbound-channel-adapter channel="transformerOutChannel" ref="receiveServiceImpl" method="helloTransformer">
<int:poller fixed-delay="0"/>
</int:outbound-channel-adapter>
|
(3).Router
路由器,用来管理一个消息应该被发送到哪个channel中。相当于JAVA里面的switch case语句吧。判断条件很多,可是使用header里面的参数具体值(比如header里面有个定义为testRouter的参数,数值为A,那么消息经过路由会发送到判断为A的通道内,后面使用中再详细讲解)
(4).Service Activator
我称他为服务激活器,是一个连接服务实例到消息系统的通用端点。对于服务激活器,可能是因为我理解的不够全面,我总是将他和通道适配器搞混,因为我自己测试发现,激活器和适配器都可以作为一个消息出通道的节点。
(5).Channel Adapter
通道适配器是将消息通道连接到某个其他系统或传输的端点。通道适配器可以是入站或出站。通常情况下,通道适配器将在消息与从其他系统(文件,HTTP请求,JMS消息等)接收或发送的任何对象或资源之间进行映射。
(6).Channel Bridge
通道桥梁,用来作为管道之间进行通信使用的,常用情景为:在一个输入管道,将管道的内容发送到另外N个管道输出,配置方式如下
1
2
3
4
5
6
7
8
9
10
11
12
|
<!-- bridge -->
<int:channel id="bridgeSendChannel"/>
<int:bridge input-channel="bridgeSendChannel" output-channel="bridgeReceiveAChannel"/>
<int:channel id="bridgeReceiveAChannel"/>
<int:bridge input-channel="bridgeReceiveAChannel" output-channel="bridgeReceiveBChannel"/>
<int:channel id="bridgeReceiveBChannel">
<int:queue/>
</int:channel>
<int:outbound-channel-adapter channel="bridgeReceiveBChannel"
expression="@receiveServiceImpl.helloBridge(payload.name,payload.age)">
<int:poller fixed-delay="0"/>
</int:outbound-channel-adapter>
|
另外还有Splitter(分解器),Aggregator(聚合器)等。对于其他的消息节点,博主还没有做过多研究,就不再次误人子弟了。后续会将未研究到的一一补上。
4.Channel Interceptor
管道拦截器,能够以非常优雅,非常温柔的方式捕获管道传递之间的节点。对于拦截器,spring-integration给了我们六种节点
分别是发送前,邮寄后,发送成功后,接收前,接收后,接受成功后。可以分别在不同的节点进行操作。
四、use(demo地址在本文最后)
下面使用到的Test类为
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import lombok.Data;
/**
* 普通测试dto
* @author lin
*/
@Data
public class Test {
private String name;
private String age;
}
|
(1)普通方式
xml配置,这里配置了一个通道helloWorldChannel,配置了个接收激活点,即接收方的地址为helloServiceImpl里面的hello方法。(其中ref指对应接收的类名,method指类里面接收的方法)
1
2
3
|
<!-- 测试dto模式传输 -->
<int:channel id="testChannel"/>
<int:service-activator input-channel="testChannel" ref="receiveServiceImpl" method="hello"/>
|
发送方Service里面
1
2
3
4
5
6
7
8
9
10
11
|
/**
* 测试传输dto
*/
@Override
public void testDto() {
System.out.println("testDto方法");
Test test = new Test();
test.setName("testDto");
test.setAge("18");
testChannel.send(MessageBuilder.withPayload(test).build());
}
|
接收方Service里面
1
2
3
4
|
@Override
public void hello(Test test) {
System.out.println(test.getName() + " " + test.getAge());
}
|
(2)普通多参数方式
xml配置,这里通过获取payload里面的具体参数来传参的形式
1
2
3
|
<!-- 测试多参数传递 -->
<int:channel id="moreParamChannel"/>
<int:service-activator input-channel="moreParamChannel" expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)"/>
|
发送方Service里面,将所有的参数通过Map形式装载到payload里面
1
2
3
4
5
6
7
8
9
10
11
|
/**
* 测试多参数传输
*/
@Override
public void moreParamm() {
System.out.println("greetMoreParam方法");
HashMap<String, String> map = new HashMap();
map.put("name", "moreParam");
map.put("age", "18");
helloWorldMoreParamChannel.send(MessageBuilder.withPayload(map).build());
}
|
接收方Service里面
1
2
3
4
|
@Override
public void helloMoreParam(String name, String age) {
System.out.println(name + " " + age);
}
|
(3)JMS方式
xml配置,这里配置了个MQ,将消息放入mq中进行传递
1
2
3
4
5
6
7
8
9
10
11
12
|
<!-- 测试Mq配置-->
<int:channel id="topicChannel"/>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://127.0.0.1:61616?trace=true&keepAlive=true</value>
</property>
<property name="useAsyncSend" value="true"/>
</bean>
<int-jms:outbound-channel-adapter channel="topicChannel" destination-name="topic.myTopic" pub-sub-domain="true"/>
<int:channel id="listenerChannel"/>
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter" channel="listenerChannel"destination-name="topic.myTopic" pub-sub-domain="true"/>
<int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage"/>
|
发送方Service里面
1
2
3
4
5
6
7
8
9
10
|
/**
* 使用mq进行传输发送方法
*/
@Override
public void send() {
HashMap<String,Object> map = new HashMap<>();
map.put("name","MqService");
map.put("age","18");
topicChannel.send(MessageBuilder.withPayload(map).build());
}
|
接收方Service里面
1
2
3
|
public void processMessage(HashMap<String,Object> map) {
System.out.println("MessageListener::::::Received message: " + map.toString());
}
|
(4)订阅方式
xml配置,这里配置了两个订阅者,订阅者分别是两个方法
1
2
3
4
5
6
7
|
<!-- 测试订阅发布 -->
<!--min-subscribers=""参数为预期最小订阅者,如果必须有订阅者,则这里填写最少数;默认值为0-->
<int:publish-subscribe-channel id="pubsubChannel"/>
<int:outbound-channel-adapter channel="pubsubChannel" ref="receiveServiceImpl" method="helloReceiveOne">
</int:outbound-channel-adapter>
<int:outbound-channel-adapter channel="pubsubChannel" ref="receiveServiceImpl" method="helloReceiveTwo">
</int:outbound-channel-adapter>
|
发送方Service里面
1
2
3
4
5
6
7
|
@Override
public void pubsubSend() {
Test test = new Test();
test.setName("pubsubSend");
test.setAge("18");
publishSubscribeChannel.send(MessageBuilder.withPayload(test).build());
}
|
接收方Service里面
1
2
3
4
5
6
7
8
9
|
@Override
public void helloReceiveOne(Test test){
System.out.println("One:"+test.getName()+" "+test.getAge());
}
@Override
public void helloReceiveTwo(Test test){
System.out.println("Two:"+test.getName()+" "+test.getAge());
}
|
(5)router方式
xml配置,这里配置了一个入口通道,当消息进入入口后,通过判断header里面的’tsetHeader’参数的值,如果值为A,则进入routerAChannel通道,如果为B则进入routerBChannel通道。进入通道后分别进入两者的接收方法中。其中两种方法用了传递类,和多参数传递的形式。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
<!-- 测试路由 -->
<!-- 路由入口 -->
<int:channel id="routingChannel">
<int:queue/>
</int:channel>
<!-- 路由器 -->
<int:header-value-router input-channel="routingChannel" header-name="testHeader">
<int:poller fixed-delay="0"/>
<int:mapping value="A" channel="routerAChannel"/>
<int:mapping value="B" channel="routerBChannel"/>
</int:header-value-router>
<!-- 路由出口 -->
<int:channel id="routerAChannel">
<int:queue/>
</int:channel>
<int:outbound-channel-adapter channel="routerAChannel" ref="receiveServiceImpl" method="helloRouterTest">
<int:poller fixed-delay="0"/>
</int:outbound-channel-adapter>
<int:channel id="routerBChannel">
<int:queue/>
</int:channel>
<int:outbound-channel-adapter channel="routerBChannel"
expression="@receiveServiceImpl.helloRouterMap(payload.name,payload.age)">
<int:poller fixed-delay="0"/>
</int:outbound-channel-adapter>
|
发送方Service里面
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
@Override
public void routerA(String name, String age) {
Test test = new Test();
test.setAge(age);
test.setName(name);
routingChannel.send(MessageBuilder.withPayload(test).setHeader("testHeader", "A").build());
}
@Override
public void routerB(String name, String age) {
HashMap<String,String> map = new HashMap<>();
map.put("name", name);
map.put("age", age);
routingChannel.send(MessageBuilder.withPayload(map).setHeader("testHeader", "B").build());
}
|
接收方Service里面
1
2
3
4
5
6
7
8
9
10
11
|
@Override
public void helloRouterTest(Test test){
System.out.println("routerA方法");
System.out.println("helloRouterTest:"+test.getName()+" "+test.getAge());
}
@Override
public void helloRouterMap(String name,String age){
System.out.println("routerB方法");
System.out.println("helloRouterMap:"+name+" "+age);
}
|
(6)网关方式
xml配置,在这里面配置了一个接口类,当调用这个接口的方法时,就会进入网关配置的通道
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
<!-- 网关通道口模式,dto -->
<int:channel id="getWayChannel">
<int:queue/>
</int:channel>
<int:gateway service-interface="com.lin.integration.service.interfaces.UseGetWaySender" id="helloGetWaySender"
default-request-channel="getWayChannel"/>
<int:outbound-channel-adapter channel="getWayChannel" ref="receiveServiceImpl" method="hello">
<int:poller fixed-delay="0"></int:poller>
</int:outbound-channel-adapter>
<!-- 网关通道口模式,多参数传递 -->
<int:channel id="getWayMoreParamChannel">
<int:queue/>
</int:channel>
<int:gateway service-interface="com.lin.integration.service.interfaces.MoreParamSender" id="getWayMoreParamSender"
default-request-channel="getWayMoreParamChannel"/>
<int:outbound-channel-adapter channel="getWayMoreParamChannel" expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)">
<int:poller fixed-delay="0"></int:poller>
</int:outbound-channel-adapter>
|
网关interface里面
1
2
3
|
public interface UseGetWaySender {
void sendMessage(Test test);
}
|
1
2
3
|
public interface MoreParamSender {
void sendMessage(Map map);
}
|
发送方Service里面
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
/**
* 测试网关dto
*/
@Override
public void getWay() {
Test test = new Test();
test.setAge("18");
test.setName("getWay");
useGetWaySender.sendMessage(test);
}
/**
* 测试网关多参数
*/
@Override
public void getWayMoreParam() {
HashMap<String, String> map = new HashMap();
map.put("name", "getWayMoreParam");
map.put("age", "18");
moreParamSender.sendMessage(map);
}
|
(7)全局拦截器
拦截器中,将需要拦截的管道进行拦截,拦截之后就会对这个管道的发送端,接收端进行拦截,拦截的接口在上文已经提到过,拦截的配置如下
1
2
3
4
5
|
<!-- 全局拦截器 -->
<int:channel-interceptor pattern="testInterceptorChannel" order="3" ref="countingChannelInterceptor">
</int:channel-interceptor>
<int:channel id="testInterceptorChannel"/>
<int:service-activator input-channel="testInterceptorChannel" ref="receiveServiceImpl" method="hello"/>
|
对于近期的spring-integration研究,这些只是“初探”,如此好的一个框架模式,我也将在今后进行深入研究,会将文章进行补充,希望各位对于我文章里面的不足与错误的地方进行批评指出,从而能互相交流研究,多谢。
参考文献:
https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/
https://www.aliyun.com/jiaocheng/301276.html
https://blog.csdn.net/xiayutai1/article/details/53302652?locationNum=4&fps=1
http://www.iteye.com/topic/744524
https://blog.csdn.net/slivefox/article/details/3740541
https://my.oschina.net/zhzhenqin/blog/86586
http://www.importnew.com/16538.html
demo码云地址(10.21更新,增加了java dsl):
https://gitee.com/doubletreelin/spring-integration-mydemo.git