多个Broker和ActiveMQ静态的网络连接
前言
博主个人博客http://blog.healerjean.com
1、多个broker
61616/8161
61617/8167
1.1、windos配置
1.1.1、复制bin ->bin-61617
1.1.1.1、修改 bin-61617/activemq
文件
# Active MQ configuration directory
if [ -z "$ACTIVEMQ_CONF" ] ; then
# For backwards compat with old variables we let ACTIVEMQ_CONFIG_DIR set ACTIVEMQ_CONF
if [ -z "$ACTIVEMQ_CONFIG_DIR" ] ; then
ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf-61617"
else
ACTIVEMQ_CONF="$ACTIVEMQ_CONFIG_DIR"
fi
fi
# Location of the pidfile
if [ -z "$ACTIVEMQ_PIDFILE" ]; then
ACTIVEMQ_PIDFILE="$ACTIVEMQ_DATA/activemq-61617.pid"
fi
1.1.1.2、修改bin-61617/win64/activemq.bat
重命名为activemq-61617.bat
goto conf-61617
1.1.1.3、修改bin-61617/win64/wrapper.conf
# wrapper.debug=TRUE
set.default.ACTIVEMQ_HOME=../..
set.default.ACTIVEMQ_BASE=../..
set.default.ACTIVEMQ_CONF=%ACTIVEMQ_BASE%/conf-61617
set.default.ACTIVEMQ_DATA=%ACTIVEMQ_BASE%/data
wrapper.working.dir=.
1.1.2、复制conf -> conf-61617
1.1.1.1、修改 conf-61617\activemq.xml
1、brokerName不能和原来的重复
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName-61617" dataDirectory="${activemq.data}">
2、数据存放的文件名称不能重复
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb-61617"/>
</persistenceAdapter>
3、所有涉及的transportConnector的端口,都要和原来的不一样。注意不要超出端口的范围(0-65535)
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:56772?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61713?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1873?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61714?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
1.1.1.2、修改 conf-61617\jetty.xml
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8167"/>
</bean>
1.1.3、启动bin/win64/activemq.bat 和bin-61617/win64/activemq-61617.bat
1.1.3.1、cmd控制台
1.1.3.2、浏览器
http://localhost:8161/admin/
http://localhost:8167/admin
1.2、linux配置(很早之前的笔记)
1.2.1、复制一个conf,复制为conf2
1.2.1.1、修改里面的activemq.xml文件
1、brokerName不能和原来的重复
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="mybrokerName" dataDirectory="${activemq.data}">
2、数据存放的文件名称不能重复
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb2"/>
</persistenceAdapter>
3、所有涉及的transportConnector的端口,都要和原来的不一样。注意不要超出端口的范围(0-65535)
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:56772?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61713?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1873?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61714?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
1.2.1.2、修改jetty.xml,把里面的默认端口号8161改成8167
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8167"/>
</bean>
1.2.2、到bin下面,复制一个activemq,比如叫activemq2
1、修改程序的id,不能和原来的重复,ACTIVEMQ_PIDFILE=”$ACTIVEMQ_DATA/activemq2.pid”
# Location of the pidfile
if [ -z "$ACTIVEMQ_PIDFILE" ]; then
ACTIVEMQ_PIDFILE="$ACTIVEMQ_DATA/activemq2.pid"
fi
2、修改配置文件路径ACTIVEMQ_CONF=”$ACTIVEMQ_BASE/conf2”
# For backwards compat with old variables we let ACTIVEMQ_CONFIG_DIR set ACTIVEMQ_CONF
if [ -z "$ACTIVEMQ_CONFIG_DIR" ] ; then
ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf2"
else
ACTIVEMQ_CONF="$ACTIVEMQ_CONFIG_DIR"
fi
3、修改端口,tcp://localhost:61616把61616改成和activemq.xml里面的tcp端口一致。请注意,在activemq5.9.0版本中是这么修改。 我使用的是最新的5.12.1版本,在activemq中找不到该tcp端口的配置,折腾了半天才发现该版本把这个配置挪到了env文件。所以就需要拷贝一份env,比如就叫env2吧,
然后再env2里面把61616改成和activemq.xml里面的tcp端口一致。
# Specify the queue manager URL for using "browse" option of sysv initscript
if [ -z "$ACTIVEMQ_QUEUEMANAGERURL" ]; then
ACTIVEMQ_QUEUEMANAGERURL="--amqurl tcp://localhost:61617"
fi
最后别忘了把activemq2里面对env的引用改成env2。
else
ACTIVEMQ_CONFIGS="/etc/default/activemq $HOME/.activemqrc $ACTIVEMQ_HOME/bin/env2"
fi
2、ActiveMQ的networkConnector
ActiveMQ
的networkConnector
默认是单向的,一个Broker
在一端发送消息,另一个Broker
在另一端接收消息,这就是所谓的”桥接”。
ActiveMQ也支持双向链接,创建一个双向的通道对于两个Broker不仅发送消息而且也能从相同的通道接收消息,通常作为duplex connector
来映射,如下:
2.1、一个Broker发,一个Broker接(比自己发自己接收快)
broker 61616
的activemq.xml
添加下面的标签networkConnectors
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<!-- 静态的网络连接 前面url61616只能发送,后面的url61617只能接受-->
<networkConnectors>
<networkConnector name="local network"
uri="static://(tcp://localhost:61616,tcp://localhost:61617)"/>
</networkConnectors>
2.1.1、启动这两个观察浏览器
broker 61616
broker 61617 什么都没
2.1.1.1、创建一个61616生产者、消费者61617
2.1.1.1.1、61616生产者
package com.hlj.activemq.d05_静态的网络连接.d01_一个发一个接;
import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producer61616 {
/**
* 队列的名称
*/
public static final String QUEUE_NAME = "FirstQueue";
/** 发送消息的数量 */
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMqConstant.USERNAME,
ActiveMqConstant.PASSWORD,
ActiveMqConstant.BROKER_URL);
try {
// 构造从工厂得到连接对象
Connection connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接,一个发送或接收消息的线程
Session session = connection.createSession(
Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 消息的目的地;消息发送给谁.
Destination destination = session.createQueue(QUEUE_NAME);
// 根据目的地获取一个生产者
MessageProducer producer = session.createProducer(destination);
//构造消息
//1 、创建TextMessage
sendTextMessage(session, producer);
//2 、创建MapMessage
// sendMapMessage(session, producer);
session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 2、创建MapMessage
*/
private static void sendMapMessage(Session session, MessageProducer producer) throws JMSException {
for (int i = 1; i <= SEND_NUMBER; i++) {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setStringProperty("setStringProperty_key_" + i, "setStringProperty_key_" + i);
mapMessage.setString("setString_key_" + i, "setString_value " + i);
producer.send(mapMessage);
}
}
/**
* 1、创建TextMessage
*/
private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}
2.1.1.1.2、61617消费者
package com.hlj.activemq.d05_静态的网络连接.d01_一个发一个接;
import com.hlj.activemq.constants.ActiveMqConstant;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer61617 {
public static final String QUEUE_NAME = "FirstQueue";
public static final Long WITE_TIME = (100L * 1000L);
public static final String TCP_URL = "tcp://localhost:61617";
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMqConstant.USERNAME,
ActiveMqConstant.PASSWORD,
TCP_URL);
try {
// 构造从工厂得到连接对象
Connection connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接,一个发送或接收消息的线程
Session session = connection.createSession(
Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 消息的目的地;消息发送给谁.
Destination destination = session.createQueue(QUEUE_NAME);
//根据目的地获取一个消费者
MessageConsumer consumer = session.createConsumer(destination);
//消费消息
//1、接收TestMessage
reveiveTestMessage(consumer);
//2、接收MapMessage
// receiveMapMessage(consumer);
// 没有事务,下面提交会报错
// session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 2、接收MapMessage
*/
private static void receiveMapMessage(MessageConsumer consumer) throws JMSException {
int i = 1;
while (true) {
//100s内阻塞等待消息的传入
MapMessage message = (MapMessage) consumer.receive(WITE_TIME);
if (null != message) {
System.out.printf("收到消息:");
System.out.printf(message.getString("setString_key_" + i));
System.out.printf(message.getStringProperty("setStringProperty_key_" + i));
System.out.println();
i++;
} else {
break;
}
}
}
/**
* 1、接收TestMessage
*/
private static void reveiveTestMessage(MessageConsumer consumer) throws JMSException {
while (true) {
//100s内阻塞等待消息的传入
TextMessage message = (TextMessage) consumer.receive(WITE_TIME);
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
}
}
2.1.1.1.3、运行这个消费者,会发现浏览器中同步了
1、观察浏览器8161
http://localhost:8161/admin/queues.jsp
2、观察浏览器8167
http://localhost:8167/admin/queues.jsp
2.1.1.1.4、消费者运行状态下,运行这个生产者
1、观察浏览器8161
http://localhost:8161/admin/queues.jsp
2、观察浏览器8167
http://localhost:8167/admin/queues.jsp
3、观察616167消费者控制台
收到消息ActiveMq 发送的消息1
收到消息ActiveMq 发送的消息2
收到消息ActiveMq 发送的消息3
收到消息ActiveMq 发送的消息4
收到消息ActiveMq 发送的消息5
2.2、设置双向的 duplex="true"
networkConnection
- duplex=”true”该配置可以让消息从A到B,或者从B到A。但只能流动一次。也就是不能从A到B再回到A,也就是不能回流
- 如果duplex=”false”则只能从A到B
<networkConnectors>
<networkConnector
duplex="true"
name="local network"
uri="static://(tcp://localhost:61616,tcp://localhost:61617)"/>
</networkConnectors>