activemq 使用

ActiveMQ 作为apache的开源消息中间件,有广泛的使用。今天一起来学习一下它的使用。

ActiveMQ 使用java开发,支持jms,并且支持多种语言的客户端,使用kahadb作为消息存储。kahadb 是一个基于文件支持事务的消息存储器,是一个可靠的,高性能的,可扩展的消息存储器。ActiveMQ还可以使用数据库作为存储。

ActiveMQ 安装

ActiveMQ 的安装非常的简单,先根据你的系统下载对应的安装文件。界面如下图所示:

下载界面

我使用的是mac系统,ActiveMQ并没有专门提供mac系统的安装包,我们直接下载linux版本的就可以。

下载完毕之后,放到你的软件目录下,使用命令 tar -xvf 进行解压。目录结构如下:

activemq目录结构

不需要进行其他额外的配置,我们就可以直接启动activeMQ。进入bin目录使用命令./activemq start进行启动。

启动完毕之后,我们就可以在浏览器中使用地址http://localhost:8161进行访问了。进入管理页面需要账号密码,默认都是admin。管理界面如下:

activemq管理界面

ActiveMQ 消息类型

Queue

Queue 是点对点的形式,一个消息只能有一个消费者进行消费,不支持多个消费者。如果没有消费者进行消息,消息会进行保存,直到有消费者进行消费。

Topic

Topic 是发布订阅的形式,一个消息可以有多个消费者进行订阅,如果没有消费者进行消费,那么消息会丢失。

Queue 生产者

假设我们已经有了一个空的maven项目,我们导入需要的jar包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<dependency>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.7.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 默认连接是。failover://tcp://localhost:61616 
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个Queue 名字是test-mq-queue
Queue queue = session.createQueue("test-mq-queue");
//创建生产者
MessageProducer messageProducer = session.createProducer(queue);
// 发送10个消息
for (int i = 0; i < 10; i++) {
messageProducer.send(session.createTextMessage("hello,I am producer " + i));
}
//关闭连接
connection.close();

failover 是activvmq的容错协议,你可以配置多个连接,比如:failover://(tcp://localhost:61616,tcp://localhost:61615)当一个不可用时,会自动的切换到另外一个。

我们刚才生产了10个消息,我们可以看下管理后台,可以看到我们消息已经正确的发送到了服务端。

queue 消息

Queue 消费者

需要的jar包同生产者,不重复。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 默认连接是。failover://tcp://localhost:61616 
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个Queue 名字是test-mq-queue
Queue queue = session.createQueue("test-mq-queue");
//创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
//1、
// 这个方法会阻塞直到返回一个消息,或者我们也可以穿入一个超时参数。
Message message = messageConsumer.receive();
//因为我们知道我们穿入的是一个文本消息,所以么一直接强转,并打印
System.out.println(((TextMessage)message).getText());

//2、我们可以实现MessageListener接口,并重写onMessage方法,这样再有消息传入的时候就会直接调用该方法
// 需要在MessageConsumer上设置我们的监听器

messageConsumer.setMessageListener(new MQMessageConsumer());

class MQMessageConsumer implements MessageListener{
public void onMessage(Message message){
// do something ...
}
}

我们需要注意的是,如果我们使用receive方法来接收消息,那么我们是不可以再去设置一个监听器的,否则会发生异常

Topic 消费者

Topic 模式下,如果没有消费者,消息会丢失,所以,我们先创建一个Topic消费者。

我们看到创建Session的代码都是一致的,所以我们把创建session的代码抽象出来,作为一个工具类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class MQUtil {

public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

private Session session;

private Connection connection;

private String url;

public MQUtil(String url) {
this.url = url;
}

public Session getSession() throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session;
}

public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
}

创建一个消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class MQTopicConsumer implements MessageListener {

private static final String TOPIC_NAME = "test-topic-name";

public static void main(String[] args) throws JMSException {

MQUtil mqUtil = new MQUtil(MQUtil.BROKER_URL);

Session session = mqUtil.getSession();

Topic topic = session.createTopic(TOPIC_NAME);

MessageConsumer messageConsumer = session.createConsumer(topic);

messageConsumer.setMessageListener(new MQTopicConsumer());
}

public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
System.err.println("类型错误" + message.getClass());
}
}
}

可以看到,代码跟Queue大部分都是一样的,而且还非常的简单。

Topic 生产者

我们这里直接上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class MQTopicProducer {

private static final String TOPIC_NAME = "test-topic-name";

public static void main(String[] args) throws JMSException {

MQUtil mqUtil = new MQUtil(MQUtil.BROKER_URL);

Session session = mqUtil.getSession();

Topic topic = session.createTopic(TOPIC_NAME);

MessageProducer producer = session.createProducer(topic);

for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage(" I am topic " + i));
}

mqUtil.close();
}
}

我们先启动消费者,再启动生产者。控制台输入如下:

topic控制台打印

我们知道Topic是可以进行多消费的,我们对消费者代码进行下更改,创建两个消费者进行消费,看看控台的打印是什么样的。

代码修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class MQTopicConsumer extends Thread implements MessageListener {

private static final String TOPIC_NAME = "test-topic-name";
//用来设置线程的名字
public MQTopicConsumer(String name) {
super(name);
}

public static void main(String[] args) {
//创建两个线程,并分别修改线程的名字
new MQTopicConsumer("A").start();
new MQTopicConsumer("B").start();
}

@Override
public void run() {
try {
MQUtil mqUtil = new MQUtil(MQUtil.BROKER_URL);

Session session = mqUtil.getSession();

Topic topic = session.createTopic(TOPIC_NAME);

MessageConsumer messageConsumer = session.createConsumer(topic);

messageConsumer.setMessageListener(this);
} catch (JMSException e) {
e.printStackTrace();
}
}

public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
//打印当前线程的名字
System.out.println(this.getName() + "-" + text);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
System.err.println("类型错误" + message.getClass());
}
}
}

现在我们启动消费者,并启动生产者,我们修改消息的数量为5个。控制台如下:

topic控制台打印

可以看到,每一个消息都消费了两次。

ActiveMQ 集群

我们现在把ActiveMQ的安装包直接复制一份,修改cnf文件夹下面的activemq.xml和jetty.xml中的端口号

activemq.xml 文件修改如下:

activeMq 修改端口号

jetty.xml 文件修改如下:

activeMq 修改端口号

修改完毕之后,这个新的activeMQ就可以直接启动了,但是他还是一个独立的,跟我们之前启动的并没有关联,这个时候,我们需要在activemq的broker节点中添加如下的配置:

1
2
3
<networkConnectors>
<networkConnector uri="static:(tcp://127.0.0.1:61616)" duplex="true"/>
</networkConnectors>

duplex 表示开启双工。

这里我们给这两个ActiveMQ做一个编号,第一个为A,第二个为B

这个时候,如果一个新的消费者连接上了B,一个生产者往A推送了新的消息,那么这个时候B上的消费者就可以消费到A上消息

有这样的一个场景:

生产者往A上推送了10个消息,消费者在B上消费了一个消息,那么这个时候,ActiveMQ会把A上的10个消息复制到B上。这个时候B突然停机了,那么剩余的9个消息就不能进行消费了。

网上查询说是设置一个消息回流,就是在policyEntries的节点下添加一个新的policyEntrie,如下:

1
2
3
4
5
<policyEntry queue=">" enableAudit="false">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers = "true" />
</networkBridgeFilterFactory>
</policyEntry>

就可以解决,但是我试了,并不好使,不知道是哪里问题。

问题解决:
消息回流是在没有消费者的情况下,就是B上没有消费者,A上有了新的消费者的时候,消费会再次流回A。需要注意的是消息回流的配置需要两个ActiveMQ都进行配置