activemq 使用
ActiveMQ 作为apache的开源消息中间件,有广泛的使用。今天一起来学习一下它的使用。
ActiveMQ 使用java开发,支持jms,并且支持多种语言的客户端,使用kahadb作为消息存储。kahadb 是一个基于文件支持事务的消息存储器,是一个可靠的,高性能的,可扩展的消息存储器。ActiveMQ还可以使用数据库作为存储。
ActiveMQ 安装
ActiveMQ 的安装非常的简单,先根据你的系统下载对应的安装文件。界面如下图所示:
我使用的是mac系统,ActiveMQ并没有专门提供mac系统的安装包,我们直接下载linux版本的就可以。
下载完毕之后,放到你的软件目录下,使用命令 tar -xvf
进行解压。目录结构如下:
不需要进行其他额外的配置,我们就可以直接启动activeMQ。进入bin目录使用命令./activemq start
进行启动。
启动完毕之后,我们就可以在浏览器中使用地址http://localhost:8161
进行访问了。进入管理页面需要账号密码,默认都是admin
。管理界面如下:
ActiveMQ 消息类型
Queue
Queue 是点对点的形式,一个消息只能有一个消费者进行消费,不支持多个消费者。如果没有消费者进行消息,消息会进行保存,直到有消费者进行消费。
Topic
Topic 是发布订阅的形式,一个消息可以有多个消费者进行订阅,如果没有消费者进行消费,那么消息会丢失。
Queue 生产者
假设我们已经有了一个空的maven项目,我们导入需要的jar包。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| <dependency> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.5.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.7.0</version> </dependency>
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test-mq-queue");
MessageProducer messageProducer = session.createProducer(queue);
for (int i = 0; i < 10; i++) { messageProducer.send(session.createTextMessage("hello,I am producer " + i)); }
connection.close();
|
failover 是activvmq的容错协议,你可以配置多个连接,比如:failover://(tcp://localhost:61616,tcp://localhost:61615)当一个不可用时,会自动的切换到另外一个。
我们刚才生产了10个消息,我们可以看下管理后台,可以看到我们消息已经正确的发送到了服务端。
Queue 消费者
需要的jar包同生产者,不重复。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test-mq-queue");
MessageConsumer messageConsumer = session.createConsumer(queue);
Message message = messageConsumer.receive();
System.out.println(((TextMessage)message).getText());
messageConsumer.setMessageListener(new MQMessageConsumer());
class MQMessageConsumer implements MessageListener{ public void onMessage(Message message){ } }
|
我们需要注意的是,如果我们使用receive方法来接收消息,那么我们是不可以再去设置一个监听器的,否则会发生异常
Topic 消费者
Topic 模式下,如果没有消费者,消息会丢失,所以,我们先创建一个Topic消费者。
我们看到创建Session的代码都是一致的,所以我们把创建session的代码抽象出来,作为一个工具类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class MQUtil {
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private Session session;
private Connection connection;
private String url;
public MQUtil(String url) { this.url = url; }
public Session getSession() throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); return session; }
public void close() throws JMSException { if (connection != null) { connection.close(); } } }
|
创建一个消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class MQTopicConsumer implements MessageListener {
private static final String TOPIC_NAME = "test-topic-name";
public static void main(String[] args) throws JMSException {
MQUtil mqUtil = new MQUtil(MQUtil.BROKER_URL);
Session session = mqUtil.getSession();
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer messageConsumer = session.createConsumer(topic);
messageConsumer.setMessageListener(new MQTopicConsumer()); }
public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } else { System.err.println("类型错误" + message.getClass()); } } }
|
可以看到,代码跟Queue大部分都是一样的,而且还非常的简单。
Topic 生产者
我们这里直接上代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class MQTopicProducer {
private static final String TOPIC_NAME = "test-topic-name";
public static void main(String[] args) throws JMSException {
MQUtil mqUtil = new MQUtil(MQUtil.BROKER_URL);
Session session = mqUtil.getSession();
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < 10; i++) { producer.send(session.createTextMessage(" I am topic " + i)); }
mqUtil.close(); } }
|
我们先启动消费者,再启动生产者。控制台输入如下:
我们知道Topic是可以进行多消费的,我们对消费者代码进行下更改,创建两个消费者进行消费,看看控台的打印是什么样的。
代码修改如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public class MQTopicConsumer extends Thread implements MessageListener {
private static final String TOPIC_NAME = "test-topic-name"; public MQTopicConsumer(String name) { super(name); }
public static void main(String[] args) { new MQTopicConsumer("A").start(); new MQTopicConsumer("B").start(); }
@Override public void run() { try { MQUtil mqUtil = new MQUtil(MQUtil.BROKER_URL);
Session session = mqUtil.getSession();
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer messageConsumer = session.createConsumer(topic);
messageConsumer.setMessageListener(this); } catch (JMSException e) { e.printStackTrace(); } }
public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { String text = textMessage.getText(); System.out.println(this.getName() + "-" + text); } catch (JMSException e) { e.printStackTrace(); } } else { System.err.println("类型错误" + message.getClass()); } } }
|
现在我们启动消费者,并启动生产者,我们修改消息的数量为5个。控制台如下:
可以看到,每一个消息都消费了两次。
ActiveMQ 集群
我们现在把ActiveMQ的安装包直接复制一份,修改cnf文件夹下面的activemq.xml和jetty.xml中的端口号
activemq.xml 文件修改如下:
jetty.xml 文件修改如下:
修改完毕之后,这个新的activeMQ就可以直接启动了,但是他还是一个独立的,跟我们之前启动的并没有关联,这个时候,我们需要在activemq的broker节点中添加如下的配置:
1 2 3
| <networkConnectors> <networkConnector uri="static:(tcp://127.0.0.1:61616)" duplex="true"/> </networkConnectors>
|
duplex 表示开启双工。
这里我们给这两个ActiveMQ做一个编号,第一个为A,第二个为B
这个时候,如果一个新的消费者连接上了B,一个生产者往A推送了新的消息,那么这个时候B上的消费者就可以消费到A上消息
有这样的一个场景:
生产者往A上推送了10个消息,消费者在B上消费了一个消息,那么这个时候,ActiveMQ会把A上的10个消息复制到B上。这个时候B突然停机了,那么剩余的9个消息就不能进行消费了。
网上查询说是设置一个消息回流,就是在policyEntries的节点下添加一个新的policyEntrie,如下:
1 2 3 4 5
| <policyEntry queue=">" enableAudit="false"> <networkBridgeFilterFactory> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers = "true" /> </networkBridgeFilterFactory> </policyEntry>
|
就可以解决,但是我试了,并不好使,不知道是哪里问题。
问题解决:
消息回流是在没有消费者的情况下,就是B上没有消费者,A上有了新的消费者的时候,消费会再次流回A。需要注意的是消息回流的配置需要两个ActiveMQ都进行配置