目录

Spring-Integration笔记

一、what

​ 首先,什么是spring-integration?研究之初,对这根管道有些迷惑,这是队列?这个activeMQ有啥区别?待研究了一段时间之后,才发现,spring-integration越来越像曾经做过的esb组件。那么spring-integration到底是什么呢?

​ 官网给出的解释是,spring-integration是一个功能强大的EIP(Enterprise Integration Patterns),即企业集成模式。对,spring-integration是一个集大成者。就我自己的理解,集成了众多功能的它,是一种便捷的事件驱动消息框架用来在系统之间做消息传递的。

二、why

​ 那么,我们为什么用它呢?spring-integration的官网上,给出了以下说法

​ spring-integration的目标

  • 提供一个简单的模型来实现复杂的企业集成解决方案

  • 为基于spring的应用添加异步的、消息驱动的行为

  • 让更多的Spring用户来使用他

    看这种解释,我的直观感觉是:啥玩意?不懂啊!接着看到spring-integration的原则

  • 组件之间应该是松散的,模块性的易测的

  • 应用框架应该强迫分离业务逻辑和集成逻辑

  • 扩展节点应该有更好的抽象和可以再使用的能力

    感觉,这个应该说的是解耦吧。另外看了下其他人的理解,如果你的系统处在各个系统的中间,需要JMS交互,又需要Database/Redis/MongoDB,还需要监听Tcp/UDP等,还有固定的文件转移,分析。还面对着时不时的更改需求的风险。那么,它再适合不过了。

三、how

​ 那么,重点来了,如何使用呢?在介绍之前,先简单的介绍几个名词。

1.Message

​ Message是它的基础构件和核心,所有的流程都围绕着Message运转,如图所示

./1.jpg

Message,就是所说的消息体,用来承载传输的信息用的。Message分为两部分,header和payload。header是头部信息,用来存储传输的一些特性属性参数。payload是用来装载数据的,他可以携带的任何Object对象,放什么都行,随你 。

2.MessageChannel

​ 消息管道,生产者生产一个消息到channel,消费者从channel消费一个消息,所以channel可以对消息组件解耦,并且提供一个方便的拦截功能和监控功能。

./2.jpg

​ 对于MessageChannel,有以下几种

(1).PublishSubscribeChannel

​ 发布订阅式通道形式,多用于消息广播形式,发送给所有已经订阅了的用户。在3.x版本之前,订阅者如果是0,启动会报错或者发送的时候报错。在4.x版本后,订阅者是0,则仍然会返回true。当然,可以配置最小订阅者数量(min-subscribers)

(2).QueueChannel

​ 队列模式通道,最常用的形式。与发布订阅通道不同,此通道实现点对点式的传输方式,管道内部是队列方式,可以设置管道的容量,如果内部的消息已经达到了最大容量,则会阻塞住,直到队列有时间,或者发送的消息被超时处理。

(3).PriorityChannel

​ 优先级队列通道,我的理解为QueueChannel的升级版,可以无视排队,根据设置的优先级直接插队。(壕无人性)

(4).RendezvousChannel

​ 前方施工,禁止通行!这个是一个强行阻塞的通道,当消息进入通道后,通道禁止通行,直到消息在对方通道receive()后,才能继续使用。

(5).DirectChannel

​ 最简单的点对点通道方式,一个简单的单线程通道。是spring-integration的默认通道类型

(6).ExecutorChannel

​ 多线程通道模式,开启多线程执行点对点通道形式。这个通道博主还未研究,不敢多说……..

3.Message Endpoint

​ 消息的终点,或者我称他为消息节点,在channel你不能操作消息,只能在endpoint操作。对于常用的消息节点,有以下几种

(1).Transformer

​ 解释者,转换者,翻译者,怎么理解都可以。作用是可以将消息转为你想要的类型。可以将xml形式转换成string类型。

1
2
3
4
5
<!-- Filter过滤器 -->
<int:channel id="filterAChannel"/>
<int:filter input-channel="filterAChannel" output-channel="filterBChannel" expression="payload.name.equals('haha')"/>
<int:channel id="filterBChannel"/>
<int:service-activator input-channel="filterBChannel" expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)"/>

(2).Filter

​ 过滤器,顾名思义,过滤用的,用来判断一个消息是否应该被传输。用我的理解看,他就是spring-integration里面的if语句。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
<!-- transformer转换器 -->
<int:channel id="transformerInChannel"/>
<int:transformer input-channel="transformerInChannel" output-channel="transformerOutChannel"
                     expression="payload.name.toUpperCase() + '- [' + T(java.lang.System).currentTimeMillis() + ']'"/>
<int:channel id="transformerOutChannel">
   <int:queue/>
</int:channel>
<int:outbound-channel-adapter channel="transformerOutChannel" ref="receiveServiceImpl" method="helloTransformer">
   <int:poller fixed-delay="0"/>
</int:outbound-channel-adapter>

(3).Router

​ 路由器,用来管理一个消息应该被发送到哪个channel中。相当于JAVA里面的switch case语句吧。判断条件很多,可是使用header里面的参数具体值(比如header里面有个定义为testRouter的参数,数值为A,那么消息经过路由会发送到判断为A的通道内,后面使用中再详细讲解)

./3.jpg

(4).Service Activator

​ 我称他为服务激活器,是一个连接服务实例到消息系统的通用端点。对于服务激活器,可能是因为我理解的不够全面,我总是将他和通道适配器搞混,因为我自己测试发现,激活器和适配器都可以作为一个消息出通道的节点。

(5).Channel Adapter

​ 通道适配器是将消息通道连接到某个其他系统或传输的端点。通道适配器可以是入站或出站。通常情况下,通道适配器将在消息与从其他系统(文件,HTTP请求,JMS消息等)接收或发送的任何对象或资源之间进行映射。

(6).Channel Bridge

​ 通道桥梁,用来作为管道之间进行通信使用的,常用情景为:在一个输入管道,将管道的内容发送到另外N个管道输出,配置方式如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
<!-- bridge -->
<int:channel id="bridgeSendChannel"/>
<int:bridge input-channel="bridgeSendChannel" output-channel="bridgeReceiveAChannel"/>
<int:channel id="bridgeReceiveAChannel"/>
<int:bridge input-channel="bridgeReceiveAChannel" output-channel="bridgeReceiveBChannel"/>
<int:channel id="bridgeReceiveBChannel">
    <int:queue/>
</int:channel>
<int:outbound-channel-adapter channel="bridgeReceiveBChannel"
                              expression="@receiveServiceImpl.helloBridge(payload.name,payload.age)">
    <int:poller fixed-delay="0"/>
</int:outbound-channel-adapter>

​ 另外还有Splitter(分解器),Aggregator(聚合器)等。对于其他的消息节点,博主还没有做过多研究,就不再次误人子弟了。后续会将未研究到的一一补上。

4.Channel Interceptor

​ 管道拦截器,能够以非常优雅,非常温柔的方式捕获管道传递之间的节点。对于拦截器,spring-integration给了我们六种节点

./4.png

​ 分别是发送前,邮寄后,发送成功后,接收前,接收后,接受成功后。可以分别在不同的节点进行操作。

四、use(demo地址在本文最后)

下面使用到的Test类为

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import lombok.Data;
 
/**
 * 普通测试dto
 * @author lin
 */
@Data
public class Test {
 
    private String name;
 
    private String age;
}

(1)普通方式

​ xml配置,这里配置了一个通道helloWorldChannel,配置了个接收激活点,即接收方的地址为helloServiceImpl里面的hello方法。(其中ref指对应接收的类名,method指类里面接收的方法)

1
2
3
<!-- 测试dto模式传输 -->
<int:channel id="testChannel"/>
<int:service-activator input-channel="testChannel" ref="receiveServiceImpl" method="hello"/>

​ 发送方Service里面

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
/**
 * 测试传输dto
 */
@Override
public void testDto() {
    System.out.println("testDto方法");
    Test test = new Test();
    test.setName("testDto");
    test.setAge("18");
    testChannel.send(MessageBuilder.withPayload(test).build());
}

​ 接收方Service里面

1
2
3
4
@Override
public void hello(Test test) {
    System.out.println(test.getName() + " " + test.getAge());
}

(2)普通多参数方式

​ xml配置,这里通过获取payload里面的具体参数来传参的形式

1
2
3
<!-- 测试多参数传递 -->
<int:channel id="moreParamChannel"/>
<int:service-activator input-channel="moreParamChannel" expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)"/>

​ 发送方Service里面,将所有的参数通过Map形式装载到payload里面

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
/**
 * 测试多参数传输
 */
@Override
public void moreParamm() {
    System.out.println("greetMoreParam方法");
    HashMap<String, String> map = new HashMap();
    map.put("name", "moreParam");
    map.put("age", "18");
    helloWorldMoreParamChannel.send(MessageBuilder.withPayload(map).build());
}

​ 接收方Service里面

1
2
3
4
@Override
public void helloMoreParam(String name, String age) {
    System.out.println(name + " " + age);
}

(3)JMS方式

​ xml配置,这里配置了个MQ,将消息放入mq中进行传递

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
<!-- 测试Mq配置-->
<int:channel id="topicChannel"/>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL">
        <value>tcp://127.0.0.1:61616?trace=true&keepAlive=true</value>
    </property>
    <property name="useAsyncSend" value="true"/>
</bean>
<int-jms:outbound-channel-adapter channel="topicChannel" destination-name="topic.myTopic" pub-sub-domain="true"/>
<int:channel id="listenerChannel"/>
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter" channel="listenerChannel"destination-name="topic.myTopic" pub-sub-domain="true"/>
<int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage"/>

​ 发送方Service里面

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/**
 * 使用mq进行传输发送方法
 */
@Override
public void send() {
    HashMap<String,Object> map = new HashMap<>();
    map.put("name","MqService");
    map.put("age","18");
    topicChannel.send(MessageBuilder.withPayload(map).build());
}

​ 接收方Service里面

1
2
3
public void processMessage(HashMap<String,Object> map) {
        System.out.println("MessageListener::::::Received message: " + map.toString());
    }

(4)订阅方式

​ xml配置,这里配置了两个订阅者,订阅者分别是两个方法

1
2
3
4
5
6
7
<!-- 测试订阅发布 -->
<!--min-subscribers=""参数为预期最小订阅者,如果必须有订阅者,则这里填写最少数;默认值为0-->
<int:publish-subscribe-channel id="pubsubChannel"/>
<int:outbound-channel-adapter channel="pubsubChannel" ref="receiveServiceImpl" method="helloReceiveOne">
</int:outbound-channel-adapter>
<int:outbound-channel-adapter channel="pubsubChannel" ref="receiveServiceImpl" method="helloReceiveTwo">
</int:outbound-channel-adapter>

​ 发送方Service里面

1
2
3
4
5
6
7
@Override
public void pubsubSend() {
    Test test = new Test();
    test.setName("pubsubSend");
    test.setAge("18");
    publishSubscribeChannel.send(MessageBuilder.withPayload(test).build());
}

​ 接收方Service里面

1
2
3
4
5
6
7
8
9
@Override
public void helloReceiveOne(Test test){
    System.out.println("One:"+test.getName()+" "+test.getAge());
}
 
@Override
public void helloReceiveTwo(Test test){
    System.out.println("Two:"+test.getName()+" "+test.getAge());
}

(5)router方式

​ xml配置,这里配置了一个入口通道,当消息进入入口后,通过判断header里面的’tsetHeader’参数的值,如果值为A,则进入routerAChannel通道,如果为B则进入routerBChannel通道。进入通道后分别进入两者的接收方法中。其中两种方法用了传递类,和多参数传递的形式。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<!-- 测试路由 -->
<!-- 路由入口 -->
<int:channel id="routingChannel">
    <int:queue/>
</int:channel>
<!-- 路由器 -->
<int:header-value-router input-channel="routingChannel" header-name="testHeader">
    <int:poller fixed-delay="0"/>
    <int:mapping value="A" channel="routerAChannel"/>
    <int:mapping value="B" channel="routerBChannel"/>
</int:header-value-router>
<!-- 路由出口 -->
<int:channel id="routerAChannel">
    <int:queue/>
</int:channel>
<int:outbound-channel-adapter channel="routerAChannel" ref="receiveServiceImpl" method="helloRouterTest">
    <int:poller fixed-delay="0"/>
</int:outbound-channel-adapter>
<int:channel id="routerBChannel">
    <int:queue/>
</int:channel>
<int:outbound-channel-adapter channel="routerBChannel"
                              expression="@receiveServiceImpl.helloRouterMap(payload.name,payload.age)">
    <int:poller fixed-delay="0"/>
</int:outbound-channel-adapter>

​ 发送方Service里面

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@Override
public void routerA(String name, String age) {
    Test test = new Test();
    test.setAge(age);
    test.setName(name);
    routingChannel.send(MessageBuilder.withPayload(test).setHeader("testHeader", "A").build());
}
 
@Override
public void routerB(String name, String age) {
    HashMap<String,String> map = new HashMap<>();
    map.put("name", name);
    map.put("age", age);
    routingChannel.send(MessageBuilder.withPayload(map).setHeader("testHeader", "B").build());
}

​ 接收方Service里面

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@Override
public void helloRouterTest(Test test){
    System.out.println("routerA方法");
    System.out.println("helloRouterTest:"+test.getName()+" "+test.getAge());
}
 
@Override
public void helloRouterMap(String name,String age){
    System.out.println("routerB方法");
    System.out.println("helloRouterMap:"+name+" "+age);
}

(6)网关方式

​ xml配置,在这里面配置了一个接口类,当调用这个接口的方法时,就会进入网关配置的通道

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
<!-- 网关通道口模式,dto -->
<int:channel id="getWayChannel">
    <int:queue/>
</int:channel>
<int:gateway service-interface="com.lin.integration.service.interfaces.UseGetWaySender" id="helloGetWaySender"
             default-request-channel="getWayChannel"/>
<int:outbound-channel-adapter channel="getWayChannel" ref="receiveServiceImpl" method="hello">
    <int:poller fixed-delay="0"></int:poller>
</int:outbound-channel-adapter>
 
<!-- 网关通道口模式,多参数传递 -->
<int:channel id="getWayMoreParamChannel">
    <int:queue/>
</int:channel>
<int:gateway service-interface="com.lin.integration.service.interfaces.MoreParamSender" id="getWayMoreParamSender"
             default-request-channel="getWayMoreParamChannel"/>
<int:outbound-channel-adapter channel="getWayMoreParamChannel" expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)">
<int:poller fixed-delay="0"></int:poller>
</int:outbound-channel-adapter>

​ 网关interface里面

1
2
3
public interface UseGetWaySender {
	void sendMessage(Test test);
}

1
2
3
public interface MoreParamSender {
	void sendMessage(Map map);
}

​ 发送方Service里面

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * 测试网关dto
 */
@Override
public void getWay() {
    Test test = new Test();
    test.setAge("18");
    test.setName("getWay");
    useGetWaySender.sendMessage(test);
}
 
/**
 * 测试网关多参数
 */
@Override
public void getWayMoreParam() {
    HashMap<String, String> map = new HashMap();
    map.put("name", "getWayMoreParam");
    map.put("age", "18");
    moreParamSender.sendMessage(map);
}

(7)全局拦截器

​ 拦截器中,将需要拦截的管道进行拦截,拦截之后就会对这个管道的发送端,接收端进行拦截,拦截的接口在上文已经提到过,拦截的配置如下

1
2
3
4
5
<!-- 全局拦截器 -->
<int:channel-interceptor pattern="testInterceptorChannel" order="3" ref="countingChannelInterceptor">
</int:channel-interceptor>
<int:channel id="testInterceptorChannel"/>
<int:service-activator input-channel="testInterceptorChannel" ref="receiveServiceImpl" method="hello"/>

​ 对于近期的spring-integration研究,这些只是“初探”,如此好的一个框架模式,我也将在今后进行深入研究,会将文章进行补充,希望各位对于我文章里面的不足与错误的地方进行批评指出,从而能互相交流研究,多谢。

参考文献:

https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/

https://www.aliyun.com/jiaocheng/301276.html

https://blog.csdn.net/xiayutai1/article/details/53302652?locationNum=4&fps=1

http://www.iteye.com/topic/744524

https://blog.csdn.net/slivefox/article/details/3740541

https://my.oschina.net/zhzhenqin/blog/86586

http://www.importnew.com/16538.html

demo码云地址(10.21更新,增加了java dsl):

https://gitee.com/doubletreelin/spring-integration-mydemo.git