多个Queue同时发送_不活动的队列删除
前言
Github:https://github.com/HealerJean
1、同时发送多个queue,并同时或者分步接收
1.1、生产者
package com.hlj.activemq.d07_destination.d01_多个queue同时发送;
import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producer {
/**
* 队列的名称
*/
public static final String QUEUE_NAME = "queue_1,queue_2";
/** 发送消息的数量 */
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMqConstant.USERNAME,
ActiveMqConstant.PASSWORD,
ActiveMqConstant.BROKER_URL);
try {
// 构造从工厂得到连接对象
Connection connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接,一个发送或接收消息的线程
Session session = connection.createSession(
Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 消息的目的地;消息发送给谁.
Destination destination = session.createQueue(QUEUE_NAME);
// 根据目的地获取一个生产者
MessageProducer producer = session.createProducer(destination);
//构造消息
//1 、创建TextMessage
sendTextMessage(session, producer);
session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}
1.2、消费者
package com.hlj.activemq.d07_destination.d01_多个queue同时发送;
import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
public static final String QUEUE_NAME = "queue_1,queue_2";
public static final String QUEUE_NAME_1 = "queue_1";
public static final String QUEUE_NAME_2 = "queue_2";
public static final Long WITE_TIME = (100L * 1000L);
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMqConstant.USERNAME,
ActiveMqConstant.PASSWORD,
ActiveMqConstant.BROKER_URL);
try {
// 构造从工厂得到连接对象
Connection connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接,一个发送或接收消息的线程
Session session = connection.createSession(
Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 消息的目的地;消息发送给谁.
Destination destination = session.createQueue(QUEUE_NAME);
// Destination destination = session.createQueue(QUEUE_NAME_1);
// Destination destination = session.createQueue(QUEUE_NAME_2);
//根据目的地获取一个消费者
MessageConsumer consumer = session.createConsumer(destination);
//消费消息
//1、接收TestMessage
reveiveTestMessage(consumer);
// 没有事务,下面提交会报错
// session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 1、接收TestMessage
*/
private static void reveiveTestMessage(MessageConsumer consumer) throws JMSException {
while (true) {
//100s内阻塞等待消息的传入
TextMessage message = (TextMessage) consumer.receive(WITE_TIME);
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
}
}
1.3、启动生产者
1.3.1、观察浏览器
name | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
---|---|---|---|---|
queue_1 | 5 | 0 | 5 | 0 |
queue_2 | 5 | 0 | 5 | 0 |
1.4、启动消费者 :同时接收多个队列
Destination destination = session.createQueue(QUEUE_NAME);
1.4.1、观察控制台
收到消息ActiveMq 发送的消息1
收到消息ActiveMq 发送的消息1
收到消息ActiveMq 发送的消息2
收到消息ActiveMq 发送的消息2
收到消息ActiveMq 发送的消息3
收到消息ActiveMq 发送的消息3
收到消息ActiveMq 发送的消息4
收到消息ActiveMq 发送的消息4
收到消息ActiveMq 发送的消息5
收到消息ActiveMq 发送的消息5
1.4.2、观察8161浏览器
可以看到相当于启动了两个消费者,然后分别将这10条消息消费调了,所以,我们也分别启动消费者queue_1和消费者queue_2进行处理消息
name | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
---|---|---|---|---|
queue_1 | 0 | 1 | 5 | 5 |
queue_2 | 0 | 1 | 5 | 5 |
2、删除空(不活动的(死))的队列
2.1、解释
**在ActiveMQ的消息Queue需要设置三个属性 **
**1、web控制台删除(一般没人用的,很傻 **
2、通过配置,使得broker可以自动探测到无用的队列并删除掉,回收资源(生产使用的方式)。
2.2、具体参数配置
参数名 | 默认值 | 含义 |
---|---|---|
schedulePeriodForDestinationPurge | 默认为0,不检查 | 设置多长时间检查一次,这里是10秒 |
gcInactiveDestinations | 默认为false,不删除 | 表示启用清理功能; |
inactiveTimoutBeforeGC | 默认为60秒 | queue的(没有消息产生了)超时后删除 ,在规定的时间内,无有效订阅,没有入队记录,超时后就会被清理(控制台不显示,删除成功)。 (如果没有消息,消费者一直在线的话,浏览器控制台不会删除,除非关闭这个消费者,因为消费者一直在读取(相当于我们一直在记录这里的历史记录,我们签收后,真实消息已经不存在了,这里只是空的而已,也没什么影响)) |
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost"
dataDirectory="${activemq.data}"
schedulePeriodForDestinationPurge="1000"
>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">"
gcInactiveDestinations="true"
inactiveTimeoutBeforeGC="30000" />
</policyEntries>
</policyMap>
</destinationPolicy>
2.3、测试生产者
public class Producer {
/**
* 队列的名称
*/
public static final String QUEUE_NAME = "queue_expire";
/** 发送消息的数量 */
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMqConstant.USERNAME,
ActiveMqConstant.PASSWORD,
ActiveMqConstant.BROKER_URL);
try {
// 构造从工厂得到连接对象
Connection connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接,一个发送或接收消息的线程
Session session = connection.createSession(
Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 消息的目的地;消息发送给谁.
Destination destination = session.createQueue(QUEUE_NAME);
// 根据目的地获取一个生产者
MessageProducer producer = session.createProducer(destination);
//构造消息
//1 、创建TextMessage
sendTextMessage(session, producer);
session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}
2.4、消费者
public class Consumer {
public static final String QUEUE_NAME = "queue_expire";
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMqConstant.USERNAME,
ActiveMqConstant.PASSWORD,
ActiveMqConstant.BROKER_URL);
try {
// 构造从工厂得到连接对象
Connection connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接,一个发送或接收消息的线程
Session session = connection.createSession(
Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 消息的目的地;消息发送给谁.
Destination destination = session.createQueue(QUEUE_NAME);
//根据目的地获取一个消费者
MessageConsumer consumer = session.createConsumer(destination);
//消费消息
//1、接收TestMessage
reveiveTestMessage(consumer);
// 没有事务,下面提交会报错
// session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static void reveiveTestMessage(MessageConsumer consumer) throws JMSException {
while (true) {
//100s内阻塞等待消息的传入
TextMessage message = (TextMessage) consumer.receive();
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
}
}
2.5、启动生产者
2.5.1、观察 8161浏览器
name | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
---|---|---|---|---|
queue_expire | 5 | 0 | 5 | 0 |
2.6、运行消费者(运行完毕,手动关闭)
如果没有消息,消费者一直在线的话,也不会删除的,因为是一直读消息的
2.6.1、观察 8161浏览器
name | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
---|---|---|---|---|
queue_expire | 0 | 0 | 5 | 5 |
2.7、30秒之后,浏览器找不到这个东西了
3、删除不活动(死)的订阅者
某些Topic消息因为客户端在订阅后却长时间离线,而一直进驻内存,影响系统内存量及稳定性
参数名 | 默认 | 含义 |
---|---|---|
offlineDurableSubscriberTaskSchedule | 默认300秒 | 多长时间检查一次 |
offlineDurableSubscriberTimeout | —1,不删除 | 离线多长时间就过去删除 |
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost"
dataDirectory="${activemq.data}"
schedulePeriodForDestinationPurge="1000"
offlineDurableSubscriberTimeout="20000"
offlineDurableSubscriberTaskSchedule="10000"
>
3.1、生产者
public class PersistenceProducer {
public static final String TOPIC_NAME = "topic_expire";
/**
* 发送消息的数量
*/
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMqConstant.USERNAME,
ActiveMqConstant.PASSWORD,
ActiveMqConstant.BROKER_URL);
try {
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(
Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
//设置持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//一定要砸在上面持久化订阅设置完成之后再start这个connection,否则会有问题
connection.start();
System.out.println("创建持久化生产者");
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("message" + i);
producer.send(message);
}
session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.2、消费者
public class PersistenceConsumer {
public static final String TOPIC_NAME = "topic_expire";
public static final Long WITE_TIME = (1000L);
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMqConstant.USERNAME,
ActiveMqConstant.PASSWORD,
ActiveMqConstant.BROKER_URL);
try {
Connection connection = connectionFactory.createConnection();
//设置连接客户端 id
connection.setClientID("HealerJean");
Session session = connection.createSession(
Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
//创建持久化的订阅者,订阅者的名称 name
TopicSubscriber consumer = session.createDurableSubscriber(topic, "name");
//一定要砸在上面持久化订阅设置(createDurableSubscriber)完成之后再start这个connection,否则会有问题
connection.start();
System.out.println("创建持久化消费者");
Message message = consumer.receive();
while (message != null) {
TextMessage txtMsg = (TextMessage) message;
System.out.println("收到消 息:" + txtMsg.getText());
message = consumer.receive(WITE_TIME);
}
session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.3、运行消费者后再运行生产者
3.3.1、查看消费者控制台
收到消 息:message1
收到消 息:message2
收到消 息:message3
收到消 息:message4
收到消 息:message5
3.3.1、观察浏览器查看订阅者
- 刚运行完
- 过20秒,发现已经删除
- 同时topic记录还在
4、删除挤压的持久层( 消息挤压处理)
4.1、为持久化消息设置过期时间(过期了进入死信队列)
activeMQ
提供了一个timeStampingBrokerPlugin
插件,通过此插件,我们可以为持久化消息设置过期时间。这两个值一般会设置成一样的。通过这个配置客户端不再接收到过期的消息,message过期则客户端不能接收
1)ttlCeiling:表示过期时间上限(程序写的过期时间不能超过此时间,超过则以此时间为准)
2)zeroExpirationOverride:表示过期时间(给未分配过期时间的消息分配过期时间)
<plugins>
<timeStampingBrokerPlugin ttlCeiling="86400000" zeroExpirationOverride="86400000"/>
</plugins>
消费者接收消息有两种方式(下面都是AUTO_ACKNOWLEDGE
)
-
同步接收:主线程阻塞式等待下一个消息的到来
consumer.receive()
,可以设置timeout
,超时则返回null。当接到消息后会自动确认。 -
异步接收:主线程设置
MessageListener
,然后继续做自己的事,子线程负责监听。当消息到达后消息执行完毕才能够确认,如果中间发生了异常就不会被确认,如果只有一个消费者的话,那完蛋了,这样就会一直循环,一般情况下回重试6次后认为这个消息有毒,将它放到死信队列。
死信队列是用来保存处理失败(包括持久化和非持久化)或者是过期消息的,如果是持久化消息过期,会默认送到死信队列中,而非持久化消息默认不会送到死信队列中。 死信队列不会被自动清除。对于过期的消息进入死信队列可以配置一些处理策略, **比如直接抛弃死信队列、定时抛弃死信队列、设置慢消费者策略等 **
4.2、死信队列设置
消息过期后会进入死信队列,默认进入ACTIVEMQ.DLQ
队列,且不会自动清除;
对于过期的消息进入死信队列还有一些可选的策略:
-
保存在一个共享的
ACTIVEMQ.DLQ
队列(默认),不会自动清除 -
设置是否将过期消息放入队列的开关以及死信队列消息过期时间。
-
放入各自的死信通道
4.2.1、直接抛弃死信队列
AcitveMQ
提供了一个便捷的插件:DiscardingDLQBrokerPlugin
,来抛弃DeadLetter
。如果开发者不需要关心DeadLetter
,可以使用此策略。
<broker>
...
<plugins>
<!-- 丢弃所有死信-->
<discardingDLQBroker PlugindropAll="true"
dropTemporaryTopics="true"
dropTemporaryQueues="true" />
<!-- 根据名称丢弃指定死信-->
<!-- <discardingDLQBroker PlugindropOnly="MY.EXAMPLE.TOPIC.29 MY.EXAMPLE.QUEUE.87"
reportInterval="1000" />-->
<!--使用丢弃正则匹配丢弃死信-->
<!--<discardingDLQBroker PlugindropOnly="MY.EXAMPLE.TOPIC.[0-9]{3} MY.EXAMPLE.QUEUE.[0-9]{3}"
reportInterval="3000"/>-->
</plugins>
...
</broker>
4.2.2、定时抛弃死信队列
4.2.2.1、过期消息定时删除死信队列
默认情况下,ActiveMQ
永远不会过期发送到DLQ
的消息。但是,从ActiveMQ 5.12
开始,deadLetterStrategy
支持expiration
属性,其值以毫秒为单位。
rocessExpired为false,表示过期消息不进入死信队列,即执行删除操作,true可以进入死信队列
expiration 多长时间检测一次,再进行删除。
<policyEntry topic=">" >
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="true" expiration="30000"/>
</deadLetterStrategy>
</policyEntry>
<policyEntry queue=">" >
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="true" expiration="30000"/>
</deadLetterStrategy>
</policyEntry>
4.2.1.2、表示过期消息不进入死信队列,即执行删除操作
expireMessagesPeriod=60000表示每隔60s检查消息是否过期
processExpired为false,表示过期消息不进入死信队列,即执行删除操作
<policyEntries>
<policyEntry topic=">" expireMessagesPeriod="60000">
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false" />
</deadLetterStrategy>
</policyEntry>
<policyEntry queue=">" expireMessagesPeriod="60000">
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false" />
</deadLetterStrategy>
</policyEntry>
</policyEntries>