咸鱼开发修炼之路

RocketMQ安装使用

周末整了下RocketMQ在Ubuntu下的安装和使用,在此记录下。
rmq-feature-lowlatency.png

由于本人嫌麻烦未在自己的机器上安装linux双系统。。而是采用了在win10下安装linux子系统的方式来进行操作。win10下启用linux子系统详见 这里
环境:

1
2
3
4
Linux version ubuntu 18.04
RocketMQ 4.6.0
java 1.8
maven 3.6.0

安装java8

1
2
3
4
$ sudo apt-get install oracle-java8-installer
java -version(查看是否安装成功)
$ sudo update-alternatives --config java (通过这个命令可以看到java安装路径)
$ sudo vim /etc/environment

本人在操作时上面用apt-get直接装失败了。。所以去oracle官网获取下载链接后,
使用wget命令手动下载,解压到指定文件夹。

pasted-0.png

在文件最后添加上:

1
JAVA_HOME="/usr/lib/jvm/java-8-oracle"

执行下列命令

1
source /etc/environment

或重启系统生效

安装maven

1
$ sudo apt-get install maven

成功后可用mvn -version查看

设置mvn 国内镜像

$ sudo vim /etc/maven/settings.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    <mirror>  
        <id>alimaven</id>  
        <mirrorOf>central</mirrorOf>  
        <name>aliyun maven</name>  
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>  
    </mirror>  
  
    <mirror>  
        <id>ui</id>  
        <mirrorOf>central</mirrorOf>  
        <name>Human Readable Name for this Mirror.</name>  
        <url>http://uk.maven.org/maven2/</url>  
    </mirror>  
  
    <mirror>  
        <id>jboss-public-repository-group</id>  
        <mirrorOf>central</mirrorOf>  
        <name>JBoss Public Repository Group</name>  
        <url>http://repository.jboss.org/nexus/content/groups/public</url>  
    </mirror>

下载源代码及编译

本人采用git下载源码,可也去官网下载压缩包后解压

1
2
3
4
5
>cd /opt
> sudo git clone -b develop https://github.com/apache/rocketmq.git
> cd rocketmq
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/apache-rocketmq

pasted-1.png pasted-2.png

启动 name server

1
2
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
pasted-3.png

启动 broker

1
2
> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log
pasted-4.png

创建topic

pasted-5.png

创建group

pasted-6.png

但是broker的ip默认是第一块网卡的ip,如果网段是外网ip会导致消息发送失败
需要修改conf/broker.conf文件的配置

pasted-7.png

指定ip地址

重启broker

pasted-8.png

测试生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("GroupTest");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
producer.setVipChannelEnabled(false);
//Launch the instance.
producer.start();
for (int i = 0; i < 10; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("sendResult : %s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
pasted-9.png

测试生产者发送成功

测试消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupTest");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
pasted-10.png

消费者消费成功