activemq的broker怎么实现

2023-06-19 09:28:44 broker activemq

这篇文章主要介绍“activemq的broker怎么实现”,在日常操作中,相信很多人在activemq的broker怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”activemq的broker怎么实现”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

broker:

 broker 相当于 activemq服务的一个实例,他可以嵌入在Java程序里面。也就是说,Linux端的activemq可以不用启动,启动Java程序端的broker,消费者和生产者访问当前的broker即可,即brokerUrl ip改为localhost即可。开发使用还是常用Linux端的activemq服务。

broker代码实现:

这里使用的while(true)的死循环,保持broker一直启动;

1 public class TestActiveMqBroker {
 2     public static void main(String[] args) throws Exception {
 3         // ActiveMQ也支持在vm中通信基于嵌入的broker
 4         BrokerService brokerService = new BrokerService();
 5         brokerService.setPopulateJMSXUserID(true);
 6         brokerService.addConnector("tcp://localhost:61616");
 7         brokerService.start();
 8         while (true) {
 9 
10         }
11     }
12

消费者&生产者代码示例:

地址需要改成localhost;

ACTIVEMQ_URL = "tcp://localhost:61616";

消费者:

1 package org.muses.ssm.utils;
 2 
 3 import java.io.IOException;
 4 
 5 import javax.jms.Connection;
 6 import javax.jms.JMSException;
 7 import javax.jms.Message;
 8 import javax.jms.MessageConsumer;
 9 import javax.jms.MessageListener;
10 import javax.jms.Queue;
11 import javax.jms.Session;
12 import javax.jms.TextMessage;
13 
14 import org.apache.activemq.ActiveMQConnectionFactory;
15 
16 public class TestActiveMqConsumer {
17     private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
18     private static final String QUEUE_NAME = "queue_01";
19 
20     public static void main(String[] args) throws JMSException, IOException {
21         // 创建连接工厂,按照给定的URL,采用默认用户名密码
22         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
23         // 通过连接工厂 获取connection 并启动访问
24         Connection conn = activeMQConnectionFactory.createConnection();
25         conn.start();
26         // 创建session会话
27         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
28         // 创建目的地 (具体是队列还是主题topic)
29         Queue queue = session.createQueue(QUEUE_NAME);
30 
31         // 创建消息的生产者
32         MessageConsumer messageConsumer = session.createConsumer(queue);
33 
34         System.out.println("****我是3号消费者******");
35         messageConsumer.setMessageListener(new MessageListener() {
36 
37             @Override
38             public void onMessage(Message message) {
39                 if (message != null && message instanceof TextMessage) {
40                     TextMessage textMessage = (TextMessage) message;
41                     try {
42                         System.out.println("收到消息:" + textMessage.getText());
43                     } catch (JMSException e) {
44                         // TODO Auto-generated catch block
45                         e.printStackTrace();
46                     }
47                 }
48             }
49 
50         });
51         System.in.read();
52 
53         messageConsumer.close();
54         session.close();
55         conn.close();
56 
57     }
58 
59

View Code

生产者:

activemq的broker怎么实现

activemq的broker怎么实现

1 package org.muses.ssm.utils;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.JMSException;
 5 import javax.jms.MessageProducer;
 6 import javax.jms.Queue;
 7 import javax.jms.Session;
 8 import javax.jms.TextMessage;
 9 
10 import org.apache.activemq.ActiveMQConnectionFactory;
11 
12 public class TestActiveMqProducer {
13     private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
14     private static final String QUEUE_NAME = "queue_01";
15 
16     public static void main(String[] args) throws JMSException {
17         // 创建连接工厂,按照给定的URL,采用默认用户名密码
18         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
19         // 通过连接工厂 获取connection 并启动访问
20         Connection conn = activeMQConnectionFactory.createConnection();
21         conn.start();
22         // 创建session会话
23         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
24         // 创建目的地 (具体是队列还是主题topic)
25         Queue queue = session.createQueue(QUEUE_NAME);
26 
27         // 创建消息的生产者
28         MessageProducer messageProducer = session.createProducer(queue);
29 
30         for (int i = 0; i < 3; i++) {
31             // 创建消息;可以理解为学生按照要求写好问题
32             TextMessage textMessage = session.createTextMessage("mession-------" + i);
33             // 通过messageProducer 发送给mq
34             messageProducer.send(textMessage);
35         }
36         messageProducer.close();
37         session.close();
38         conn.close();
39         System.out.println("发送消息成功");
40     }
41 
42

View Code

生产者发送3条消息,消费者消费3条消息,和启动Linux的activemq 服务效果相同;

引入的jar包:

1     <dependency>
 2             <groupId>org.apache.activemq</groupId>
 3             <artifactId>activemq-all</artifactId>
 4             <version>5.16.5</version>
 5         </dependency>
 6         <dependency>
 7             <groupId>org.apache.xbean</groupId>
 8             <artifactId>xbean-spring</artifactId>
 9             <version>4.21</version>
10         </dependency>
11         <!-- activemq broker需要的依赖 -->
12          <dependency>
13           <groupId>com.fasterxml.jackson.core</groupId>
14           <artifactId>jackson-databind</artifactId>
15           <version>2.13.3</version>
16         </dependency>

相关文章