首页  ·  知识 ·  大数据
RocketMQ消息队列单机部署及使用
网友  CSDN博客  消息队列  编辑:Cheryl   图片来源:网络
RocketMQ是一个消息中间件。消息中间件中有两个角色:消息生产者和消息消费者。RocketMQ里同样有这两个概念,消息生产者负责创建消息并发送到RocketMQ服务器,RocketMQ服务器会将消息持久化到磁

0 RocketMQ简介

0.1 介绍

RocketMQ是一个消息中间件。消息中间件中有两个角色:消息生产者和消息消费者。RocketMQ里同样有这两个概念,消息生产者负责创建消息并发送到RocketMQ服务器,RocketMQ服务器会将消息持久化到磁盘,消息消费者从RocketMQ服务器拉取消息并提交给应用消费。

0.2 特点

RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:

  • 支持严格的消息顺序

  • 支持Topic与Queue两种模式

  • 亿级消息堆积能力

  • 比较友好的分布式特性

  • 同时支持Push与Pull方式消费消息

  • 历经多次天猫双十一海量消息考验

0.3 部署结构

image.png

上图所示为RocketMQ的部署结构,图中Meta字样为RocketMQ早期代号。

1 RocketMQ 消息队列单机部署

1.1 系统配置环境

主机:Linux 
内存:8G 
硬盘:250G 
CPU:4核 

image.png

1.2 需要用到的软件包和文档

目前在Github上可下载最新的安装包alibaba-rocketmq-3.2.6.tar

下载地址:https://github.com/alibaba/RocketMQ

历史版本说明文档:Metaq原理与应用.docx

备注:RocketMQ早起在淘宝内部叫Metaq,去年改名为RocketMQ,不过该文档是针对历史版本的Metaq,仅供参考和熟悉一些概念。

1.3 服务器java环境

$java -version
java version "1.8.0_45"Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)12341234

1.4 rocketmq服务端安装

解压alibaba-rocketmq-3.2.6.tar

tar xvf alibaba-rocketmq-3.1.8.tar.gz -C /opt/11

配置rocketmq的环境变量,在/etc/profile最后添加

export ROCKETMQ_HOME=/opt/alibaba-rocketmq
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH1212


使rocketmq的环境变量生效

source /etc/profile11

给下列命令可执行权限

cd /opt/alibaba-rocketmq/bin/;
chmod +x mqadmin mqbroker mqfiltersrv mqshutdown  mqnamesrv1212


新建日志文件夹

cd /opt/alibaba-rocketmq
mkdir log1212


启动nameserver

nohup mqnamesrv 1>/opt/alibaba-rocketmq/log/ng.log 2>/opt/alibaba-rocketmq/log/ng-err.log &



查看启动状态


$ps aux|grep java

125233   12248 21.1  0.9 7151512 75844 pts/1   Sl   11:37   0:01 /opt/java/jdk1.8.0_45/bin/java -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC -verbose:gc -Xloggc:/home/xiaolong.xiao/rmq_srv_gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -Djava.ext.dirs=/opt/alibaba-rocketmq/bin/../lib -cp .:/opt/alibaba-rocketmq/bin/../conf:.:/opt/java/jdk1.8.0_45/lib/dt.jar:/opt/java/jdk1.8.0_45/lib/tools.jar com.alibaba.rocketmq.namesrv.NamesrvStartup


验证nameserver是否启动


$tail -f /opt/alibaba-rocketmq/log/ng.log

The Name Server boot success.


启动broker,在启动borker之前需要指定nameserver地址,其中10.125.1.186为所在服务器IP


export NAMESRV_ADDR=10.125.1.186:9876

nohup mqbroker >/opt/alibaba-rocketmq/log/mq.log &


验证mqbroker是否启动


tail -f /opt/alibaba-rocketmq/log/mq.log


Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0

Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.

The broker[e010125001186.bja, 10.125.1.186:10911] boot success. and name server is 10.125.1.186:9876


最后配置防火墙 

nameserver端口为9876 

broker端口为10911


lokkit -p 9876:tcp -p 10911:tcp

1

1

关闭nameserver broker执行的命令


mqshutdown namesrv

mqshutdown broker


关闭nameserver


mqshutdown namesrv

The mqnamesrv(12248) is running...

Send shutdown request to mqnamesrv(12248) OK


关闭broker


$mqshutdown broker

The mqbroker(13634) is running...

Send shutdown request to mqbroker(13634) OK


安装成功显示结果: 

这里写图片描述


2 java客户端使用RocketMQ 消息队列


2.1 依赖配置


<!-- RocketMQ Java SDK -->

<dependency>

    <groupId>com.alibaba.rocketmq</groupId>

    <artifactId>rocketmq-client</artifactId>

    <version>3.2.6</version>

</dependency>


2.2 创建生产者


用来获取一个单例的生产者。


package com.autonavi.rocketmq.producer;


import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;


/**

 * @author dddd

 * @description 消息生产者

 * @date 2016-04-07

 */

public class Producer {


     /*

     * Constructs a client instance with your account for accessing DefaultMQProducer

     */

    private static DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

    private static int initialState = 0;


    private Producer() {


    }


    public static DefaultMQProducer getDefaultMQProducer(){     

        if(producer == null){

            producer = new DefaultMQProducer("ProducerGroupName");          

        }


        if(initialState == 0){

            producer.setNamesrvAddr("100.125.1.186:9876");

            try {

                producer.start();

            } catch (MQClientException e) {

                // TODO Auto-generated catch block

                e.printStackTrace();

                return null;

            }


            initialState = 1;

        }


        return producer;        

    }


}


2.3 创建消费者


用来获取一个单例的消费者。消费者类似于直接操作数据库的对象,比如生产者下了订单订火车票,消费者就一直监听,有订单消息过来了,就去执行下订单操作。


package com.autonavi.rocketmq.consumer;


import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;


/**

 * @author dddd

 * @description 消息消费者

 * @date 2016-04-07

 */

public class Consumer {


     /*

     * Constructs a client instance with your account for accessing DefaultMQConsumer

     */

    private static DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");

    private static int initialState = 0;


    private Consumer() {


    }


    public static DefaultMQPushConsumer getDefaultMQPushConsumer(){     

        if(consumer == null){

            consumer = new DefaultMQPushConsumer("ConsumerGroupName");          

        }


        if(initialState == 0){

            consumer.setNamesrvAddr("100.125.1.186:9876");

            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            initialState = 1;

        }


        return consumer;        

    }


}


2.4 创建生产和消费服务


package com.autonavi.rocketmq.service;


import java.util.List;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQBrokerException;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.common.message.Message;

import com.alibaba.rocketmq.common.message.MessageExt;

import com.alibaba.rocketmq.remoting.exception.RemotingException;

import com.autonavi.rocketmq.consumer.Consumer;

import com.autonavi.rocketmq.producer.Producer;


public class Test {


    private static final Logger logger = LoggerFactory.getLogger(Test.class);


    public static void main(String[] args){


        sendMsg();

    }


    public static void sendMsg(){


        // 获取消息生产者

        DefaultMQProducer producer = Producer.getDefaultMQProducer();


        try {

            for(int i=0;i<2000;i++){

                Message msg = new Message(

                        "TopicTest1",                   // topic

                        "TagA",                         // tag

                        "OrderID00"+i,                  // key

                        ("Hello MetaQ"+i).getBytes());  // body

                SendResult sendResult = producer.send(msg);

                //logger.info("sendResult:{}", sendResult);

            }           

        } catch (MQClientException e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        } catch (RemotingException e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        } catch (MQBrokerException e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        } catch (InterruptedException e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        }


        producer.shutdown();

    }


    public static void receiveMsg(){


        // 获取消息生产者

        DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer();


        // 订阅主体

        try {

            consumer.subscribe("TopicTest1", "*");


            consumer.registerMessageListener(new MessageListenerConcurrently() {


                /**

                 * * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息

                 */

                public ConsumeConcurrentlyStatus consumeMessage(

                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {


                    logger.info("currentThreadName:{} and Receive New Messages:{}",Thread.currentThread().getName(),msgs);


                    MessageExt msg = msgs.get(0);


                    if (msg.getTopic().equals("TopicTest1")) {

                        // 执行TopicTest1的消费逻辑

                        if (msg.getTags() != null && msg.getTags().equals("TagA")) {

                            // 执行TagA的消费

                            logger.info("MsgBody:{}",new String(msg.getBody()));

                        } else if (msg.getTags() != null

                                && msg.getTags().equals("TagC")) {

                            // 执行TagC的消费

                        } else if (msg.getTags() != null

                                && msg.getTags().equals("TagD")) {

                            // 执行TagD的消费

                        }

                    } else if (msg.getTopic().equals("TopicTest2")) {

                        // 执行TopicTest2的消费逻辑

                    }


                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

                }           

            });


            /**

             * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>

             */

            consumer.start();                      


            logger.info("Consumer Started.");

        } catch (MQClientException e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        }

    }



2.5 测试效果


2.5.1 生产100个消息


2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C286, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=617]

     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C31B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=616]

     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C3B0, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=2], queueOffset=614]

     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C445, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=3], queueOffset=614]

     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C4DA, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=618]

     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C56F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=617]

     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C604, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=2], queueOffset=615]

     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C699, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=3], queueOffset=615]

     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C72E, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=619]

     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C7C3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=618]

     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C858, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=2], queueOffset=616]

     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C8EF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=3], queueOffset=616]

     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C986, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=620]

     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005CA1D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=619]

     ...

image.png

2.5.2 消费100个消息


2016-04-07-16-04 [main] [com.autonavi.rocketmq.service.Test] [INFO] - Consumer Started.

     2016-04-07-16-04 [ConsumeMessageThread_11] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_11 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=618, sysFlag=0, bornTimestamp=1460016115897, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115856, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005CDA7, commitLogOffset=380327, bodyCRC=901334138, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0019, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]

     2016-04-07-16-04 [ConsumeMessageThread_8] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_8 and Receive New Messages:[MessageExt [queueId=3, storeSize=149, queueOffset=615, sysFlag=0, bornTimestamp=1460016115722, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115680, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C699, commitLogOffset=378521, bodyCRC=260218519, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID007, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=12]]]

     2016-04-07-16-04 [ConsumeMessageThread_9] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_9 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=616, sysFlag=0, bornTimestamp=1460016115773, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115734, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C8EF, commitLogOffset=379119, bodyCRC=996330568, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0011, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]

     2016-04-07-16-04 [ConsumeMessageThread_3] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_3 and Receive New Messages:[MessageExt [queueId=3, storeSize=149, queueOffset=614, sysFlag=0, bornTimestamp=1460016115669, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115629, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C445, commitLogOffset=377925, bodyCRC=149904014, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID003, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=12]]]

     2016-04-07-16-04 [ConsumeMessageThread_12] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_12 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=619, sysFlag=0, bornTimestamp=1460016115951, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115911, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005D003, commitLogOffset=380931, bodyCRC=2118254247, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0023, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]

     2016-04-07-16-04 [ConsumeMessageThread_11] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ19

     2016-04-07-16-04 [ConsumeMessageThread_1] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_1 and Receive New Messages:[MessageExt [queueId=1, storeSize=149, queueOffset=616, sysFlag=0, bornTimestamp=1460016115635, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115594, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C31B, commitLogOffset=377627, bodyCRC=1726036898, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID001, WAIT=true, MAX_OFFSET=641, MIN_OFFSET=0}, body=12]]]

     2016-04-07-16-04 [ConsumeMessageThread_1] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ1

     2016-04-07-16-04 [ConsumeMessageThread_18] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_18 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=625, sysFlag=0, bornTimestamp=1460016116319, bornHost=/30.85.231.35:58198, storeTimestamp=1460016116278, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005DE2B, commitLogOffset=384555, bodyCRC=796302648, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0047, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]

     2016-04-07-16-04 [ConsumeMessageThread_18] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ47

     2016-04-07-16-04 [ConsumeMessageThread_4] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_4 and Receive New Messages:[MessageExt [queueId=2, storeSize=149, queueOffset=614, sysFlag=0, bornTimestamp=1460016115648, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115608, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C3B0, commitLogOffset=377776, bodyCRC=2145937944, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID002, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=12]]]

     2016-04-07-16-04 [ConsumeMessageThread_4] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ2

     2016-04-07-16-04 [ConsumeMessageThread_20] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_20 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=627, sysFlag=0, bornTimestamp=1460016116436, bornHost=/30.85.231.35:58198, storeTimestamp=1460016116393, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005E2E3, commitLogOffset=385763, bodyCRC=1482935637, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0055, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]

     2016-04-07-16-04 [ConsumeMessageThread_20] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ55

     2016-04-07-16-04 [ConsumeMessageThread_2] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_2 and Receive New Messages:[MessageExt [queueId=0, storeSize=149, queueOffset=617, sysFlag=0, bornTimestamp=1460016115587, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115577, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C286, commitLogOffset=377478, bodyCRC=300288820, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID000, WAIT=true, MAX_OFFSET=642, MIN_OFFSET=0}, body=12]]]

     2016-04-07-16-04 [ConsumeMessageThread_2] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ0



image.png

3 总结

本文仅供初学者学习如何使用RocketMQ,目前仅仅是单机配置,还没有涉及到集群等配置,后续会不断学习和记录。过程中有不对的地方欢迎吐槽呢。

4 参考文档

  • 《RocketMQ用户指南》

  • 《RocketMQ最佳实践》

  • 《RocketMQ原理简介》

  • 《分布式开放消息系统(RocketMQ)的原理与实践》


本文作者:网友 来源:CSDN博客
CIO之家 www.ciozj.com 微信公众号:imciow
    >>频道首页  >>网站首页   纠错  >>投诉
版权声明:CIO之家尊重行业规范,每篇文章都注明有明确的作者和来源;CIO之家的原创文章,请转载时务必注明文章作者和来源;
延伸阅读
也许感兴趣的
我们推荐的
主题最新
看看其它的