Queue生产者消费者_事务的签收
前言
Github:https://github.com/HealerJean
浏览器queue参数说明
@:Name
队列的名称
@:Number Of Pending Messages
等待消费的消息数量
@:Number Of Consumers
消费者的数量
@:Messages Enqueued
入队的数量。已经产生的消息的数量,所有产生的消息的数量
@:Messages Dequeued
出队的数量,已经被消费者消费的消息的数量
1、简单的生产者和消费
1.1、配置Maven
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.2</version>
</dependency>
1.2、创建一个功能类
public class ActiveMqConstant {
/** 默认用户名 */
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认密码 */
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** 默认连接地址 */
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
}
1.3、创建一个生产者
1.3.1、queue持久化(默认)
QUEUE
默认是持久化的,也就是说,当activemq
挂掉再重启,还是可以看到的
1.3.2、queue非持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
ActiveMQ
:默认的持久化方式, 如果关闭ActiveMQ
再开启,造成消息丢失,浏览器中会存在关于该queue
这个东西,但是全部数值为0
Mysql
:持久化方式,如果关闭ActiveMQ
,再开启,造成消息丢失,浏览器中什么都看不到,不存在该queue
这个东西
package com.hlj.activemq.one;
import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producer {
/**
* 队列的名称
*/
public static final String QUEUE_NAME = "FirstQueue";
/**
* 发送消息的数量
*/
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMqConstant.USERNAME,
ActiveMqConstant.PASSWORD,
ActiveMqConstant.BROKER_URL);
try {
// 构造从工厂得到连接对象
Connection connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接,一个发送或接收消息的线程
Session session = connection.createSession(
Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 消息的目的地;消息发送给谁.
Destination destination = session.createQueue(QUEUE_NAME);
// 根据目的地获取一个生产者
MessageProducer producer = session.createProducer(destination);
sendTextMessage(session, producer);
session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 1、创建TextMessage
*/
private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}
1.4、运行这个生产者,观察浏览器
http://localhost:8161/admin/queues.jsp
name | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
---|---|---|---|---|
FirstQueue | 5 | 0 | 5 | 0 |
1.5、创建一个消费者
package com.hlj.activemq.one;
import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
public static final String QUEUE_NAME = "FirstQueue";
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMqConstant.USERNAME,
ActiveMqConstant.PASSWORD,
ActiveMqConstant.BROKER_URL);
try {
// 构造从工厂得到连接对象
Connection connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接,一个发送或接收消息的线程,事务是false,具体为什么看后面的事务签收
Session session = connection.createSession(
Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 消息的目的地;消息发送给谁.
Destination destination = session.createQueue(QUEUE_NAME);
//根据目的地获取一个消费者
MessageConsumer consumer = session.createConsumer(destination);
//消费消息
reveiveTestMessage(consumer);
// 没有事务,下面session.commit();提交会报错
session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 1、接收TestMessage
*/
private static void reveiveTestMessage(MessageConsumer consumer) throws JMSException {
while (true) {
//100s内阻塞等待消息的传入
TextMessage message = (TextMessage) consumer.receive(100 * 1000);
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
}
1.6、运行这个消费者,观察控制台和浏览器
1.6.1、控制台
收到消息ActiveMq 发送的消息1
收到消息ActiveMq 发送的消息2
收到消息ActiveMq 发送的消息3
收到消息ActiveMq 发送的消息4
收到消息ActiveMq 发送的消息5
1.6.2、浏览器
name | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
---|---|---|---|---|
FirstQueue | 0 | 1 | 5 | 5 |
1.7、MapMessage
1.7.1、生产者发送消息
/**
* 2、创建MapMessage
*/
private static void sendMapMessage(Session session, MessageProducer producer)
throws JMSException {
for (int i = 1; i <= SEND_NUMBER; i++) {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setStringProperty("setStringProperty_key_" + i, "setStringProperty_key_" + i);
mapMessage.setString("setString_key_" + i, "setString_value " + i);
producer.send(mapMessage);
}
}
1.7.2、消费者接受消息
/**
* 2、接收MapMessage
*
* @param consumer
* @throws JMSException
*/
private static void receiveMapMessage(MessageConsumer consumer) throws JMSException {
int i = 1;
while (true) {
//100s内阻塞等待消息的传入
MapMessage message = (MapMessage) consumer.receive(100 * 1000);
if (null != message) {
System.out.printf("收到消息:");
System.out.printf(message.getString("setString_key_" + i));
System.out.printf(message.getStringProperty("setStringProperty_key_" + i));
System.out.println();
i++;
} else {
break;
}
}
}
1.7.3、测试,观察控制台
收到消息:setString_value 1setStringProperty_key_1
收到消息:setString_value 2setStringProperty_key_2
收到消息:setString_value 3setStringProperty_key_3
收到消息:setString_value 4setStringProperty_key_4
收到消息:setString_value 5setStringProperty_key_5
1.8、消费者监听接收消息的方式
**消费者接收消息有两种方式 **
同步接收:主线程阻塞式等待下一个消息的到来
consumer.receive()
,可以设置·timeout
,超时则返回null。当接到消息后会自动确认。异步接收:主线程设置
MessageListener
,然后继续做自己的事,子线程负责监听。当消息到达后消息执行完毕才能够确认,如果中间发生了异常就不会被确认,如果只有一个消费者的话,那完蛋了,这样就会一直循环,一般情况下回重试6次后认为这个消息有毒,将它放到死信队列。
/**
* 2、监听接收消息
异步接收:主线程设置MessageListener,然后继续做自己的事,子线程负责监听。这样的话session.close(); 和 connection.close(); 不要设置哦,因为关闭之后就子线程就没了
*/
private static void reveiveByListeneMessage(MessageConsumer consumer)
throws JMSException {
consumer.setMessageListener(msg -> {
try {
TextMessage message = (TextMessage)msg;
System.out.println(message.getText());
} catch (JMSException e) {
}
});
}
2、事务的签收
**消息一直不被签收,默认只能重复接收6次 ,将它放到死信队列中去 **
2.1、消息成功消费的三个阶段
JMS消息只有在被确认之后才能够认为消息是被成功的消费了,消息成功消费一共有三个阶段,
1、客户接受消息
2、客户处理消息
3、消息被确认
2.2、事务
2.2.1、自动签收-消费者不启用事务
消费者不启用事务,每次都会被消费成功,因为已经自动签收了,也就是说客户端消费了消息,再次消费不会显示之前的消息
session = connection.createSession(Boolean.false, Session.AUTO_ACKNOWLEDGE);
//没有事务,下面session.commit()提交会报错
//session.commit()
2.2.2、自动签收-消费者启用事务
**当使用事务的时候,必须commit,否则表示没有消费,消费者每次都能访问到之前所访问过的数据。 **
session = connection.createSession(Boolean.true, Session.AUTO_ACKNOWLEDGE)
2.3、签收
2.3..1、Session.AUTO_ACKNOWLEDGE
自动签收
2.3.2、Session.CLIENT_ACKNOWLEDGE
需要客户端确认
看名字Client就知道,肯定是消费者使用的,
message.acknowledge()
;使用时,如果客户端已经消费了消息,则使用该方法,对
message
进行签收,则会签收所有被使用的消息,再次调用消费者,则不会收到数据不使用的时候,则所有消费者使用的过的数据,再次调用还会收到数据,因为客户端没有签收。
注意 : 这种模式,确认是在回话层面的,如果一个消费者消费了10个消息,,在确认第5个消息的时候确认,那么这10个消息都会被签收
3、测试完成的代码
3.1、生产者
public class Producer {
/**
* 队列的名称
*/
public static final String QUEUE_NAME = "FirstQueue";
/** 发送消息的数量 */
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMqConstant.USERNAME,
ActiveMqConstant.PASSWORD,
ActiveMqConstant.BROKER_URL);
try {
// 构造从工厂得到连接对象
Connection connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接,一个发送或接收消息的线程
Session session = connection.createSession(
Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 消息的目的地;消息发送给谁.
Destination destination = session.createQueue(QUEUE_NAME);
// 根据目的地获取一个生产者
MessageProducer producer = session.createProducer(destination);
//构造消息
//1 、创建TextMessage
sendTextMessage(session, producer);
//2 、创建MapMessage
// sendMapMessage(session, producer);
session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 2、创建MapMessage
*/
private static void sendGroupMessage(Session session, MessageProducer producer) throws JMSException {
for (int i = 1; i <= SEND_NUMBER; i++) {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setStringProperty("setStringProperty_key_" + i, "setStringProperty_key_" + i);
mapMessage.setString("setString_key_" + i, "setString_value " + i);
producer.send(mapMessage);
}
}
/**
* 2、创建MapMessage
*/
private static void sendMapMessage(Session session, MessageProducer producer) throws JMSException {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("messageAA--" + i);
message.setStringProperty("JMSXGroupID","GroupA");
producer.send(message);
TextMessage message2 = session.createTextMessage("messageBB--" + i);
message2.setStringProperty("JMSXGroupID","GroupB");
producer.send(message2);
}
}
/**
* 1、创建TextMessage
*/
private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}
3.2、消费者
package com.hlj.activemq.d01_简单的生产者消费者_事务的签收;
import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
public static final String QUEUE_NAME = "FirstQueue";
public static final Long WITE_TIME = (100L * 1000L);
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMqConstant.USERNAME,
ActiveMqConstant.PASSWORD,
ActiveMqConstant.BROKER_URL);
try {
// 构造从工厂得到连接对象
Connection connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接,一个发送或接收消息的线程
Session session = connection.createSession(
Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 消息的目的地;消息发送给谁.
Destination destination = session.createQueue(QUEUE_NAME);
//根据目的地获取一个消费者
MessageConsumer consumer = session.createConsumer(destination);
//消费消息
//1、接收TestMessage
reveiveTestMessage(consumer);
//2、接收MapMessage
// receiveMapMessage(consumer);
// 没有事务,下面提交会报错
// session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 同步接收:主线程阻塞式等待下一个消息的到来,可以设置timeout,超时则返回null。
// 异步接收:主线程设置MessageListener,然后继续做自己的事,子线程负责监听。这样的话session.close(); 和 connection.close(); 不要设置哦,因为关闭之后就子线程就没了
/**
* 3、监听接收消息
*/
private static void reveiveByListeneMessage(MessageConsumer consumer) throws JMSException {
consumer.setMessageListener(msg -> {
try {
TextMessage txtMsg = (TextMessage)msg;
System.out.println("Receiver11111===="+txtMsg.getText());
} catch (JMSException e) {
}
});
}
/**
* 2、接收MapMessage
*/
private static void receiveMapMessage(MessageConsumer consumer) throws JMSException {
int i = 1;
while (true) {
//100s内阻塞等待消息的传入
MapMessage message = (MapMessage) consumer.receive(WITE_TIME);
if (null != message) {
System.out.printf("收到消息:");
System.out.printf(message.getString("setString_key_" + i));
System.out.printf(message.getStringProperty("setStringProperty_key_" + i));
System.out.println();
i++;
} else {
break;
}
}
}
/**
* 1、接收TestMessage
*/
private static void reveiveTestMessage(MessageConsumer consumer) throws JMSException {
while (true) {
//100s内阻塞等待消息的传入
TextMessage message = (TextMessage) consumer.receive(WITE_TIME);
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
}
}