ActiveMQ详细入门使用教程

Posted by linhuanjie on 2019-05-19

作者: JHON_YUAN

原文:ActiveMQ详细入门使用教程

ActiveMQ介绍

​ MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQRabbitMQkafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
特点:
1、支持多种语言编写客户端
2、对spring的支持,很容易和spring整合
3、支持多种传输协议:TCP,SSL,NIO,UDP等
4、支持AJAX

消息形式:
1、点对点(queue)
2、一对多(topic)

ActiveMQ安装

然后点击queues可以看到现在没有一条消息:

ActiveMQ测试

编写一个测试类对ActiveMQ进行测试,首先得向pom文件中添加ActiveMQ相关的jar包:

1
2
3
4
<dependency>  
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
1
compile group: 'org.apache.activemq', name: 'activemq-all', version: '5.11.1'

测试点对点(queue)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.linhuanjie.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* @author: linhuanjie
* @description: 点对点(queue)
* @createTime : 2019-05-19 13:17
* @email: lhuanjie@qq.com
*/
public class TestQueueActiveMQ {

public static void main(String[] args) {
try {
testMQProducerQueue();
TestMQConsumerQueue();
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* ActiveMQ 生产者
* @throws Exception
*/
public static void testMQProducerQueue() throws Exception{
//1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
//2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接
connection.start();
//4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Queue queue = session.createQueue("test-queue");
//6、使用会话对象创建生产者对象
MessageProducer producer = session.createProducer(queue);
//7、使用会话对象创建一个消息对象
TextMessage textMessage = session.createTextMessage("hello!test-queue");
//8、发送消息
producer.send(textMessage);
System.out.println("send_queue:"+textMessage.getText());
//9、关闭资源
producer.close();
session.close();
connection.close();
}

/**
* ActiveMQ 消费者
* @throws Exception
*/
public static void TestMQConsumerQueue() throws Exception{
//1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
//2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接
connection.start();
//4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Queue queue = session.createQueue("test-queue");
//6、使用会话对象创建生产者对象
MessageConsumer consumer = session.createConsumer(queue);
//7、向consumer对象中设置一个messageListener对象,用来接收消息
consumer.setMessageListener( message -> {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("receive_queue:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8、程序等待接收用户消息
System.in.read();
//9、关闭资源
consumer.close();
session.close();
connection.close();
}
}

MQ管理后台: http://xxx_your_ip:8186/admin

用户名:admin

密码:admin

控制台输出:

一对多(topic)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package com.linhuanjie.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author: linhuanjie
* @description: 一对多(topic)
* tips: 消费者先订阅,发布者再发布
* @createTime : 2019-05-19 13:17
* @email: lhuanjie@qq.com
*/
public class TestTopicActiveMQ implements Runnable {

public static void main(String[] args) {
try {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for(int i=0;i<5;i++) {
executorService.execute(new TestTopicActiveMQ());
}
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* ActiveMQ 发布者
*
* @throws Exception
*/
public void testMQProducerTopic() throws Exception {
//1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
//2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接
connection.start();
//4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Topic topic = session.createTopic("test_topic");
//6、使用会话对象创建生产者对象
MessageProducer producer = session.createProducer(topic);
//7、使用会话对象创建一个消息对象
TextMessage textMessage = session.createTextMessage("hello!test-topic");
//8、发送消息
producer.send(textMessage);
System.out.println("send_topic:" + textMessage.getText());
//9、关闭资源
producer.close();
session.close();
connection.close();
}

/**
* ActiveMQ 消费者
*
* @throws Exception
*/
public void testMQConsumerTopic() throws Exception {
//1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
//2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接
connection.start();
//4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Topic topic = session.createTopic("test_topic");
//6、使用会话对象创建生产者对象
MessageConsumer consumer = session.createConsumer(topic);
//7、向consumer对象中设置一个messageListener对象,用来接收消息
consumer.setMessageListener(message -> {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("receive_topic:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8、程序等待接收用户消息
System.out.println("开始订阅...");
System.in.read();
//9、关闭资源
consumer.close();
session.close();
connection.close();
}

@Override
public void run() {
Random random = new Random();
try {
int num = random.nextInt(10);
if (num % 2 == 1) {
this.testMQProducerTopic();
} else {
this.testMQConsumerTopic();
}
} catch (Exception e) {
e.printStackTrace();
}

}
}

这里使用了线程池随机数,主要是为了实现一个类进行发布和订阅。

MQ管理后台

后台输出: