前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

浏览器queue参数说明

1566973386617

@:Name 队列的名称

@:Number Of Pending Messages 等待消费的消息数量

@:Number Of Consumers 消费者的数量

@:Messages Enqueued 入队的数量。已经产生的消息的数量,所有产生的消息的数量

@:Messages Dequeued 出队的数量,已经被消费者消费的消息的数量

1、简单的生产者和消费

1.1、配置Maven


<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.2</version>
</dependency>

1.2、创建一个功能类

public class ActiveMqConstant {

    /** 默认用户名   */
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**  * 默认密码 */
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**   默认连接地址  */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
}

1.3、创建一个生产者

1.3.1、queue持久化(默认)

QUEUE 默认是持久化的,也就是说,当activemq挂掉再重启,还是可以看到的

1.3.2、queue非持久化

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

ActiveMQ:默认的持久化方式, 如果关闭ActiveMQ 再开启,造成消息丢失,浏览器中会存在关于该queue这个东西,但是全部数值为0

Mysql:持久化方式,如果关闭ActiveMQ,再开启,造成消息丢失,浏览器中什么都看不到,不存在该queue这个东西

package com.hlj.activemq.one;

import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;


public class Producer {

    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "FirstQueue";
    /**
     * 发送消息的数量
     */
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);
            // 根据目的地获取一个生产者
            MessageProducer producer = session.createProducer(destination);
            
            sendTextMessage(session, producer);

            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }



    /**
     * 1、创建TextMessage
     */
    private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
            // 发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }


}


1.4、运行这个生产者,观察浏览器

http://localhost:8161/admin/queues.jsp

1566973386617

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
FirstQueue 5 0 5 0

1.5、创建一个消费者

package com.hlj.activemq.one;

import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;


public class Consumer {


    public static final String QUEUE_NAME = "FirstQueue";


    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            ActiveMqConstant.USERNAME,
            ActiveMqConstant.PASSWORD,
            ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程,事务是false,具体为什么看后面的事务签收
            Session session = connection.createSession(
                Boolean.FALSE,
                Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);

            //根据目的地获取一个消费者
            MessageConsumer consumer = session.createConsumer(destination);


            //消费消息
            reveiveTestMessage(consumer);


            // 没有事务,下面session.commit();提交会报错
            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }






    /**
     * 1、接收TestMessage
     */
    private static void reveiveTestMessage(MessageConsumer consumer) throws JMSException {
        while (true) {
            //100s内阻塞等待消息的传入
            TextMessage message = (TextMessage) consumer.receive(100 * 1000);
            if (null != message) {
                System.out.println("收到消息" + message.getText());
            } else {
                break;
            }
        }
    }



1.6、运行这个消费者,观察控制台和浏览器

1.6.1、控制台

收到消息ActiveMq 发送的消息1
收到消息ActiveMq 发送的消息2
收到消息ActiveMq 发送的消息3
收到消息ActiveMq 发送的消息4
收到消息ActiveMq 发送的消息5

1.6.2、浏览器

1566973767880

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
FirstQueue 0 1 5 5

1.7、MapMessage

1.7.1、生产者发送消息

/**
     * 2、创建MapMessage
     */
private static void sendMapMessage(Session session, MessageProducer producer) 
    throws JMSException {
    for (int i = 1; i <= SEND_NUMBER; i++) {
        MapMessage mapMessage = session.createMapMessage();
        mapMessage.setStringProperty("setStringProperty_key_" + i, "setStringProperty_key_" + i);
        mapMessage.setString("setString_key_" + i, "setString_value " + i);
        producer.send(mapMessage);
    }
}

1.7.2、消费者接受消息

/**
     * 2、接收MapMessage
     *
     * @param consumer
     * @throws JMSException
     */
private static void receiveMapMessage(MessageConsumer consumer) throws JMSException {
    int i = 1;
    while (true) {
        //100s内阻塞等待消息的传入
        MapMessage message = (MapMessage) consumer.receive(100 * 1000);
        if (null != message) {
            System.out.printf("收到消息:");
            System.out.printf(message.getString("setString_key_" + i));
            System.out.printf(message.getStringProperty("setStringProperty_key_" + i));
            System.out.println();
            i++;
        } else {
            break;
        }
    }
}

1.7.3、测试,观察控制台

收到消息setString_value 1setStringProperty_key_1
收到消息setString_value 2setStringProperty_key_2
收到消息setString_value 3setStringProperty_key_3
收到消息setString_value 4setStringProperty_key_4
收到消息setString_value 5setStringProperty_key_5

1.8、消费者监听接收消息的方式

**消费者接收消息有两种方式 **

同步接收:主线程阻塞式等待下一个消息的到来consumer.receive(),可以设置·timeout,超时则返回null。当接到消息后会自动确认

异步接收:主线程设置MessageListener,然后继续做自己的事,子线程负责监听。当消息到达后消息执行完毕才能够确认,如果中间发生了异常就不会被确认,如果只有一个消费者的话,那完蛋了,这样就会一直循环,一般情况下回重试6次后认为这个消息有毒,将它放到死信队列

/**
* 2、监听接收消息
异步接收:主线程设置MessageListener,然后继续做自己的事,子线程负责监听。这样的话session.close(); 和  connection.close(); 不要设置哦,因为关闭之后就子线程就没了
*/
private static void reveiveByListeneMessage(MessageConsumer consumer) 
    throws JMSException {
    consumer.setMessageListener(msg -> {
        try {
            TextMessage message = (TextMessage)msg;
            System.out.println(message.getText());
        } catch (JMSException e) {
        }
    });
}


2、事务的签收

**消息一直不被签收,默认只能重复接收6次 ,将它放到死信队列中去 **

2.1、消息成功消费的三个阶段

JMS消息只有在被确认之后才能够认为消息是被成功的消费了,消息成功消费一共有三个阶段,

1、客户接受消息

2、客户处理消息

3、消息被确认

2.2、事务

2.2.1、自动签收-消费者不启用事务

消费者不启用事务,每次都会被消费成功,因为已经自动签收了,也就是说客户端消费了消息,再次消费不会显示之前的消息

session = connection.createSession(Boolean.false, Session.AUTO_ACKNOWLEDGE);


//没有事务,下面session.commit()提交会报错
//session.commit()

2.2.2、自动签收-消费者启用事务

**当使用事务的时候,必须commit,否则表示没有消费,消费者每次都能访问到之前所访问过的数据。 **

session = connection.createSession(Boolean.true, Session.AUTO_ACKNOWLEDGE)

2.3、签收

2.3..1、Session.AUTO_ACKNOWLEDGE 自动签收

2.3.2、Session.CLIENT_ACKNOWLEDGE 需要客户端确认

看名字Client就知道,肯定是消费者使用的,

message.acknowledge();

使用时,如果客户端已经消费了消息,则使用该方法,对message进行签收,则会签收所有被使用的消息,再次调用消费者,则不会收到数据

不使用的时候,则所有消费者使用的过的数据,再次调用还会收到数据,因为客户端没有签收。

注意 : 这种模式,确认是在回话层面的,如果一个消费者消费了10个消息,,在确认第5个消息的时候确认,那么这10个消息都会被签收

3、测试完成的代码

3.1、生产者



public class Producer {

    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "FirstQueue";
    /** 发送消息的数量 */
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);

            // 根据目的地获取一个生产者
            MessageProducer producer = session.createProducer(destination);

            //构造消息
            //1 、创建TextMessage
            sendTextMessage(session, producer);
            //2 、创建MapMessage
            // sendMapMessage(session, producer);


            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 2、创建MapMessage
     */
    private static void sendGroupMessage(Session session, MessageProducer producer) throws JMSException {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setStringProperty("setStringProperty_key_" + i, "setStringProperty_key_" + i);
            mapMessage.setString("setString_key_" + i, "setString_value " + i);
            producer.send(mapMessage);
        }
    }


    /**
     * 2、创建MapMessage
     */
    private static void sendMapMessage(Session session, MessageProducer producer) throws JMSException {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("messageAA--" + i);
            message.setStringProperty("JMSXGroupID","GroupA");
            producer.send(message);

            TextMessage message2 = session.createTextMessage("messageBB--" + i);
            message2.setStringProperty("JMSXGroupID","GroupB");
            producer.send(message2);
        }
    }



    /**
     * 1、创建TextMessage
     */
    private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
            // 发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }




}



3.2、消费者

package com.hlj.activemq.d01_简单的生产者消费者_事务的签收;

import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;


public class Consumer {


    public static final String QUEUE_NAME = "FirstQueue";
    public static final Long   WITE_TIME = (100L * 1000L);


    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);

            //根据目的地获取一个消费者
            MessageConsumer consumer = session.createConsumer(destination);


            //消费消息
            //1、接收TestMessage
            reveiveTestMessage(consumer);
            //2、接收MapMessage
            // receiveMapMessage(consumer);


            // 没有事务,下面提交会报错
            // session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    
    // 同步接收:主线程阻塞式等待下一个消息的到来,可以设置timeout,超时则返回null。
    // 异步接收:主线程设置MessageListener,然后继续做自己的事,子线程负责监听。这样的话session.close(); 和  connection.close(); 不要设置哦,因为关闭之后就子线程就没了


    /**
     * 3、监听接收消息
     */
    private static void reveiveByListeneMessage(MessageConsumer consumer) throws JMSException {
        consumer.setMessageListener(msg -> {
            try {
                TextMessage txtMsg = (TextMessage)msg;
                System.out.println("Receiver11111===="+txtMsg.getText());
            } catch (JMSException e) {
            }
        });
    }


    /**
     * 2、接收MapMessage
     */
    private static void receiveMapMessage(MessageConsumer consumer) throws JMSException {
        int i = 1;
        while (true) {
            //100s内阻塞等待消息的传入
            MapMessage message = (MapMessage) consumer.receive(WITE_TIME);
            if (null != message) {
                System.out.printf("收到消息:");
                System.out.printf(message.getString("setString_key_" + i));
                System.out.printf(message.getStringProperty("setStringProperty_key_" + i));
                System.out.println();
                i++;
            } else {
                break;
            }
        }
    }

    

    /**
     * 1、接收TestMessage
     */
    private static void reveiveTestMessage(MessageConsumer consumer) throws JMSException {
        while (true) {
            //100s内阻塞等待消息的传入
            TextMessage message = (TextMessage) consumer.receive(WITE_TIME);
            if (null != message) {
                System.out.println("收到消息" + message.getText());
            } else {
                break;
            }
        }
    }

}





ContactAuthor