ActiveMQ学习总结------实战操作(上)02

2019-10-12 00:00:00 activemq 学习

相信大家通过上一篇博文已经对ActiveMQ有了一个大致的概念了,

那么本篇博文将带领大家一步一步去实战操作我们的ActiveMQ

本篇主要内容:

  1.ActiveMQ术语及API介绍

  2.ActiveMQ 文本消息处理

  3.ActiveMQ 对象消息处理

既然我们要学习如何实战操作了,那么久不可不知它的一些术语和API的用于

即使觉得枯燥,我们也要大致看一眼,过后相信你还会再来看的!因为很有用

 

 

一 ActiveMQ 的术语(这么碉堡的东西怎么会没有写让人觉得更碉堡的词汇呢?)

1 Destination

目的地,JMS Provider(消息中间件)负责维护,用于对 Message 进行管理的对象。
MessageProducer 需要指定 Destination 才能发送消息,MessageReceiver 需要指定 Destination
才能接收消息。

 

2 Producer

消息生成者,负责发送 Message 到目的地。

 

3 Consumer | Receiver

消息消费者,负责从目的地中消费【处理|监听|订阅】Message。

 

4 Message

消息,消息封装一次通信的内容。

 

 

 

二 ActiveMQ API简介

下述API都是接口类型,由定义在javax.jms包中

都是JMS(Java Message Service)标准接口定义

 

1.ConnectionFactory

  链接工厂,用于创建链接的工厂类型

2.Connection

  链接,用于建立访问ActiveMQ连接的类型,由链接工厂ConnectionFactory创建

3.Session

  会话,一次持久有效有状态的访问,由链接创建

4.Destination & Queue

  目的地。用于描述本次访问ActiveMQ的消息访问目的地,即ActiveMQ服务中的具体队列,由会话创建

  interface Queue extends Destination

5.MessageProducer

  消息生成者、在一次有效会话中,用于发送消息给ActiveMQ服务的工具,由会话创建

6.MessageConsumer

  消息消费者【消息订阅者|消息处理着】,在一次有效会话中,用于从ActiveMQ服务中获取消息的工具,由会话创建

7.Message

  消息。通过消息生成者向ActiveMQ服务发送消息时使用的数据载体对象或消费者从ActiveMQ服务中获取消息时使用的数据载体对象。是所有消息【文本消息|对象消息等】具体类型的顶级接口,可以通过会话创建或通过会话从ActiveMQ服务中获取

 

 

 

 

三 ActiveMQ处理文本消息

:*本案例将以producer(消息生产者)和consumer(消息发送者)连个模块为例

 

准备工作:

  确保你的ActiveMQ已经开启,并且所依赖的端口已经关闭(建议直接关闭防火墙 service iptables stop)

  Maven环境

 

1 消息生产者

1.1创建模块

《ActiveMQ学习总结------实战操作(上)02》

 

 

1,2在POM文件添加ActiveMQ的依赖(根据个人的ActiveMQ版本自行查找)

 <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>

 

1.3 编写消息的生产者

package cn.arebirth.mq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQTextProducer {
    
    public void sendTextActiveMQ(String msg) {
        //定义链接工厂
        ConnectionFactory connectionFactory = null;

        //定义链接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //目的地
        Destination destination = null;

        //定义消息的发送者
        MessageProducer producer = null;

        //定义消息
        Message message = null;


        try {
            /**
             * userName:访问ActiveMQ服务的用户名,默认为admin。
             * password:访问ActiveMQ服务的密码,默认为admin
             * 用户名和用户密码都可以通过ActiveMQ安装目录的oonf目录下的jetty-ream.properties文件进行修改
             *
             * borkerURL:访问ActiveMQ服务的路径地址。
             *  路径结构为:协议名://主机地址:端口号
             *  在conf/activemq.xml文件中可以找到修改
             *  在上一篇文章中都有介绍
             */
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");

            //创建链接对象
            connection = connectionFactory.createConnection();

            //启动连接
            connection.start();

            /**
             * 参数一:transacted 是否使用事务 可选值为:true|false
             *      ture:使用事务 设置第二个参数变量这为 Session.SESSION_TRANSACTION 交由session管理
             *      false:不使用事务,则设置我们的参数即可
             *
             * acknowledgeMode:
             * * Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * * Session.CLIENT_ACKNOWLEDGE:客户端确认机制
             * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
             这里设置 这两个参数的含义为:
             不使用事务,并由Session自动确认提交
             * 这里对此不作过多讲解,初学者跟着敲,其后在慢慢了解原理
             *
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            //创建目的地,目的地名称即队列的名称,也就是保存我们消息地方的名称。消息的消费者需要通过此名称访问对应的队列
            //名称不固定
            destination = session.createQueue("hello-mq");

            //创建消息的生产者 需要指定目的地(也就是把Destination传入参数)
            producer = session.createProducer(destination);

            //创建消息对象 并传入我们需要放入队列的消息
            message = session.createTextMessage(msg);

            //发送消息
            producer.send(message);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //回收消息发送者资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

 

 

2. 消息消费者

2.1创建模块

《ActiveMQ学习总结------实战操作(上)02》

 

 

 

2,2在POM文件添加ActiveMQ的依赖(根据个人的ActiveMQ版本自行查找)

 <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>

 

2.3编写消息的消费者

package cn.arebirth.mq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQTextConsumer {

    public void receiveTextActiveMQ() {
        //定义链接工厂
        ConnectionFactory connectionFactory = null;

        //定义链接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //目的地
        Destination destination = null;

        //定义消息的消费者(接收者)
        MessageConsumer consumer = null;

        //定义消息
        Message message = null;


        try {
            /**
             * userName:访问ActiveMQ服务的用户名,默认为admin。
             * password:访问ActiveMQ服务的密码,默认为admin
             * 用户名和用户密码都可以通过ActiveMQ安装目录的oonf目录下的jetty-ream.properties文件进行修改
             *
             * borkerURL:访问ActiveMQ服务的路径地址。
             *  路径结构为:协议名://主机地址:端口号
             *  在conf/activemq.xml文件中可以找到修改
             *  在上一篇文章中都有介绍
             */
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");

            //创建链接对象
            connection = connectionFactory.createConnection();

            //启动连接
            connection.start();

            /**
             * 参数一:transacted 是否使用事务 可选值为:true|false
             *      ture:使用事务 设置第二个参数变量这为 Session.SESSION_TRANSACTION 交由session管理
             *      false:不使用事务,则设置我们的参数即可
             *
             * acknowledgeMode:
             * * Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * * Session.CLIENT_ACKNOWLEDGE:客户端确认机制
             * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
             这里设置 这两个参数的含义为:
             不使用事务,并由Session自动确认提交
             * 这里对此不作过多讲解,初学者跟着敲,其后在慢慢了解原理
             *
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            //创建目的地,目的地名称即队列的名称,也就是保存我们消息地方的名称。消息的消费者需要通过此名称访问对应的队列
            //名称不固定
            destination = session.createQueue("hello-mq");

            //创建消息的消费者 需要指定目的地(也就是把Destination传入参数)
            consumer = session.createConsumer(destination);

            //创建消息对象 由消费者接收消息
            message = consumer.receive();

            //处理消息
            String msg = ((TextMessage) message).getText();
            System.out.println("ActiveMQ say:" + msg);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //回收消息接收者资源
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

 

 

3 测试成果

定义测试类,来测试我们的成果!

 

3.1 定义producer测试类

package cn.arebirth.mq;

public class ProducerTest {
    public static void main(String[] args) {
        ActiveMQTextProducer producer = new ActiveMQTextProducer();
        producer.sendTextActiveMQ("Hello,ActiveMQ!");
    }
}

当我们执行完这段代码后,我们打开我们的ActiveMQ的控制面板(首先你要启动你的ActiveMQ,上一篇有介绍到!)

《ActiveMQ学习总结------实战操作(上)02》

 

 

接下来,当我们看到此图片内容即说明producer成果运行!(hello-mq我们自定义的队列名称!)

《ActiveMQ学习总结------实战操作(上)02》

 

 

 

 

3.2 定义 Consumer测试类

package cn.arebirth.mq;

public class ConsumerTest {
    public static void main(String[] args) {
        ActiveMQTextConsumer consumer = new ActiveMQTextConsumer();
        consumer.receiveTextActiveMQ();
    }
}

 

运行结果出现如下内容即成果!

《ActiveMQ学习总结------实战操作(上)02》

 

 《ActiveMQ学习总结------实战操作(上)02》

 

 

 

当我们仔细观察ActiveMQ控制台的时候会发现,

  producer每产生一条消息对应的Number Of Pending Message和Message Enqueued就+1

  consumer每次启动消费消息的时候,Number Of pending Message就会-1  Message Dequeued +1 

为什么呢?哈哈  相信大家已经大概猜出来了或者看英文的意思  

介绍下:

  • Name:消息的名字
  • Number Of Pending Message:为消费的消息数量
  • Number Of Consumers:消费者的数量
  • Message Enqueued:队列共有过多少消息
  • Message Dequeued:消费者消费了多少条消息

 

鼓捣完文本消息,那么有的人会说,只能鼓捣文本??错了,接下来我们搞实用的对象!(不是你和我,是我们和电脑 -. -)

 

 

三 ActiveMQ处理对象消息

有了上面处理文本消息的基础,我们将很容易掌握处理对象消息的能力!这里面的步骤,我们将略省一部分,和上面及其相似

 

1 对象创建(处理对象消息,那么一定是要有个数据载体的对象啦,我们自己创建一个即可     生产者和消费者均需要此对象!)

package cn.arebirth.pojo;

import java.io.Serializable;

/**
 * 一定要序列化!!
 */
public class User implements Serializable {
    private String username;
    private String pwd;
    private String content;

    @Override
    public String toString() {
        return "User{" +
                "username='" + username + '\'' +
                ", pwd='" + pwd + '\'' +
                ", content='" + content + '\'' +
                '}';
    }

    public User() {
    }

    public User(String username, String pwd, String content) {
        this.username = username;
        this.pwd = pwd;
        this.content = content;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPwd() {
        return pwd;
    }

    public void setPwd(String pwd) {
        this.pwd = pwd;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

 

2 消息生产者

2.1 还是需要pom的jar包依赖的的,如果还在原来的环境上,则不需要添加啦!

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>

 

2.2 编写消息的生产者

package cn.arebirth.mq;

import cn.arebirth.pojo.User;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQObjectProducer {

    public void sendObjectActiveMQ(User user){
        //定义链接工厂
        ConnectionFactory connectionFactory = null;

        //定义链接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //目的地
        Destination destination = null;

        //定义消息的发送者
        MessageProducer producer = null;

        //定义消息
        Message message = null;


        try {
            /**
             * userName:访问ActiveMQ服务的用户名,默认为admin。
             * password:访问ActiveMQ服务的密码,默认为admin
             * 用户名和用户密码都可以通过ActiveMQ安装目录的oonf目录下的jetty-ream.properties文件进行修改
             *
             * borkerURL:访问ActiveMQ服务的路径地址。
             *  路径结构为:协议名://主机地址:端口号
             *  在conf/activemq.xml文件中可以找到修改
             *  在上一篇文章中都有介绍
             */
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");

            //创建链接对象
            connection = connectionFactory.createConnection();

            //启动连接
            connection.start();

            /**
             * 参数一:transacted 是否使用事务 可选值为:true|false
             *      ture:使用事务 设置第二个参数变量这为 Session.SESSION_TRANSACTION 交由session管理
             *      false:不使用事务,则设置我们的参数即可
             *
             * acknowledgeMode:
             * * Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * * Session.CLIENT_ACKNOWLEDGE:客户端确认机制
             * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
             这里设置 这两个参数的含义为:
             不使用事务,并由Session自动确认提交
             * 这里对此不作过多讲解,初学者跟着敲,其后在慢慢了解原理
             *
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            //创建目的地,目的地名称即队列的名称,也就是保存我们消息地方的名称。消息的消费者需要通过此名称访问对应的队列
            //名称不固定
            destination = session.createQueue("hello-mq-obj");

            //创建消息的生产者 需要指定目的地(也就是把Destination传入参数)
            producer = session.createProducer(destination);

            //创建消息对象 并传入我们需要放入队列的消息
            message = session.createObjectMessage(user);

            //发送消息
            producer.send(message);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //回收消息发送者资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 

 

3 消息消费者

3.1 还是需要pom的jar包依赖的的,如果还在原来的环境上,则不需要添加啦!

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>

 

3.2  编写消息的消费者

package cn.arebirth.mq;

import cn.arebirth.pojo.User;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQObjectConsumer {
    public void receiveObjectActiveMQ() {
        //定义链接工厂
        ConnectionFactory connectionFactory = null;

        //定义链接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //目的地
        Destination destination = null;

        //定义消息的消费者(接收者)
        MessageConsumer consumer = null;

        //定义消息
        Message message = null;


        try {
            /**
             * userName:访问ActiveMQ服务的用户名,默认为admin。
             * password:访问ActiveMQ服务的密码,默认为admin
             * 用户名和用户密码都可以通过ActiveMQ安装目录的oonf目录下的jetty-ream.properties文件进行修改
             *
             * borkerURL:访问ActiveMQ服务的路径地址。
             *  路径结构为:协议名://主机地址:端口号
             *  在conf/activemq.xml文件中可以找到修改
             *  在上一篇文章中都有介绍
             */
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");

            //创建链接对象
            connection = connectionFactory.createConnection();

            //启动连接
            connection.start();

            /**
             * 参数一:transacted 是否使用事务 可选值为:true|false
             *      ture:使用事务 设置第二个参数变量这为 Session.SESSION_TRANSACTION 交由session管理
             *      false:不使用事务,则设置我们的参数即可
             *
             * acknowledgeMode:
             * * Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * * Session.CLIENT_ACKNOWLEDGE:客户端确认机制
             * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
             这里设置 这两个参数的含义为:
             不使用事务,并由Session自动确认提交
             * 这里对此不作过多讲解,初学者跟着敲,其后在慢慢了解原理
             *
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            //创建目的地,目的地名称即队列的名称,也就是保存我们消息地方的名称。消息的消费者需要通过此名称访问对应的队列
            //名称不固定
            destination = session.createQueue("hello-mq-obj");

            //创建消息的消费者 需要指定目的地(也就是把Destination传入参数)
            consumer = session.createConsumer(destination);

            //创建消息对象 由消费者接收消息
            message = consumer.receive();

            //处理消息
            ObjectMessage objectMessage = (ObjectMessage) message;
            User user = (User) objectMessage.getObject();
            System.out.println("ActiveMQ say:" + user);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //回收消息接收者资源
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

 

 

 

4 创建测试类

4.1 Produce生产者测试代码

package cn.arebirth.mq;

import cn.arebirth.pojo.User;

public class ProducerTest {
    public static void main(String[] args) {
        ActiveMQObjectProducer producer = new ActiveMQObjectProducer();
        producer.sendObjectActiveMQ(new User("Arebirth","123","Hello,ActiveMQ!"));
    }
}

 

当我们执行完这段代码后,我们打开我们的ActiveMQ的控制面板(首先你要启动你的ActiveMQ,上一篇有介绍到!)

《ActiveMQ学习总结------实战操作(上)02》

 

 会看到 队列中增加了我们的消息!

 

4.2 Consumer消费者测试代码

package cn.arebirth.mq;

public class ConsumerTest {
    public static void main(String[] args) {
        ActiveMQObjectConsumer consumer = new ActiveMQObjectConsumer();
        consumer.receiveObjectActiveMQ();
    }
}

当我们执行后,出现下面的内容即可!成功啦!

《ActiveMQ学习总结------实战操作(上)02》

 

 在看下,控制面板,

《ActiveMQ学习总结------实战操作(上)02》

 

 

是不是已经和上一个列子一样啦!

 

 

ps

  本篇内容就到这里了,希望各位勤加敲代码,毕竟代码不是看出来的~

 

相关文章