前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、同时发送多个queue,并同时或者分步接收

1.1、生产者

package com.hlj.activemq.d07_destination.d01_多个queue同时发送;

import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;


public class Producer {

    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "queue_1,queue_2";
    /** 发送消息的数量 */
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);

            // 根据目的地获取一个生产者
            MessageProducer producer = session.createProducer(destination);

            //构造消息
            //1 、创建TextMessage
            sendTextMessage(session, producer);

            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
            // 发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }


}

1.2、消费者

package com.hlj.activemq.d07_destination.d01_多个queue同时发送;

import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;


public class Consumer {

    public static final String QUEUE_NAME = "queue_1,queue_2";
    public static final String QUEUE_NAME_1 = "queue_1";
    public static final String QUEUE_NAME_2 = "queue_2";
    public static final Long   WITE_TIME = (100L * 1000L);


    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);
            // Destination destination = session.createQueue(QUEUE_NAME_1);
            // Destination destination = session.createQueue(QUEUE_NAME_2);


            //根据目的地获取一个消费者
            MessageConsumer consumer = session.createConsumer(destination);


            //消费消息
            //1、接收TestMessage
            reveiveTestMessage(consumer);

            // 没有事务,下面提交会报错
            // session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 1、接收TestMessage
     */
    private static void reveiveTestMessage(MessageConsumer consumer) throws JMSException {
        while (true) {
            //100s内阻塞等待消息的传入
            TextMessage message = (TextMessage) consumer.receive(WITE_TIME);
            if (null != message) {
                System.out.println("收到消息" + message.getText());
            } else {
                break;
            }
        }
    }


}




1.3、启动生产者

1.3.1、观察浏览器

1567738873099

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
queue_1 5 0 5 0
queue_2 5 0 5 0

1.4、启动消费者 :同时接收多个队列

Destination destination = session.createQueue(QUEUE_NAME);

1.4.1、观察控制台

收到消息ActiveMq 发送的消息1
收到消息ActiveMq 发送的消息1
收到消息ActiveMq 发送的消息2
收到消息ActiveMq 发送的消息2
收到消息ActiveMq 发送的消息3
收到消息ActiveMq 发送的消息3
收到消息ActiveMq 发送的消息4
收到消息ActiveMq 发送的消息4
收到消息ActiveMq 发送的消息5
收到消息ActiveMq 发送的消息5

1.4.2、观察8161浏览器

1567739051708

可以看到相当于启动了两个消费者,然后分别将这10条消息消费调了,所以,我们也分别启动消费者queue_1和消费者queue_2进行处理消息

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
queue_1 0 1 5 5
queue_2 0 1 5 5

2、删除空(不活动的(死))的队列

2.1、解释

**在ActiveMQ的消息Queue需要设置三个属性 **

**1、web控制台删除(一般没人用的,很傻 **

2、通过配置,使得broker可以自动探测到无用的队列并删除掉,回收资源(生产使用的方式)。

2.2、具体参数配置

参数名 默认值 含义
schedulePeriodForDestinationPurge 默认为0,不检查 设置多长时间检查一次,这里是10秒
gcInactiveDestinations 默认为false,不删除 表示启用清理功能;
inactiveTimoutBeforeGC 默认为60秒 queue的(没有消息产生了)超时后删除 ,在规定的时间内,无有效订阅,没有入队记录,超时后就会被清理(控制台不显示,删除成功)。 (如果没有消息,消费者一直在线的话,浏览器控制台不会删除,除非关闭这个消费者,因为消费者一直在读取(相当于我们一直在记录这里的历史记录,我们签收后,真实消息已经不存在了,这里只是空的而已,也没什么影响))

<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="localhost" 
        dataDirectory="${activemq.data}"
        schedulePeriodForDestinationPurge="1000"
        >

      
<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic=">" >


                <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                </pendingMessageLimitStrategy>
            </policyEntry>
            
            
            <policyEntry queue=">" 
                         gcInactiveDestinations="true" 
                         inactiveTimeoutBeforeGC="30000" />
        </policyEntries>
    </policyMap>
</destinationPolicy>

2.3、测试生产者



public class Producer {

    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "queue_expire";
    /** 发送消息的数量 */
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);

            // 根据目的地获取一个生产者
            MessageProducer producer = session.createProducer(destination);

            //构造消息
            //1 、创建TextMessage
            sendTextMessage(session, producer);

            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
            // 发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }


}

2.4、消费者


public class Consumer {

    public static final String QUEUE_NAME = "queue_expire";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);


            //根据目的地获取一个消费者
            MessageConsumer consumer = session.createConsumer(destination);


            //消费消息
            //1、接收TestMessage
            reveiveTestMessage(consumer);

            // 没有事务,下面提交会报错
            // session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void reveiveTestMessage(MessageConsumer consumer) throws JMSException {
        while (true) {
            //100s内阻塞等待消息的传入
            TextMessage message = (TextMessage) consumer.receive();
            if (null != message) {
                System.out.println("收到消息" + message.getText());
            } else {
                break;
            }
        }
    }



}


2.5、启动生产者

2.5.1、观察 8161浏览器

1567741045134

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
queue_expire 5 0 5 0

2.6、运行消费者(运行完毕,手动关闭)

如果没有消息,消费者一直在线的话,也不会删除的,因为是一直读消息的

2.6.1、观察 8161浏览器

1567741222613

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
queue_expire 0 0 5 5

2.7、30秒之后,浏览器找不到这个东西了

1567741253457

3、删除不活动(死)的订阅者

某些Topic消息因为客户端在订阅后却长时间离线,而一直进驻内存,影响系统内存量及稳定性

参数名 默认 含义
offlineDurableSubscriberTaskSchedule 默认300秒 多长时间检查一次
offlineDurableSubscriberTimeout —1,不删除 离线多长时间就过去删除
<broker xmlns="http://activemq.apache.org/schema/core" 
    brokerName="localhost" 
    dataDirectory="${activemq.data}" 
    schedulePeriodForDestinationPurge="1000" 
    offlineDurableSubscriberTimeout="20000" 
    offlineDurableSubscriberTaskSchedule="10000"

    >

3.1、生产者


public class PersistenceProducer {


    public static final String TOPIC_NAME = "topic_expire";

    /**
     * 发送消息的数量
     */
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            Connection connection = connectionFactory.createConnection();

            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            Topic topic = session.createTopic(TOPIC_NAME);
            MessageProducer producer = session.createProducer(topic);
            //设置持久化
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //一定要砸在上面持久化订阅设置完成之后再start这个connection,否则会有问题
            connection.start();
            System.out.println("创建持久化生产者");

            for (int i = 1; i <= SEND_NUMBER; i++) {
                TextMessage message = session.createTextMessage("message" + i);
                producer.send(message);
            }

            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3.2、消费者

	
public class PersistenceConsumer {

    public static final String TOPIC_NAME = "topic_expire";
    public static final Long WITE_TIME = (1000L);


    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            Connection connection = connectionFactory.createConnection();
            //设置连接客户端 id
            connection.setClientID("HealerJean");


            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            Topic topic = session.createTopic(TOPIC_NAME);
            //创建持久化的订阅者,订阅者的名称 name
            TopicSubscriber consumer = session.createDurableSubscriber(topic, "name");
            //一定要砸在上面持久化订阅设置(createDurableSubscriber)完成之后再start这个connection,否则会有问题
            connection.start();
            System.out.println("创建持久化消费者");

            Message message = consumer.receive();
            while (message != null) {
                TextMessage txtMsg = (TextMessage) message;
                System.out.println("收到消 息:" + txtMsg.getText());
                message = consumer.receive(WITE_TIME);
            }
            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

3.3、运行消费者后再运行生产者

3.3.1、查看消费者控制台

收到消 message1
收到消 message2
收到消 message3
收到消 message4
收到消 message5

3.3.1、观察浏览器查看订阅者

  • 刚运行完

1567759733208

  • 过20秒,发现已经删除

1567759769088

  • 同时topic记录还在

1567759797472

4、删除挤压的持久层( 消息挤压处理)

4.1、为持久化消息设置过期时间(过期了进入死信队列)

activeMQ提供了一个timeStampingBrokerPlugin插件,通过此插件,我们可以为持久化消息设置过期时间。

这两个值一般会设置成一样的。通过这个配置客户端不再接收到过期的消息,message过期则客户端不能接收

1)ttlCeiling:表示过期时间上限(程序写的过期时间不能超过此时间,超过则以此时间为准)
2)zeroExpirationOverride:表示过期时间(给未分配过期时间的消息分配过期时间)


<plugins>
  <timeStampingBrokerPlugin ttlCeiling="86400000" zeroExpirationOverride="86400000"/>
</plugins>

消费者接收消息有两种方式(下面都是AUTO_ACKNOWLEDGE

  • 同步接收:主线程阻塞式等待下一个消息的到来consumer.receive(),可以设置timeout,超时则返回null。当接到消息后会自动确认

  • 异步接收:主线程设置MessageListener,然后继续做自己的事,子线程负责监听。当消息到达后消息执行完毕才能够确认,如果中间发生了异常就不会被确认,如果只有一个消费者的话,那完蛋了,这样就会一直循环,一般情况下回重试6次后认为这个消息有毒,将它放到死信队列

死信队列是用来保存处理失败(包括持久化和非持久化)或者是过期消息的,如果是持久化消息过期,会默认送到死信队列中,而非持久化消息默认不会送到死信队列中。 死信队列不会被自动清除。对于过期的消息进入死信队列可以配置一些处理策略, **比如直接抛弃死信队列、定时抛弃死信队列、设置慢消费者策略等 **

4.2、死信队列设置

消息过期后会进入死信队列,默认进入ACTIVEMQ.DLQ队列,且不会自动清除;

对于过期的消息进入死信队列还有一些可选的策略:

  • 保存在一个共享的ACTIVEMQ.DLQ队列(默认),不会自动清除

  • 设置是否将过期消息放入队列的开关以及死信队列消息过期时间。

  • 放入各自的死信通道

4.2.1、直接抛弃死信队列

AcitveMQ提供了一个便捷的插件:DiscardingDLQBrokerPlugin,来抛弃DeadLetter。如果开发者不需要关心DeadLetter,可以使用此策略。

<broker>
...
<plugins>
    <!-- 丢弃所有死信-->
    <discardingDLQBroker PlugindropAll="true"  
                                      dropTemporaryTopics="true" 
                                      dropTemporaryQueues="true" />
    
    <!-- 根据名称丢弃指定死信-->
    <!-- <discardingDLQBroker PlugindropOnly="MY.EXAMPLE.TOPIC.29 MY.EXAMPLE.QUEUE.87" 
reportInterval="1000" />-->
    
    <!--使用丢弃正则匹配丢弃死信-->
    <!--<discardingDLQBroker PlugindropOnly="MY.EXAMPLE.TOPIC.[0-9]{3} MY.EXAMPLE.QUEUE.[0-9]{3}" 
reportInterval="3000"/>-->
    </plugins>
    
    
   
    ...
 </broker>

4.2.2、定时抛弃死信队列

4.2.2.1、过期消息定时删除死信队列

默认情况下,ActiveMQ永远不会过期发送到DLQ的消息。但是,从ActiveMQ 5.12开始,deadLetterStrategy支持expiration属性,其值以毫秒为单位。

rocessExpired为false,表示过期消息不进入死信队列,即执行删除操作,true可以进入死信队列
expiration 多长时间检测一次,再进行删除。

<policyEntry topic=">" >
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processExpired="true" expiration="30000"/>
    </deadLetterStrategy>
</policyEntry>
<policyEntry queue=">" >  
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processExpired="true" expiration="30000"/>
    </deadLetterStrategy>
</policyEntry>


4.2.1.2、表示过期消息不进入死信队列,即执行删除操作
expireMessagesPeriod=60000表示每隔60s检查消息是否过期
processExpired为false,表示过期消息不进入死信队列,即执行删除操作

<policyEntries>
    <policyEntry topic=">" expireMessagesPeriod="60000">
        <deadLetterStrategy>
            <sharedDeadLetterStrategy processExpired="false" />
        </deadLetterStrategy>
    </policyEntry>
    <policyEntry queue=">" expireMessagesPeriod="60000">  
        <deadLetterStrategy>  
            <sharedDeadLetterStrategy processExpired="false" />  
        </deadLetterStrategy>  
    </policyEntry>
</policyEntries>

ContactAuthor