前言

博主github

博主个人博客http://blog.healerjean.com

1、多个broker

61616/8161

61617/8167  

1.1、windos配置

1.1.1、复制bin ->bin-61617

1567578024760

1.1.1.1、修改 bin-61617/activemq文件

1567578082855

# Active MQ configuration directory
if [ -z "$ACTIVEMQ_CONF" ] ; then

    # For backwards compat with old variables we let ACTIVEMQ_CONFIG_DIR set ACTIVEMQ_CONF
    if [ -z "$ACTIVEMQ_CONFIG_DIR" ] ; then
        ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf-61617"
    else
        ACTIVEMQ_CONF="$ACTIVEMQ_CONFIG_DIR"
    fi
fi
# Location of the pidfile
if [ -z "$ACTIVEMQ_PIDFILE" ]; then
  ACTIVEMQ_PIDFILE="$ACTIVEMQ_DATA/activemq-61617.pid"
fi

1.1.1.2、修改bin-61617/win64/activemq.bat

重命名为activemq-61617.bat

1567578208867

goto conf-61617
1.1.1.3、修改bin-61617/win64/wrapper.conf
# wrapper.debug=TRUE
set.default.ACTIVEMQ_HOME=../..
set.default.ACTIVEMQ_BASE=../..
set.default.ACTIVEMQ_CONF=%ACTIVEMQ_BASE%/conf-61617
set.default.ACTIVEMQ_DATA=%ACTIVEMQ_BASE%/data
wrapper.working.dir=.

1.1.2、复制conf -> conf-61617

1567578335651

1.1.1.1、修改 conf-61617\activemq.xml

1、brokerName不能和原来的重复

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName-61617" dataDirectory="${activemq.data}">

2、数据存放的文件名称不能重复

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb-61617"/>
</persistenceAdapter>

3、所有涉及的transportConnector的端口,都要和原来的不一样。注意不要超出端口的范围(0-65535)

<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:56772?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61713?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1873?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61714?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
1.1.1.2、修改 conf-61617\jetty.xml
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8167"/>
</bean>

1.1.3、启动bin/win64/activemq.bat 和bin-61617/win64/activemq-61617.bat

1.1.3.1、cmd控制台

1567578617052

1567578628540

1.1.3.2、浏览器
http://localhost:8161/admin/

1567578725924

http://localhost:8167/admin

1567578737845

1.2、linux配置(很早之前的笔记)

1.2.1、复制一个conf,复制为conf2

1567578802668

1.2.1.1、修改里面的activemq.xml文件

1、brokerName不能和原来的重复

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="mybrokerName" dataDirectory="${activemq.data}">

2、数据存放的文件名称不能重复

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb2"/>
</persistenceAdapter>

3、所有涉及的transportConnector的端口,都要和原来的不一样。注意不要超出端口的范围(0-65535)

<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:56772?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61713?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1873?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61714?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

1.2.1.2、修改jetty.xml,把里面的默认端口号8161改成8167
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8167"/>
</bean>

1.2.2、到bin下面,复制一个activemq,比如叫activemq2

1567579019422

1、修改程序的id,不能和原来的重复,ACTIVEMQ_PIDFILE=”$ACTIVEMQ_DATA/activemq2.pid”

# Location of the pidfile
if [ -z "$ACTIVEMQ_PIDFILE" ]; then
  ACTIVEMQ_PIDFILE="$ACTIVEMQ_DATA/activemq2.pid"
fi

2、修改配置文件路径ACTIVEMQ_CONF=”$ACTIVEMQ_BASE/conf2”

# For backwards compat with old variables we let ACTIVEMQ_CONFIG_DIR set ACTIVEMQ_CONF
    if [ -z "$ACTIVEMQ_CONFIG_DIR" ] ; then
        ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf2"
    else
        ACTIVEMQ_CONF="$ACTIVEMQ_CONFIG_DIR"
    fi

3、修改端口,tcp://localhost:61616把61616改成和activemq.xml里面的tcp端口一致。请注意,在activemq5.9.0版本中是这么修改。 我使用的是最新的5.12.1版本,在activemq中找不到该tcp端口的配置,折腾了半天才发现该版本把这个配置挪到了env文件。所以就需要拷贝一份env,比如就叫env2吧,

1567579099912

然后再env2里面把61616改成和activemq.xml里面的tcp端口一致。

# Specify the queue manager URL for using "browse" option of sysv initscript
if [ -z "$ACTIVEMQ_QUEUEMANAGERURL" ]; then
    ACTIVEMQ_QUEUEMANAGERURL="--amqurl tcp://localhost:61617"
fi

最后别忘了把activemq2里面对env的引用改成env2。

else
  ACTIVEMQ_CONFIGS="/etc/default/activemq $HOME/.activemqrc $ACTIVEMQ_HOME/bin/env2"
fi

2、ActiveMQ的networkConnector

ActiveMQnetworkConnector默认是单向的,一个Broker在一端发送消息,另一个Broker在另一端接收消息,这就是所谓的”桥接”。

ActiveMQ也支持双向链接,创建一个双向的通道对于两个Broker不仅发送消息而且也能从相同的通道接收消息,通常作为duplex connector来映射,如下:

1567576177212

2.1、一个Broker发,一个Broker接(比自己发自己接收快)

broker 61616activemq.xml添加下面的标签networkConnectors


<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

<!-- 静态的网络连接 前面url61616只能发送,后面的url61617只能接受-->

<networkConnectors>
    <networkConnector name="local network" 
                      uri="static://(tcp://localhost:61616,tcp://localhost:61617)"/>
</networkConnectors>

2.1.1、启动这两个观察浏览器

broker 61616

1567580363177

broker 61617 什么都没

1567580377973

2.1.1.1、创建一个61616生产者、消费者61617

2.1.1.1.1、61616生产者
package com.hlj.activemq.d05_静态的网络连接.d01_一个发一个接;

import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;


public class Producer61616 {

    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "FirstQueue";
    /** 发送消息的数量 */
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);

            // 根据目的地获取一个生产者
            MessageProducer producer = session.createProducer(destination);

            //构造消息
            //1 、创建TextMessage
            sendTextMessage(session, producer);
            //2 、创建MapMessage
            // sendMapMessage(session, producer);


            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 2、创建MapMessage
     */
    private static void sendMapMessage(Session session, MessageProducer producer) throws JMSException {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setStringProperty("setStringProperty_key_" + i, "setStringProperty_key_" + i);
            mapMessage.setString("setString_key_" + i, "setString_value " + i);
            producer.send(mapMessage);
        }
    }

    /**
     * 1、创建TextMessage
     */
    private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
            // 发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }

}

2.1.1.1.2、61617消费者
package com.hlj.activemq.d05_静态的网络连接.d01_一个发一个接;

import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;


public class Consumer61617 {


    public static final String QUEUE_NAME = "FirstQueue";
    public static final Long   WITE_TIME = (100L * 1000L);
    public static final String TCP_URL = "tcp://localhost:61617";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                TCP_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);

            //根据目的地获取一个消费者
            MessageConsumer consumer = session.createConsumer(destination);


            //消费消息
            //1、接收TestMessage
            reveiveTestMessage(consumer);
            //2、接收MapMessage
            // receiveMapMessage(consumer);


            // 没有事务,下面提交会报错
            // session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 2、接收MapMessage
     */
    private static void receiveMapMessage(MessageConsumer consumer) throws JMSException {
        int i = 1;
        while (true) {
            //100s内阻塞等待消息的传入
            MapMessage message = (MapMessage) consumer.receive(WITE_TIME);
            if (null != message) {
                System.out.printf("收到消息:");
                System.out.printf(message.getString("setString_key_" + i));
                System.out.printf(message.getStringProperty("setStringProperty_key_" + i));
                System.out.println();
                i++;
            } else {
                break;
            }
        }
    }


    /**
     * 1、接收TestMessage
     */
    private static void reveiveTestMessage(MessageConsumer consumer) throws JMSException {
        while (true) {
            //100s内阻塞等待消息的传入
            TextMessage message = (TextMessage) consumer.receive(WITE_TIME);
            if (null != message) {
                System.out.println("收到消息" + message.getText());
            } else {
                break;
            }
        }
    }
}




2.1.1.1.3、运行这个消费者,会发现浏览器中同步了
1、观察浏览器8161
http://localhost:8161/admin/queues.jsp

1567581870852

2、观察浏览器8167
http://localhost:8167/admin/queues.jsp

1567581888068

2.1.1.1.4、消费者运行状态下,运行这个生产者
1、观察浏览器8161
http://localhost:8161/admin/queues.jsp

1567581994395

2、观察浏览器8167
http://localhost:8167/admin/queues.jsp

1567582007093

3、观察616167消费者控制台
收到消息ActiveMq 发送的消息1
收到消息ActiveMq 发送的消息2
收到消息ActiveMq 发送的消息3
收到消息ActiveMq 发送的消息4
收到消息ActiveMq 发送的消息5

2.2、设置双向的 duplex="true"

networkConnection

  • duplex=”true”该配置可以让消息从A到B,或者从B到A。但只能流动一次。也就是不能从A到B再回到A,也就是不能回流
  • 如果duplex=”false”则只能从A到B
<networkConnectors>
    <networkConnector 
                      duplex="true"
                      name="local network" 
                      uri="static://(tcp://localhost:61616,tcp://localhost:61617)"/>
</networkConnectors>

ContactAuthor