activemq的broker怎么实现
这篇文章主要介绍“activemq的broker怎么实现”,在日常操作中,相信很多人在activemq的broker怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”activemq的broker怎么实现”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
broker:
broker 相当于 activemq服务的一个实例,他可以嵌入在Java程序里面。也就是说,Linux端的activemq可以不用启动,启动Java程序端的broker,消费者和生产者访问当前的broker即可,即brokerUrl ip改为localhost即可。开发使用还是常用Linux端的activemq服务。
broker代码实现:
这里使用的while(true)的死循环,保持broker一直启动;
1 public class TestActiveMqBroker {
2 public static void main(String[] args) throws Exception {
3 // ActiveMQ也支持在vm中通信基于嵌入的broker
4 BrokerService brokerService = new BrokerService();
5 brokerService.setPopulateJMSXUserID(true);
6 brokerService.addConnector("tcp://localhost:61616");
7 brokerService.start();
8 while (true) {
9
10 }
11 }
12
消费者&生产者代码示例:
地址需要改成localhost;
ACTIVEMQ_URL = "tcp://localhost:61616";
消费者:
1 package org.muses.ssm.utils;
2
3 import java.io.IOException;
4
5 import javax.jms.Connection;
6 import javax.jms.JMSException;
7 import javax.jms.Message;
8 import javax.jms.MessageConsumer;
9 import javax.jms.MessageListener;
10 import javax.jms.Queue;
11 import javax.jms.Session;
12 import javax.jms.TextMessage;
13
14 import org.apache.activemq.ActiveMQConnectionFactory;
15
16 public class TestActiveMqConsumer {
17 private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
18 private static final String QUEUE_NAME = "queue_01";
19
20 public static void main(String[] args) throws JMSException, IOException {
21 // 创建连接工厂,按照给定的URL,采用默认用户名密码
22 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
23 // 通过连接工厂 获取connection 并启动访问
24 Connection conn = activeMQConnectionFactory.createConnection();
25 conn.start();
26 // 创建session会话
27 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
28 // 创建目的地 (具体是队列还是主题topic)
29 Queue queue = session.createQueue(QUEUE_NAME);
30
31 // 创建消息的生产者
32 MessageConsumer messageConsumer = session.createConsumer(queue);
33
34 System.out.println("****我是3号消费者******");
35 messageConsumer.setMessageListener(new MessageListener() {
36
37 @Override
38 public void onMessage(Message message) {
39 if (message != null && message instanceof TextMessage) {
40 TextMessage textMessage = (TextMessage) message;
41 try {
42 System.out.println("收到消息:" + textMessage.getText());
43 } catch (JMSException e) {
44 // TODO Auto-generated catch block
45 e.printStackTrace();
46 }
47 }
48 }
49
50 });
51 System.in.read();
52
53 messageConsumer.close();
54 session.close();
55 conn.close();
56
57 }
58
59
View Code
生产者:
1 package org.muses.ssm.utils;
2
3 import javax.jms.Connection;
4 import javax.jms.JMSException;
5 import javax.jms.MessageProducer;
6 import javax.jms.Queue;
7 import javax.jms.Session;
8 import javax.jms.TextMessage;
9
10 import org.apache.activemq.ActiveMQConnectionFactory;
11
12 public class TestActiveMqProducer {
13 private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
14 private static final String QUEUE_NAME = "queue_01";
15
16 public static void main(String[] args) throws JMSException {
17 // 创建连接工厂,按照给定的URL,采用默认用户名密码
18 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
19 // 通过连接工厂 获取connection 并启动访问
20 Connection conn = activeMQConnectionFactory.createConnection();
21 conn.start();
22 // 创建session会话
23 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
24 // 创建目的地 (具体是队列还是主题topic)
25 Queue queue = session.createQueue(QUEUE_NAME);
26
27 // 创建消息的生产者
28 MessageProducer messageProducer = session.createProducer(queue);
29
30 for (int i = 0; i < 3; i++) {
31 // 创建消息;可以理解为学生按照要求写好问题
32 TextMessage textMessage = session.createTextMessage("mession-------" + i);
33 // 通过messageProducer 发送给mq
34 messageProducer.send(textMessage);
35 }
36 messageProducer.close();
37 session.close();
38 conn.close();
39 System.out.println("发送消息成功");
40 }
41
42
View Code
生产者发送3条消息,消费者消费3条消息,和启动Linux的activemq 服务效果相同;
引入的jar包:
1 <dependency>
2 <groupId>org.apache.activemq</groupId>
3 <artifactId>activemq-all</artifactId>
4 <version>5.16.5</version>
5 </dependency>
6 <dependency>
7 <groupId>org.apache.xbean</groupId>
8 <artifactId>xbean-spring</artifactId>
9 <version>4.21</version>
10 </dependency>
11 <!-- activemq broker需要的依赖 -->
12 <dependency>
13 <groupId>com.fasterxml.jackson.core</groupId>
14 <artifactId>jackson-databind</artifactId>
15 <version>2.13.3</version>
16 </dependency>
相关文章