前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、Message时间设置

Property name type description
AMQ_SCHEDULED_DELAY long 延迟投递的时间 毫秒
AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔 毫秒
AMQ_SCHEDULED_REPEAT int 重复投递次数
AMQ_SCHEDULED_CRON String Cron表达式

1.1、生产者

public class Producer {

    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "queue";

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);

            // 根据目的地获取一个生产者
            MessageProducer producer = session.createProducer(destination);

            //构造消息
            //1 、创建TextMessage
            sendTextMessage(session, producer);

            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
            //延迟3秒
            Long delay = 3L * 1000L;
            //周期3秒
            Long period = 3L * 1000L;
            //重复5次 消费者受到的为 1(不重复的)  + 5 = 6
            int repeat = 5;
            TextMessage message = session.createTextMessage("MESSAGE消息");
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
            message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
            producer.send(message);
    }


}

1.2、消费者


public class Consumer {

    public static final String QUEUE_NAME = "queue";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);


            //根据目的地获取一个消费者
            MessageConsumer consumer = session.createConsumer(destination);


            //消费消息
            //1、接收TestMessage
            reveiveTestMessage(consumer);

            // 没有事务,下面提交会报错
            // session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void reveiveTestMessage(MessageConsumer consumer) throws JMSException {
        while (true) {
            //100s内阻塞等待消息的传入
            TextMessage message = (TextMessage) consumer.receive();
            if (null != message) {
                System.out.println(System.currentTimeMillis()+"收到消息" + message.getText());
            } else {
                break;
            }
        }
    }

}

2、分组Message

2.1、生产者


public class Producer {

    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "FirstQueue";
    /**
     * 发送消息的数量
     */
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);

            // 根据目的地获取一个生产者
            MessageProducer producer = session.createProducer(destination);

            //构造消息
            sendGroupMessage(session, producer);


            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


  
    private static void sendGroupMessage(Session session, MessageProducer producer) throws JMSException {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("groupA--" + i);
            message.setStringProperty("JMSXGroupID", "GroupA");
            producer.send(message);

            TextMessage message2 = session.createTextMessage("groupB--" + i);
            message2.setStringProperty("JMSXGroupID", "GroupB");
            producer.send(message2);
        }
    }


}


2..2、消费者

2个消费者

public class Consumer {

    public static final String QUEUE_NAME = "FirstQueue";
    public static final Long WITE_TIME = (100L * 1000L);
    public static final int COUNT = 2;


    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            for (int i = 1; i <= COUNT; i++) {
                Destination destination = session.createQueue(QUEUE_NAME);
                //根据目的地获取一个消费者
                MessageConsumer consumer = session.createConsumer(destination);
                int size = i;
                consumer.setMessageListener(msg -> {
                    try {
                        TextMessage txtMsg = (TextMessage) msg;
                        System.out.println("消费者" + size + txtMsg.getText());
                    } catch (JMSException e) {
                    }
                });
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

2..3、运行消费者,再运行生产者 ,发现消费者1 接收组A ,消费者2接收组B,自动分工

消费者1groupA--1
消费者2groupB--1
消费者1groupA--2
消费者2groupB--2
消费者1groupA--3
消费者2groupB--3
消费者1groupA--4
消费者2groupB--4
消费者1groupA--5
消费者2groupB--5

2..4、如果只创建一个组,两个消费者,则另一个消费者休息

private static void sendGroupMessage(Session session, MessageProducer producer) 
    throws JMSException {
    for (int i = 1; i <= SEND_NUMBER; i++) {
        TextMessage message = session.createTextMessage("groupA--" + i);
        message.setStringProperty("JMSXGroupID", "GroupA");
        producer.send(message);

    }
}
消费者1groupA--1
消费者1groupA--2
消费者1groupA--3
消费者1groupA--4
消费者1groupA--5

2..5、如果没有组,则两个消费者,都会消费

private static void sendGroupMessage(Session session, MessageProducer producer) 
    throws JMSException {
    for (int i = 1; i <= SEND_NUMBER; i++) {
        TextMessage message = session.createTextMessage("groupA--" + i);
        producer.send(message);
    }
}
消费者2groupA--1
消费者1groupA--2
消费者2groupA--3
消费者1groupA--4
消费者2groupA--5

3、Selector

3.1、生产者发送数据

public class Producer {

    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "FirstQueue";
    /**
     * 发送消息的数量
     */
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);

            // 根据目的地获取一个生产者
            MessageProducer producer = session.createProducer(destination);

            //构造消息
            sendTextMessage(session, producer);

            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
        for (int i = 0; i < 3; i++) {
            TextMessage message = session.createTextMessage("messageAA--" + i);
            message.setIntProperty("age", 23);
            producer.send(message);
        }
    }


}

3.2、消费者接收

public class Consumer {

    public static final String QUEUE_NAME = "FirstQueue";
    public static final Long WITE_TIME = (100L * 1000L);
    public static final int COUNT = 2;


    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);
            //根据目的地获取一个消费者
            MessageConsumer consumer = session.createConsumer(destination, "age>24");
            consumer.setMessageListener(msg -> {
                try {
                    TextMessage txtMsg = (TextMessage) msg;
                    System.out.println("消费者" + txtMsg.getText());
                } catch (JMSException e) {
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

3.3、测试运行,确实没有收到数据

ContactAuthor