前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、多线程集群访问ActiveMQ消息

生产者运行之前运行者两个消费者,因为如果先开启生产者,一旦有消费者率先读取到消息之后,就会霸占所有的消息(2中有一定的测试解释),除非回流。

测试的话,10个消息,61616和61617都各自有30个消费者,哪个收到的消息多,哪个就快,通过后面测试的结果可以知道,我应该将均衡消费取消掉,不要讲一个broker当做一个消费者来对待,才能测试成功,而这时测试的时候还没有取消掉

单向连接哦

1567670049200

1.1、生产者Producer61616 10个消息

public class Producer61616 {

    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "Thread.Consumer.queue";
    /** 发送消息的数量 */
    private static final int SEND_NUMBER = 10;

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);

            // 根据目的地获取一个生产者
            MessageProducer producer = session.createProducer(destination);

            //构造消息
            sendTextMessage(session, producer);
            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 1、创建TextMessage
     */
    private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
            // 发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }


}

1.2、线程类ConsumerThread

public class ConsumerThread extends Thread {

    private ConnectionFactory cf ;
    private String queueName ;

    public ConsumerThread(ConnectionFactory cf,String queueName){
        this.cf = cf;
        this.queueName = queueName;
    }

    @Override
    public void run(){
        try{
            Connection connection = cf.createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(message -> {
                TextMessage txtMsg = (TextMessage) message;
                try {
                    System.out.println("Receiver===" + txtMsg.getText()+consumer.toString());
                    session.commit();
                    //如果注释掉下面的话,是会一直监听,
                    // 而这里呢,表示每个消费者只读取一次就好,关闭的话,这个consumer 也就没了,这个线程就会结束
                    session.close();
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }catch(Exception err){
            err.printStackTrace();
        }
    }

}

1.3、消费者 Consumer61616 30个消费者

public class Consumer61616 {


    public static final String QUEUE_NAME = "Thread.Consumer.queue";
    public static final String TCP_URL = "tcp://localhost:61616";
    public static final  int THREAD_COUNT = 30 ;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                TCP_URL);

        for (int i = 1; i <= THREAD_COUNT; i++) {
            Thread thread = new ConsumerThread(connectionFactory, QUEUE_NAME);
            thread.start();
            //延迟一秒,观察日志,保证正在监听
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程"+i+"启动");
        }
    }

}



1.4、消费者Consumer61617 30个消费者


public class Consumer61617 {


    public static final String QUEUE_NAME = "Thread.Consumer.queue";
    public static final String TCP_URL = "tcp://localhost:61617";
    public static final  int THREAD_COUNT = 30 ;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                TCP_URL);

        for (int i = 1; i <= THREAD_COUNT; i++) {
            Thread thread = new ConsumerThread(connectionFactory, QUEUE_NAME);
            thread.start();
            //延迟一秒,观察日志,保证正在监听
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程"+i+"启动");
        }
    }


}




1.5、运行这两个消费者

1.5.1、61616消费者控制台

1567593167953

1.5.2、61617消费者控制台

1567593184368

1.5.3、8161浏览器

1567674691819

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
Thread.Consumer.queue 0 31 0 0

对于8161/61616 这个activemq看来,虽然8167/61617上有30个消费者,但是在8161/61616这台机器看来是被当做一个消费者(不是一般意义上的一个消费者哦),所以是8161/61616上的30个消费者加 另一台8167/61616机器 = 31

1.5.4、8167浏览器

1567674672000

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
Thread.Consumer.queue 0 30 0 0

8167/61617 这台机器上为运行了30个线程,每个线程创建了一个消费者,所以是30

1.6、运行生产者,成功消费

1.6.1、8167浏览器

1567674536461

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
Thread.Consumer.queue 0 29 1 1

这里和我们想象的不太一样,因为按照上面一开始的打算,其实8167/61617这台机器接收到的消息应该多才对,单事实上我们才消费了一条消息,算是测试失败了,可能是消息数量不够???

其实呢,这就是均衡消费 ,那为什么是1呢,你想啊,一共10个消息,31个消费者,肯定是不能平分,所以碰巧给你分配一个呗,也有可能一个都不给你,31比1 ,这都能抢到,也恰恰说明了,static确实是比较快 ,当然具有偶然性,如果想好好测的话,就阻止均衡消费了再来侧 。哈哈

1.6.2、8161浏览器

1567674503902

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
Thread.Consumer.queue 0 22 10 10

1.6.1中说,它消费了1个消息,所以我们这台机器就是消费了9条消息,因为线程中奖连接关闭的原因,就会损失9个消费者 所以是 31 - 9 = 22 (8137/61617这台机器算做一个哦,别忘记了)

消息全部被消费了, 消息一共生成了10个,10个被消费了

2、集群下的回流功能 (一般queue采用用到)

2.1、配置(两个broker都要配置)

2.1.1、61616 broker


<networkConnectors>
    <networkConnector     
                      duplex="true"
                      name="local network" 
                      uri="static://(tcp://localhost:61616,tcp://localhost:61617)" />
</networkConnectors>

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic=">" >
                <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                </pendingMessageLimitStrategy>
            </policyEntry>


            <policyEntry queue=">" enableAudit="false">
                <networkBridgeFilterFactory>
                    <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                </networkBridgeFilterFactory>
            </policyEntry>


        </policyEntries>
    </policyMap>
</destinationPolicy>


2.1.2、61617Broker

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic=">" >
                <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                </pendingMessageLimitStrategy>
            </policyEntry>




            <policyEntry queue=">" enableAudit="false">
                <networkBridgeFilterFactory>
                    <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                </networkBridgeFilterFactory>
            </policyEntry>


        </policyEntries>
    </policyMap>
</destinationPolicy>

2.1、61616生产者、生成30个消息



public class Producer61616 {

    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "back.queue";
    /** 发送消息的数量 */
    private static final int SEND_NUMBER = 30;

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);

            // 根据目的地获取一个生产者
            MessageProducer producer = session.createProducer(destination);

            //构造消息
            sendTextMessage(session, producer);
            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 1、创建TextMessage
     */
    private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
            // 发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }


}


2.2、线程

public class ConsumerThread extends Thread {

    private ConnectionFactory cf ;
    private String queueName ;

    public ConsumerThread(ConnectionFactory cf,String queueName){
        this.cf = cf;
        this.queueName = queueName;
    }

    @Override
    public void run(){
        try{
            Connection connection = cf.createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(message -> {
                TextMessage txtMsg = (TextMessage) message;
                try {
                    System.out.println( txtMsg.getText()+consumer.toString());
                    session.commit();
                    //一直监听,不要关闭,关闭的话,这个consumer 也就没了
                    // session.close();
                    // connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }catch(Exception err){
            err.printStackTrace();
        }
    }

}

2.3、61616 10个消费者



public class Consumer61616 {


    public static final String QUEUE_NAME = "back.queue";
    public static final String TCP_URL = "tcp://localhost:61616";
    public static final  int THREAD_COUNT = 10 ;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                TCP_URL);

        for (int i = 1; i <= THREAD_COUNT; i++) {
            Thread thread = new ConsumerThread(connectionFactory, QUEUE_NAME);
            thread.start();
            //延迟一秒,观察日志,保证正在监听
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程"+i+"启动");
        }
    }

}

2.4、61617 10个消费者


public class Consumer61617 {


    public static final String QUEUE_NAME = "back.queue";
    public static final String TCP_URL = "tcp://localhost:61617";
    public static final  int THREAD_COUNT = 10 ;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                TCP_URL);

        for (int i = 1; i <= THREAD_COUNT; i++) {
            Thread thread = new ConsumerThread(connectionFactory, QUEUE_NAME);
            thread.start();
            //延迟一秒,观察日志,保证正在监听
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程"+i+"启动");
        }
    }

}


2.5、先运行61616生产者,生成10条消息

2.5.1、8161浏览器

1567673059626

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
back.queue 30 0 30 0

2.6、运行消费者61617,读取到消息后,手速要快的关闭

上面我们说过,如果先运行生产者,再运行消费者,那么这个消费者就会霸占所有的消息,讲消息全部拿过来

2.6.1、观察8161浏览器

1567670541718

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
back.queue 0 0 30 30

可以看到这30个消息已经出队列了,说明消息已经被全部拿走霸占了

2.6.2。观察8167浏览器

1567670551984

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
back.queue 23 0 30 7

会发现有23条消息没有读取,有30条消息过来了,说明消息全部被我们拿过来霸占了(即使没有读完),因为手速的关系,我读取了7条消息就关闭了8167/61617上的所有消费者

2.7、运行消费者61616

2.7.1、观察8161浏览器

1567671275324

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
back.queue 0 10 53 53

会发现消息是0,说明在61616这台activemq中已经没有消息可以被读取了,

消费者是10 ,我们现在8161/61616这个机器上确实运行着10个消费者

生成的消息 = 30+ 23 = 53, 这里的23就是回流过来的消息

消费的消息 = 30+ 23= 53,也就是我们8161/61616这个机器读取了23个消息,回流成功。

2.7.2、观察8167浏览器

1567671420557

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
back.queue 0 1 30 30

这里的消费者是1,其实值的是我们的8161/61616这里的activemq机器,虽然它的机器现在运行着10个消费者,但是对我们来说,其实就是一个,这里将它当做一个整体了

总共30条消息拿过来了,然后有30条消息回流被消费了

3、failover故障转移,容错的连接

// failover故障转移
// randomize false 不允许随机,如果能够连上61616 则用它,连不上在用61617
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    ActiveMqConstant.USERNAME,
    ActiveMqConstant.PASSWORD,
    "failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false");

4、均衡消费:默认一个broker的多个客户端(消费者),被当做一个对待

**在1中我使用了10个个消息,但是呢,8161/61616的消费者30个,8167/61617的消费者也是30个,消费者数量比消息数量还多,如果测试1的话,一定要关闭均衡消费再测) **

所以我现在准备8161/61616 、8167/61617各自只有一个消费者,消息数量是10

4.1、生产者 30条消息

public class Producer61616 {

    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "average.Consumer.queue";
    /** 发送消息的数量 */
    private static final int SEND_NUMBER = 30;

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);

            // 根据目的地获取一个生产者
            MessageProducer producer = session.createProducer(destination);

            //构造消息
            sendTextMessage(session, producer);
            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 1、创建TextMessage
     */
    private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
            // 发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }


}

4.2、消费者 61616

public class Consumer61616 {


    public static final String QUEUE_NAME = "average.Consumer.queue";
    public static final String TCP_URL = "tcp://localhost:61616";
    public static final int COUNT = 1;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                TCP_URL);

        try {
            for(int i = 1 ; i <= COUNT; i++ ){
                Connection connection = connectionFactory.createConnection();
                connection.start();
                Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue(QUEUE_NAME);
                MessageConsumer consumer = session.createConsumer(destination);
                int size = i ;
                consumer.setMessageListener(message -> {
                    TextMessage txtMsg = (TextMessage) message;
                    try {
                        System.out.println("61616消费者"+size+txtMsg.getText());
                        session.commit();
                        //一直监听,不要关闭,关闭的话,这个consumer 也就没了
                        // session.close();
                        // connection.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

}


4.3、消费者 61617



public class Consumer61617 {


    public static final String QUEUE_NAME = "average.Consumer.queue";
    public static final String TCP_URL = "tcp://localhost:61617";
    public static final int COUNT = 1;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                TCP_URL);

        try {

            for(int i = 1 ; i <= COUNT; i++ ){
                Connection connection = connectionFactory.createConnection();
                connection.start();
                Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue(QUEUE_NAME);
                MessageConsumer consumer = session.createConsumer(destination);
                int size = i ;
                consumer.setMessageListener(message -> {
                    TextMessage txtMsg = (TextMessage) message;
                    try {
                        System.out.println("61617消费者"+size+txtMsg.getText());
                        session.commit();
                        //一直监听,不要关闭,关闭的话,这个consumer 也就没了
                        // session.close();
                        // connection.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

}


4.4、先运行这两个消费者

4.4.1、观察8161浏览器

1567675194799

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
average.Consumer.queue 0 2 0 0

有两个消费者

4.4.2、观察8167浏览器

1567675246112

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
average.Consumer.queue 0 2 0 0

也有两个消费者,这里想,不对啊,1中测试的时候按理说8167/61617不应该有8161/61616里面的消费者啊,这里想说的是,已经配置回流了,所以理所当然是两个消费者,但是消息并不 一样哦。

4.4、运行生产者

4.4.1、观察61616消费者控制台

61616消费者1ActiveMq 发送的消息1
61616消费者1ActiveMq 发送的消息3
61616消费者1ActiveMq 发送的消息5
61616消费者1ActiveMq 发送的消息7
61616消费者1ActiveMq 发送的消息9
61616消费者1ActiveMq 发送的消息11
61616消费者1ActiveMq 发送的消息13
61616消费者1ActiveMq 发送的消息15
61616消费者1ActiveMq 发送的消息17
61616消费者1ActiveMq 发送的消息19
61616消费者1ActiveMq 发送的消息21
61616消费者1ActiveMq 发送的消息23
61616消费者1ActiveMq 发送的消息25
61616消费者1ActiveMq 发送的消息27
61616消费者1ActiveMq 发送的消息29

4.4.2、观察61617消费者

61617消费者1ActiveMq 发送的消息2
61617消费者1ActiveMq 发送的消息4
61617消费者1ActiveMq 发送的消息6
61617消费者1ActiveMq 发送的消息8
61617消费者1ActiveMq 发送的消息10
61617消费者1ActiveMq 发送的消息12
61617消费者1ActiveMq 发送的消息14
61617消费者1ActiveMq 发送的消息16
61617消费者1ActiveMq 发送的消息18
61617消费者1ActiveMq 发送的消息20
61617消费者1ActiveMq 发送的消息22
61617消费者1ActiveMq 发送的消息24
61617消费者1ActiveMq 发送的消息26
61617消费者1ActiveMq 发送的消息28
61617消费者1ActiveMq 发送的消息30

4.4.3、观察8161浏览器

1567677645890

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
average.Consumer.queue 0 2 30 30

4.4.4、观察8167浏览器

1567677660641

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
average.Consumer.queue 0 2 15 15

4.4.5、总结

我们将61617中创建两个消费者,发现控制台结果还是一样的5/5分成,并没有分到多的消息,显然这和我们的集群的最终设想是违背的,这就是均衡消费

4.4.5.1、如果61617两个消费者,61616也是两个消费者

那么浏览器大概如下,也是均衡消费哦,注意是网络消费者(8167/61617这台机器)被当做一个消费者对待了,自身的消费者的数量还是算的,自己拿到消息,再平均处理。

1、61616消费者控制台 一共20条消息
61616消费者1ActiveMq 发送的消息1
61616消费者2ActiveMq 发送的消息2
61616消费者2ActiveMq 发送的消息5
61616消费者1ActiveMq 发送的消息4
61616消费者1ActiveMq 发送的消息7
61616消费者2ActiveMq 发送的消息8
61616消费者2ActiveMq 发送的消息11
61616消费者1ActiveMq 发送的消息10
61616消费者2ActiveMq 发送的消息14
61616消费者1ActiveMq 发送的消息13
61616消费者2ActiveMq 发送的消息17
61616消费者1ActiveMq 发送的消息16
61616消费者2ActiveMq 发送的消息20
61616消费者1ActiveMq 发送的消息19
61616消费者2ActiveMq 发送的消息23
61616消费者1ActiveMq 发送的消息22
61616消费者2ActiveMq 发送的消息26
61616消费者1ActiveMq 发送的消息25
61616消费者2ActiveMq 发送的消息29
61616消费者1ActiveMq 发送的消息28
61617消费者控制台 一共10条消息 而且是两个消费者均分的
61617消费者1ActiveMq 发送的消息3
61617消费者2ActiveMq 发送的消息6
61617消费者1ActiveMq 发送的消息9
61617消费者2ActiveMq 发送的消息12
61617消费者1ActiveMq 发送的消息15
61617消费者2ActiveMq 发送的消息18
61617消费者1ActiveMq 发送的消息21
61617消费者2ActiveMq 发送的消息24
61617消费者1ActiveMq 发送的消息27
61617消费者2ActiveMq 发送的消息30
8161浏览器

1567677916150

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
average.Consumer.queue 0 3 30 30
8167浏览器

1567677926350

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
average.Consumer.queue 0 3 10 10

4.5、阻止均衡消费

conduitSubscriptions多个网络消费者是否被当做一个消费者来对待。,默认为true

<networkConnectors>
    <networkConnector 
                      duplex="true"   
                      conduitSubscriptions="false" 
                      name="local network" 
                      uri="static://(tcp://localhost:61616,tcp://localhost:61617)" />
</networkConnectors>

4.5.1、将61616消费者数量设置为1,61617消费者数量设置为2

4.5.1.1、61616消费者控制台
61616消费者1ActiveMq 发送的消息3
61616消费者1ActiveMq 发送的消息6
61616消费者1ActiveMq 发送的消息9
61616消费者1ActiveMq 发送的消息12
61616消费者1ActiveMq 发送的消息15
61616消费者1ActiveMq 发送的消息18
61616消费者1ActiveMq 发送的消息21
61616消费者1ActiveMq 发送的消息24
61616消费者1ActiveMq 发送的消息27
61616消费者1ActiveMq 发送的消息30
4.5.2.2、61617消费者控制台
61617消费者1ActiveMq 发送的消息1
61617消费者2ActiveMq 发送的消息2
61617消费者1ActiveMq 发送的消息4
61617消费者2ActiveMq 发送的消息5
61617消费者1ActiveMq 发送的消息7
61617消费者2ActiveMq 发送的消息8
61617消费者1ActiveMq 发送的消息10
61617消费者2ActiveMq 发送的消息11
61617消费者1ActiveMq 发送的消息13
61617消费者2ActiveMq 发送的消息14
61617消费者1ActiveMq 发送的消息16
61617消费者2ActiveMq 发送的消息17
61617消费者1ActiveMq 发送的消息19
61617消费者2ActiveMq 发送的消息20
61617消费者1ActiveMq 发送的消息22
61617消费者2ActiveMq 发送的消息23
61617消费者1ActiveMq 发送的消息25
61617消费者2ActiveMq 发送的消息26
61617消费者1ActiveMq 发送的消息28
61617消费者2ActiveMq 发送的消息29
4.5.2.3、8161浏览器

1567678511975

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
average.Consumer.queue 0 3 30 30
4.5.2.3、8166浏览器

1567678519624

name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued
average.Consumer.queue 0 3 20 20

4.5.2、总结

可以看到上面确实不会将broker当做一个消费者了,但是消息还是会平均分配给消费者的,这也是合理的,所以我们说的均衡消费其实是针对broker来说的

ContactAuthor