前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、非持久化的topic消息:

必须接收方在线,这个不会帮我们保存

1.1、创建生产者

package com.hlj.activemq.d02_topic持久化和非持久化.d01_非持久化;

import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class NoPersistenceProducer {

    /**
     * 非持久化topic名称
     */
    public static final String TOPIC_NAME = "no_persiterce_topic_name";
    /**
     * 发送消息的数量
     */
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            Connection connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            Destination destination = session.createTopic(TOPIC_NAME);
            MessageProducer producer = session.createProducer(destination);


            for (int i = 0; i < SEND_NUMBER; i++) {
                TextMessage message = session.createTextMessage("message" + i);
                producer.send(message);
            }

            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

1.2、创建消费者

package com.hlj.activemq.d02_topic持久化和非持久化.d01_非持久化;

import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class NoPersistenceConsumer {

    /**
     * 非持久化topic名称
     */
    public static final String TOPIC_NAME = "no_persiterce_topic_name";
    public static final Long   WITE_TIME = (100L * 1000L);

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            Connection connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            Destination destination = session.createTopic(TOPIC_NAME);
            MessageConsumer consumer = session.createConsumer(destination);
            Message message = consumer.receive();
            while (message != null) {
                TextMessage txtMsg = (TextMessage) message;
                System.out.println("收到消 息:" + txtMsg.getText());
                message = consumer.receive(WITE_TIME);
            }

            session.commit();
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

1.3、运行生产者,不运行消费者

1.1、观察浏览器

1566980589040

可以看到没有消费者在线,已经生产了3条消息了,

name Number Of Consumers Messages Enqueued Messages Dequeued
no_persiterce_topic_name 0 5 0

1.4、运行消费者

1.4.1、观察控制台

发现控制台一直在等待,但是没有消息能够读取

1.4.2、观察浏览器

1566980818525

消费者在线了,但是消息却没有出队列,因为我们这个非持久的消息必须是消费者在线

name Number Of Consumers Messages Enqueued Messages Dequeued
no_persiterce_topic_name 1 5 0

1.5、运行生产者

说明:这个时候消费者是在线状态,等待消息的传入 ,这个时候运行生产者,观察消费者控制台

1.5.1、消费者控制台

收到消 message0
收到消 message1
收到消 message2
收到消 message3
收到消 message4

1.5.2、观察浏览器

1566981177326

这个时候,入队(已经生产的消息的数量)的消息一共有5+5=10条,消费者读取到了5条

name Number Of Consumers Messages Enqueued Messages Dequeued
no_persiterce_topic_name 0 10 5

2、持久化的topic消息 :

持久化的topic,即使还没有生产消息,但一般情况下需要消费者提前订阅,因为这样,即使不在线,下次连接,也可以接受之前从没收过的消息,而已经收到的消息,则不会重复接受

**持久化模式下可有有多个clientID同时在线,但是同一个clientID,只能同时在线一个消费者,这也是虚拟topic产生的原因之一 : **

activemq区分消费者,是通过clientID和订阅客户名称来区分的,使用相同的clientID,则认为是同一个消费者。两个程序使用相同的clientID,则同时只能有一个连接到activemq,第二个连接的会报错

2.1、创建生产者

 MessageProducer producer = session.createProducer(topic);
//设置持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//一定要砸在上面持久化订阅设置完成之后再start这个connection,否则会有问题
connection.start();
package com.hlj.activemq.d02_topic持久化和非持久化.D02_持久化;

import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class PersistenceProducer {


    public static final String TOPIC_NAME = "persiterce_topic_name";
    /**
     * 发送消息的数量
     */
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            Connection connection = connectionFactory.createConnection();

            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            Topic topic = session.createTopic(TOPIC_NAME);
            MessageProducer producer = session.createProducer(topic);
            //设置持久化
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //一定要砸在上面持久化订阅设置完成之后再start这个connection,否则会有问题
            connection.start();
            System.out.println("创建持久化生产者");

            for (int i = 1; i <= SEND_NUMBER; i++) {
                TextMessage message = session.createTextMessage("message" + i);
                producer.send(message);
            }

            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


2.2、创建消费订阅者 name

//设置连接客户端 id
connection.setClientID("HealerJean");
            
Topic topic = session.createTopic(TOPIC_NAME);
//创建持久化的订阅者,订阅者的名称 name
TopicSubscriber consumer = session.createDurableSubscriber(topic, "name");  
//一定要砸在上面持久化订阅设置(createDurableSubscriber)完成之后再start这个connection,否则会有问题
connection.start();
package com.hlj.activemq.d02_topic持久化和非持久化.D02_持久化;

import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class PersistenceConsumer {

    public static final String TOPIC_NAME = "persiterce_topic_name";
    public static final Long WITE_TIME = (1000L);


    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            Connection connection = connectionFactory.createConnection();
            
             //设置连接客户端 id ,持久化模式下可有有多个clientID,但是同一个clientID,只能同时在线一个消费者
            connection.setClientID("HealerJean");


            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            Topic topic = session.createTopic(TOPIC_NAME);
            //创建持久化的订阅者,订阅者的名称 name
            TopicSubscriber consumer = session.createDurableSubscriber(topic, "name");
            //一定要砸在上面持久化订阅设置(createDurableSubscriber)完成之后再start这个connection,否则会有问题
            connection.start();
            System.out.println("创建持久化消费者");

            Message message = consumer.receive();
            while (message != null) {
                TextMessage txtMsg = (TextMessage) message;
                System.out.println("收到消 息:" + txtMsg.getText());
                message = consumer.receive(WITE_TIME);
            }
            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}


2.3、运行消费订阅者 name (这个时候消费者一直在等待消息哦)

2.3.1、观察浏览器

  • 有一个订阅者出现了

1566984898908

  • 即使没有发布消息,但是实际上我们已经将topic创建出来了

1566987970895

2.4、断开消费订阅者name 控制台,观察浏览器,消费者离线了哦

会观察到,这个时候这个订阅者跑到了未在线里面去了

1566984956078

2.5、运行生产者

2.5.1、观察浏览器

1566985072703

会观察到有一个消费者,但是事实上我们的订阅消费者已经挂掉了,不是么,上面消费者控制台都关闭了,虽然控制台关闭了,但是其实我这里认为是一个离线状态的订阅消费者。而且计算它了

name Number Of Consumers Messages Enqueued Messages Dequeued
persiterce_topic_name 1 5 0

2.6、修改消费者name代码,再创建一个消费订阅者name2

只讲name修改为name2,然后运行即可

TopicSubscriber consumer = session.createDurableSubscriber(topic, "name2");

观察控制台,会发现没有消息接收到,因为我一开始也其实说了,人家发布消息之前你还没来呢

2.6、运行消费者name

2.6.1、观察控制台

创建持久化消费者
17:39:48.841 [main] DEBUG org.apache.activemq.TransactionContext - Begin:TX:ID:MI-201902210704-50981-1566985188565-1:1:1
收到消 message1
收到消 message2
收到消 message3
收到消 message4
收到消 message5

2.6.2、观察浏览器

1566988235828

会发现有2个消费者,5条入队的消息,5条被消费者消费出队的消息,

这里强调一点的是,其实我们这5条出队消息真正意义上只是提示消费者接受了几条消息,而不是真正的出队。因为持久化的topic 如果有两个消费者同时在线的话,出队消息的数量 =消息数量*2。并不等于入队的消息数量

不信,往下看

name Number Of Consumers Messages Enqueued Messages Dequeued
persiterce_topic_name 2 5 5

2.7、再次运行生产者

2.7.1、观察控制台

1566988589022

没毛病,消费者不在线,肯定不能消费消息,所以入队信息为10

name Number Of Consumers Messages Enqueued Messages Dequeued
persiterce_topic_name 2 10 5

2.8、运行消费者 name

讲代码中name2变成name即可

2.8.1、观察控制台

出现了消息,因为之前订阅过了

收到消 息:message1
收到消 息:message2
收到消 息:message3
收到消 息:message4
收到消 息:message5

2.8.2、观察浏览器

1566988839373

name Number Of Consumers Messages Enqueued Messages Dequeued
persiterce_topic_name 2 10 10

2.9、运行消费者 name2

讲代码中name改为name

2.9.1、 观察控制台

出现了消息,因为之前订阅过了

收到消 息:message1
收到消 息:message2
收到消 息:message3
收到消 息:message4
收到消 息:message5

2.9.2、观察浏览器

1566988953065

name Number Of Consumers Messages Enqueued Messages Dequeued
persiterce_topic_name 2 10 15

3.0、总结2.8和2.9

这样就证明了2.6.2中的说法是正确的。

3、VirtualTopic:虚拟topic

VirtualTopic是为了解决持久化模式下多消费端同时接收同一条消息的问题。

分布式应用,这样可以避免同一个应用订阅同一个主题时导致必须修改clientId的限制(个人理解,其实我们就是在一个客户端下调用,用多个clientId不太好吧),同时又可以在同一个应用的不同进行负载均衡

3.2、场景分析

生产端产生了一笔订单,作为消息发了出去,这笔订单既要入订单系统归档,又要入结算系统收款,那怎么办呢?

1持久化订单很重要丢了可不行

2同时接收既要归档又要结算

3生产端只需向一个Destination发送一把钥匙开一把锁保持发送的一致性否则容易乱套

3.2.1、可能的解决方案

方案A: 使用Topic订阅模式,虽然满足1对多同时接收,然而持久化模式下只能有一个持有clientID的消费者连接,不满足持久化需求((个人理解,其实我们就是在一个客户端下调用,用多个clientId不太好吧))

方案B: 使用单队列,队列是1对1模式,消息只能给一个消费者,不满足多个同时接收的需求

方案C: 使用多队列,显然生产者不太愿意一条消息发送很多次,分别发送给不同的队列,万一队列A发送成功,队列B发送失败怎么办?一致性无法保证,容易乱套

**方案D:就是将Topic和Queue相结合,各取所长。VirtualTopic,对生产者而言它是Topic,对消费者而言它是Queue **

3.2、生产者Topic ,VirtualTopic.Name

对于消息发布者来说,就是一个正常的topic,名称以VirtualTopic.开始


Destination destination = session.createTopic("VirtualTopic.Name");

public class Producer {

    /**
     * 队列的名称
     */
    public static final String VIRTUAL_TOPIC_NAME = "VirtualTopic.Name";
    /** 发送消息的数量 */
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createTopic(VIRTUAL_TOPIC_NAME);

            // 根据目的地获取一个生产者
            MessageProducer producer = session.createProducer(destination);

            //构造消息
            //1 、创建TextMessage
            sendTextMessage(session, producer);

            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("ActiveMq的消息" + i);
            // 发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }


}

3.3、消费者 A

对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列名称,即可表明自己的身份即可实现消费端应用分组

 Destination destination = session.createQueue("Consumer.AA.VirtualTopic.Name");


Consumer.A.VirtualTopic.Orders说明它是名称为A的消费端同理Consumer.B VirtualTopic.Orders说明是一名称为B的消费端可以在同一个应用中使用多个消费者消费这个队列

public class ConsumerA {

    public static final String CONSUMER_VIRTUAL_TOPIC_NAME = "Consumer.AA.VirtualTopic.Name";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.//名称为A的区别
            Destination destination = session.createQueue(CONSUMER_VIRTUAL_TOPIC_NAME);

            //根据目的地获取一个消费者
            MessageConsumer consumer = session.createConsumer(destination);


            //消费消息
            //1、接收TestMessage
            reveiveTestMessage(consumer);

            // 没有事务,下面提交会报错
            // session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void reveiveTestMessage(MessageConsumer consumer) throws JMSException {
        while (true) {
            //100s内阻塞等待消息的传入
            TextMessage message = (TextMessage) consumer.receive();
            if (null != message) {
                System.out.println(CONSUMER_VIRTUAL_TOPIC_NAME + "收到消息" + message.getText());
            } else {
                break;
            }
        }
    }

}

3.4、消费者B


public class ConsumerB {

    public static final String CONSUMER_VIRTUAL_TOPIC_NAME = "Consumer.BB.VirtualTopic.Name";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.//名称为A的区别
            Destination destination = session.createQueue(CONSUMER_VIRTUAL_TOPIC_NAME);

            //根据目的地获取一个消费者
            MessageConsumer consumer = session.createConsumer(destination);


            //消费消息
            //1、接收TestMessage
            reveiveTestMessage(consumer);

            // 没有事务,下面提交会报错
            // session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void reveiveTestMessage(MessageConsumer consumer) throws JMSException {
        while (true) {
            //100s内阻塞等待消息的传入
            TextMessage message = (TextMessage) consumer.receive();
            if (null != message) {
                System.out.println(CONSUMER_VIRTUAL_TOPIC_NAME + "收到消息" + message.getText());
            } else {
                break;
            }
        }
    }




}

3.5、运行两个消费者,之后再运行生产者

3.4.1、消费者A控制台

Consumer.AA.VirtualTopic.Name收到消息ActiveMq的消息1
Consumer.AA.VirtualTopic.Name收到消息ActiveMq的消息2
Consumer.AA.VirtualTopic.Name收到消息ActiveMq的消息3
Consumer.AA.VirtualTopic.Name收到消息ActiveMq的消息4
Consumer.AA.VirtualTopic.Name收到消息ActiveMq的消息5

3.5.1、消费者B控制台

Consumer.BB.VirtualTopic.Name收到消息ActiveMq的消息1
Consumer.BB.VirtualTopic.Name收到消息ActiveMq的消息2
Consumer.BB.VirtualTopic.Name收到消息ActiveMq的消息3
Consumer.BB.VirtualTopic.Name收到消息ActiveMq的消息4
Consumer.BB.VirtualTopic.Name收到消息ActiveMq的消息5

2.5.3、8161浏览器

2.5.3.1、queue

1567748451256

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
Consumer.AA.VirtualTopic.Name 0 1 5 5
Consumer.BB.VirtualTopic.Name 0 1 5 5
2.5.3.2、topic

1567748684504

ContactAuthor