spring集成JMS(ActiveMQ)
一、下载与安装ActiveMQ
http://activemq.apache.org/download.html
我下载的是最新的5.12.0版,解压后安装目录下运行bin/activemq start
二、配置POM文件
看包括springside在内的很多demo的时候看到都是配置的active-core的依赖,而官网5.12.0下面推荐的配置是activemq-all,没太弄明白,后来突然发现页面有一行小字:
If you need more fine grained control of your dependencies (activemq-all is an uber jar) pick and choose from the various components activemq-client, activemq-broker, activemq-xx-store etc.
英文不好惹的祸啊,如果是中文早就看到了…往前几个版本的文档看了下,从5.8就开始用activemq-client + activemq-broker替代activemq-core了,大概看了一下active-all,把几个不同版本的active-xx-store包含进去了。
最后我的配置:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45<activemq.version>5.12.0</activemq.version>
......
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<!-- 我配置了slf4j和logback,所以把log4j的依赖exclusion -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>${activemq.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>${activemq.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jdbc-store</artifactId>
<version>${activemq.version}</version>
</dependency>
......
接下来要实现Producer和Consumer
首先是Producer,可以实现MessageProducer接口,但接口中有很多方法不需要实现,所以没有继承该接口:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41package com.climbran.jms;
import javax.jms.Destination;
import org.springframework.jms.core.JmsTemplate;
/**
* JMS消息生产者.
* @author swe
*/
public class Producer {
private JmsTemplate jmsTemplate;
private Destination notifyQueue;
private Destination notifyTopic;
public void sendQueue(final String message) {
sendMessage(message, notifyQueue);
}
public void sendTopic(final String message) {
sendMessage(message, notifyTopic);
}
private void sendMessage(String message, Destination destination) {
jmsTemplate.convertAndSend(destination, message);
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void setNotifyQueue(Destination notifyQueue) {
this.notifyQueue = notifyQueue;
}
public void setNotifyTopic(Destination nodifyTopic) {
this.notifyTopic = nodifyTopic;
}
}
接下来实现消费者(Consumer),Spring通过DefaultMessageListenerContainer内部初始化一个taskExecutor来执行监听的任务,这里实现监听器就好了:
1 | package com.climbran.jms; |
接下来就是配置文件,配置了1个生产者,3个消费者,其中一个监听queue,两个监听topic:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">
<description>JMS配置</description>
<!-- ActiveMQ 连接工厂 -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<!-- Spring Caching 连接工厂 -->
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="connectionFactory" />
<property name="sessionCacheSize" value="10" />
</bean>
<!-- Queue定义 -->
<bean id="notifyQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="q.notify" />
</bean>
<!-- Topic定义 -->
<bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="t.notify" />
</bean>
<!-- Spring JMS Template -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory" />
</bean>
<!-- 使用Spring JmsTemplate的消息生产者 -->
<bean id="notifyMessageProducer" class="com.climbran.jms.Producer">
<property name="jmsTemplate" ref="jmsTemplate" />
<property name="notifyQueue" ref="notifyQueue" />
<property name="notifyTopic" ref="notifyTopic" />
</bean>
<!-- 异步接收Queue消息Container,DefaultMessageListenerContainer内部初始化建立的一个taskExecutor执行监听的任务-->
<!-- 相当于配置Consumer,并为Consumer设置了Listenser,同时为Consumer设置了destination-->
<bean id="queueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="notifyQueue" />
<property name="messageListener" ref="notifyMessageListener" />
<property name="concurrentConsumers" value="10" />
</bean>
<!-- 异步接收Topic消息Container -->
<bean id="topicContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="notifyTopic" />
<property name="messageListener" ref="notifyMessageListener" />
</bean>
<bean id="topicContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="notifyTopic" />
<property name="messageListener" ref="notifyMessageListener" />
</bean>
<!-- 异步接收消息处理类 -->
<bean id="notifyMessageListener" class="com.climbran.jms.MyMessageListener" />
</beans>
最后业务层调用:1
2producer.sendTopic("a topic message");
producer.sendQueue("a queue message");
结果打印出2条”a topic message”,1条”a queue message”