rabbitmq相关知识

一、消息中间件基础知识

两种常见分布式架构

SOA架构:用Dubbo和Zookeeper进行服务间的远程通信。根据实际业务,把系统拆分成合适的、独立部署的模块,模块之间相互独立。根据实际业务,把系统拆分成合适的、独立部署的模块,模块之间相互独立。Dubbo使用自定义的TCP协议,可以让请求报文体积更小,或者使用HTTP2协议,也可以减少报文 的体积,提高传输效率。

img

微服务架构:SpringCloud中使用Feign解决服务之间远程通信的问题,Feign是轻量级RESTful的HTTP服务客户端,广泛应用于Spring Cloud中。符合面向接口化的编程习惯。

本质:封装了HTTP调用流程,类似Dubbo的服务调用。RPC主要基于TCP/UDP协议,HTTP协议是应用层协议,是构建在传输层协议TCP之上的,RPC效率更高,RPC长连接:不必每次通信都像HTTP一样三次握手,减少网络开销; HTTP服务开发迭代更快:在接口不多,系统与系统之间交互比较少的情况下,HTTP就显得更加方便;相反,在接口比较多,系统与系统之间交互比较多的情况下,HTTP就没有RPC有优势。

img

分布式通信存在的问题及解决办法

电商项目中,如果后台添加商品信息,该信息放到数据库,我们同时,需要更新搜索引擎的倒排索引。

解决办法一:在后台添加商品的方法中,如果数据插入数据库成功,就调用更新倒排索引的方法, 接着调用更新静态化页面的方法,如果更新失败重试,(不推荐,更新失败容易出现死循环且高并发场景不适用)

解决办法二:先执行添加商品的方法,商品添加成功,将更新索引和更新静态页面的任务缓存到一 个公共的位置,然后由相应的服务从该位置获取任务来执行。比如使用redis,使用阻塞队列轮询异步执行。(单使用redis,不使用消息队列,无法确认消息,不推荐)

解决办法三:分布式异步通信模式,如下图。

img

优点:系统间解耦,并具有一定的可恢复性,支持异构系统,下游通常可并发执行,系统具备弹性。服务解耦、流量削峰填谷等

缺点:消息中间件存在一些瓶颈和一致性问题,对于开发来讲不直观且不易调试,有额外成本。

使用异步消息模式需要注意的问题: 1. 哪些业务需要同步处理,哪些业务可以异步处理? 2. 如何保证消息的安全?消息是否会丢失,是否会重复? 3. 请求的延迟如何能够减少? 4. 消息接收的顺序是否会影响到业务流程的正常执行? 5. 消息处理失败后是否需要重发?如果重发如何保证幂等性?

幂等性:不管重发多少次,都要保证结果的一致性。

消息中间件概念

消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。

消息中间件就是在通信的上下游之间截断:break it,Broker,然后利用中间件解耦、异步的特性,构建弹性、可靠、稳定的系统。异步处理、流量削峰、限流、缓冲、排队、最终一致性、消息驱动等需求的场景都可以使用消息中间件。

img

常用主流消息中间件介绍

使用最为广泛的三款消息中间件:RabbitMQ、RocketMQ、Kafka。

RabbitMQ

RabbitMQ开始是用在电信业务的可靠通信的,也是少有的几款支持AMQP协议的产品之一。

优点: 1. 轻量级,快速,部署使用方便 2. 支持灵活的路由配置。RabbitMQ中,在生产者和队列之间有一个交换器模块。根据配置的路 由规则,生产者发送的消息可以发送到不同的队列中。路由规则很灵活,还可以自己实现。 3. RabbitMQ的客户端支持大多数的编程语言。

缺点: 1. 如果有大量消息堆积在队列中,性能会急剧下降 2. RabbitMQ的性能在Kafka和RocketMQ中是最差的,每秒处理几万到几十万的消息。如果应用要求高的性能,不要选择RabbitMQ。 3. RabbitMQ是Erlang开发的,功能扩展和二次开发代价很高。

RocketMQ

RocketMQ是一个开源的消息队列,使用java实现。借鉴了Kafka的设计并做了很多改进。 RocketMQ主要用于有序,事务,流计算,消息推送,日志流处理,binlog分发等场景。

RocketMQ几乎具备了消息队列应该具备的所有特性和功能。 java开发,阅读源代码、扩展、二次开发很方便。 对电商领域的响应延迟做了很多优化。在大多数情况下,响应在毫秒级。如果应用很关注响应时间,可以使用RocketMQ。 性能比RabbitMQ高一个数量级,每秒处理几十万的消息。

缺点: 跟周边系统的整合和兼容不是很好。

Kafka

Kafka的可靠性,稳定性和功能特性基本满足大多数的应用场景。 跟周边系统的兼容性是数一数二的,尤其是大数据和流计算领域,几乎所有相关的开源软件都支持 Kafka,Kafka高效,可伸缩,消息持久化。支持分区、副本和容错。Kafka是Scala和Java开发的,对批处理和异步处理做了大量的设计,因此Kafka可以得到非常高的性能。它的异步消息的发送和接收是三个中最好的,但是跟RocketMQ拉不开数量级,每秒处理几十万的消息。 如果是异步消息,并且开启了压缩,Kafka最终可以达到每秒处理2000w消息的级别。 但是由于是异步的和批处理的,延迟也会高,不适合电商场景。

img

消息中间件应用场景

电商秒杀场景

当秒杀开始前,用户在不断的刷新页面,系统应该如何应对高并发的读请求呢?

在秒杀开始时,大量并发用户瞬间向系统请求生成订单,扣减库存,系统应该如何应对高并发的写请求呢?

系统应该如何应对高并发的读请求:

使用缓存策略将请求挡在上层中的缓存中

能静态化的数据尽量做到静态化

加入限流(比如对短时间之内来自某一个用户,某一个IP、某个设备的重复请求做丢弃处理)

系统应该如何应对高并发的写请求:

生成订单,扣减库存,用户这些操作不经过缓存直达数据库。如果在 1s内,有 1 万个数据连接同时到达,系统的数据库会濒临崩溃。如何解决这个问题呢?我们可以使用消息队列。

消息队列的作用:

削去秒杀场景下的峰值写流量——流量削峰(削去秒杀场景下的峰值写流量,将秒杀请求暂存于消息队列,业务服务器响应用户“秒杀结果正在处理中。。。”,释放系统资源去 处理其它用户的请求)

通过异步处理简化秒杀请求中的业务流程——异步处理(先处理主要的业务,异步处理次要的业务。 如主要流程是生成订单、扣减库存;次要流程比如购买成功之后会给用户发优惠券,增加用户的积分)

解耦,实现秒杀系统模块之间松耦合——解耦(实现秒杀系统模块之间松耦合,将数据全部发送给消息队列,然后数据服务订阅这个消息队列,接收数据进行处理)

B端C端数据同步场景

B端面向企业用户,C端面向求职者。这两个模块业务处理逻辑不同,数据库表结构不同,实际上是处于解耦的状态。但是各自又需要对方的数据,需要共享:如 1. 当C端求职者在更新简历之后,B端企业用户如何尽早看到该简历更新? 2. 当B端企业用户发布新的职位需求后,C端用户如何尽早看到该职位信息?

如何解决B端C端数据共享的问题?

\1. 同步方式:B端和C端通过RPC或WebService的方式发布服务,让对方来调用,以获取对方的信息。求职者每更新一次简历,就调用一次B端的服务,进行数据的同步;B端企业用户每更新职位需求,就调用C端的服务,进行数据的同步。

\2. 异步方式:使用消息队列,B端将更新的数据发布到消息队列,C端将更新的数据发布到消息队列,B端订阅C端的消息队列,C端订阅B端的消息队列。

使用同步方式,B端和C端耦合比较紧密,如果其中一个服务有问题,可能会导致另一个服务不可用。使用消息队列的异步方式,对B端C端进行解耦,只要消息队列可用,双方都可以将需要同步的信息 发送到消息队列,对方在收到消息队列推送来的消息的时候,各自更新自己的搜索引擎,更新自己的缓存数据。

JMS规范

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM,Message oriented Middleware)的API,用于在两个应用程序之间,或分布式系统中发送 消息,进行异步通信。与具体平台无关的API,绝大多数MOM提供商都支持。 它类似于JDBC(Java Database Connectivity)。消息是JMS中的一种类型对象,由两部分组成:报文头和消息主体。

根据有效负载的类型来划分,可以将消息分为几种类型: 1. 简单文本(TextMessage) 2. 可序列化的对象(ObjectMessage) 3. 属性集合(MapMessage) 4. 字节流(BytesMessage) 5. 原始值流(StreamMessage) 6. 无有效负载的消息(Message)。

对象模型:ConnectionFactory 接口(连接工厂)、Connection 接口(连接)、Destination 接口(目标)、Session 接口(会话)、MessageConsumer 接口(消息消费者)、MessageProducer 接口(消息生产者)、Message 接口(消息)

img

点对点模式

一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费 者的队列,并直接将消息发送到消费者的队列,概括为: 一条消息只有一个消费者获得 生产者无需在接收者消费该消息期间处于运行状态,接收者也同样无需在消息发送时处于运行 状态。 每一个成功处理的消息要么自动确认,要么由接收者手动确认。

发布/订阅模式

支持向一个特定的主题发布消息。0或多个订阅者可能对接收特定消息主题的消息感兴趣。发布者和订阅者彼此不知道对方。多个消费者可以获得消息。

在发布者和订阅者之间存在时间依赖性。 发布者需要建立一个主题,以便客户能够订阅。 订阅者必须保持持续的活动状态以接收消息,否则会丢失未上线时的消息。 对于持久订阅,订阅者未连接时发布的消息将在订阅者重连时重发。

JMS在应用集群中的问题

点对点和发布订阅模式在集群下都存在问题,点对点浪费空间,发布订阅对业务侵入较大,ActiveMQ通过“虚拟主题”解决了这个问题。

JMS规范文档(jms-1_1-fr-spec.pdf)下载地址: https://download.oracle.com/otndocs/jcp/7195-jms-1.1-fr-spec-oth-JSpec/

AMQP协议

AMQP全称高级消息队列协议(Advanced Message Queuing Protocol),是一种标准,类似于 JMS,兼容JMS协议。目前RabbitMQ主流支持AMQP 0-9-1,3.8.4版本支持AMQP 1.0,AMQP是一个二进制的协议,信息被组织成数据帧,有很多类型。。所有数据帧都拥有基本相同的格式:帧头,负载,帧尾。数据帧负载的格式依赖于数据帧的类型。

img

Publisher:消息发送者,将消息发送到Exchange并指定RoutingKey,以便queue可以接收到指定的消息。

Consumer:消息消费者,从queue获取消息,一个Consumer可以订阅多个queue以从多个 queue中接收消息。 Server:一个具体的MQ服务实例,也称为Broker。

Virtual host:虚拟主机,一个Server下可以有多个虚拟主机,用于隔离不同项目,一个Virtual host通常包含多个Exchange、Message Queue。

Exchange:交换器,接收Producer发送来的消息,把消息转发到对应的Message Queue中。

Routing key:路由键,用于指定消息路由规则(Exchange将消息路由到具体的queue中),通 常需要和具体的Exchange类型、Binding的Routing key结合起来使用

Message Queue:实际存储消息的容器,并把消息传递给最终的Consumer。

AMQP 使用的数据类型如下:

Integers(数值范围1-8的十进制数字):用于表示大小,数量,限制等,整数类型无符号的,可以在帧内不对齐。 Bits(统一为8个字节):用于表示开/关值。

Short strings:用于保存简短的文本属性,字符串个数限制为255,8个字节

Long strings:用于保存二进制数据块。

Field tables:包含键值对,字段值一般为字符串,整数等。

AMQP协议文档下载地址: https://www.amqp.org/sites/amqp.org/files/amqp0-9-1.zip

二、RabbitMQ架构

RabbitMQ概念及基本架构

RabbitMQ,俗称“兔子MQ”(可见其轻巧,敏捷),是目前非常热门的一款开源消息中间件。RabbitMQ具有很强大的插件扩展能力,并具备以下特点:

\1. 高可靠性、易扩展、高可用、功能丰富等

\2. 支持大多数(甚至冷门)的编程语言客户端。

\3. RabbitMQ遵循AMQP协议,自身采用Erlang

\4. RabbitMQ也支持MQTT等其他协议。

RabbitMQ常用的交换器类型有: fanout 、 direct 、 topic 、 headers 四种。

Fanout 会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。

Direct direct类型的交换器路由规则很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。

Topic topic类型的交换器在direct匹配规则上进行了扩展,可以存在两种特殊字符“*”和 “#”,用于模糊匹配。

Headers headers类型的交换器不依赖于路由键的匹配规则来路由信息,而是根据发送的消息内容中的 headers属性进行匹配。

RabbitMQ消息有两种类型: 1. 持久化消息和非持久化消息。 2. 这两种消息都会被写入磁盘。

持久化消息在到达队列时写入磁盘,同时会内存中保存一份备份,当内存吃紧时,消息从内存中清除。

非持久化消息一般只存于内存中,当内存压力大时数据刷盘处理,以节省内存空间。

RabbitMQ存储层包含两个部分:队列索引和消息存储。

队列索引:rabbit_queue_index 索引维护队列的落盘消息的信息,如存储地点、是否已被给消费者接收、是否已被消费者ack等。

消息存储:rabbit_msg_store 消息以键值对的形式存储到文件中,一个虚拟主机上的所有队列使用同一块存储,每个节点只有一 个。存储分为持久化存储(msg_store_persistent)和短暂存储(msg_store_transient)。持久化存 储的内容在broker重启后不会丢失,短暂存储的内容在broker重启后丢失。

队列结构 通常队列由rabbit_amqqueue_process和backing_queue这两部分组成, rabbit_amqqueue_process负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消 息、处理消息的确认(包括生产端的confirm和消费端的ack)等。backing_queue是消息存储的具体形 式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。

为什么消息的堆积导致性能下降?

在系统负载较高时,消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增加处理每个消息的平均开销。因为要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息 的能力就会降低,使得后流入的消息又被积压到很深的队列中,继续增大处理每个消息的平均开销,继 而情况变得越来越恶化,使得系统的处理能力大大降低。

安装和配置RabbitMQ

第一步:安装依赖

yum install socat -y

第二步:安装Erlang

rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm –force –nodeps

第三步:安装RabbitMQ

rpm -ivh rabbitmq-server-3.8.4-1.el7.noarch.rpm –force –nodeps

第四步:启用RabbitMQ的管理插件

cd ../usr/lib/rabbitmq

rabbitmq-plugins enable rabbitmq_management

img

第五步:启动RabbitMQ

systemctl start rabbitmq-server 前台启动

rabbitmq-server -detached 后台启动

第六步:添加用户

rabbitmqctl add_user root 123456

第七步:给用户添加权限

rabbitmqctl set_permissions root -p / “.“ “.“ “.*”

第八步: 给用户设置标签

rabbitmqctl set_user_tags root administrator

第九步:打开浏览器,登录客户端

img

RabbitMQ常用操作命令

# 前台启动Erlang VM和RabbitMQ

rabbitmq-server

# 后台启动

rabbitmq-server -detached

# 停止RabbitMQ和Erlang VM

rabbitmqctl stop

# 查看所有队列

rabbitmqctl list_queues

# 查看所有虚拟主机

rabbitmqctl list_vhosts

# 在Erlang VM运行的情况下启动RabbitMQ应用

rabbitmqctl start_app rabbitmqctl stop_app

# 查看节点状态

rabbitmqctl status

# 查看所有可用的插件

rabbitmq-plugins list

# 启用插件

rabbitmq-plugins enable

# 停用插件

rabbitmq-plugins disable

# 添加用户

rabbitmqctl add_user username password

# 列出所有用户:

rabbitmqctl list_users

# 删除用户:

rabbitmqctl delete_user username

# 清除用户权限: r

abbitmqctl clear_permissions -p vhostpath username

# 列出用户权限:

rabbitmqctl list_user_permissions username

# 修改密码:

rabbitmqctl change_password username newpassword

# 设置用户权限:

rabbitmqctl set_permissions -p vhostpath username “.“ “.“ “.*”

# 创建虚拟主机:

rabbitmqctl add_vhost vhostpath

# 列出所以虚拟主机:

rabbitmqctl list_vhosts

# 列出虚拟主机上的所有权限:

rabbitmqctl list_permissions -p vhostpath

# 删除虚拟主机:

rabbitmqctl delete_vhost vhost vhostpath

# 移除所有数据,要在 rabbitmqctl stop_app 之后使用:

rabbitmqctl reset

RabbitMQ工作流程详解

生产者发送消息的流程

\1. 生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)

\2. 生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等

\3. 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等

\4. 生产者通过 bindingKey (绑定Key)将交换器和队列绑定( binding )起来

\5. 生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息

\6. 相应的交换器根据接收到的 routingKey 查找相匹配的队列。

\7. 如果找到,则将从生产者发送过来的消息存入相应的队列中。

\8. 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者

\9. 关闭信道。

\10. 关闭连接。

消费者接收消息的过程

\1. 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。

\2. 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及 做一些准备工作 3. 等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。

\4. 消费者确认( ack) 接收到的消息。

\5. RabbitMQ 从队列中删除相应己经被确认的消息。

\6. 关闭信道。

\7. 关闭连接。

Connection 和Channel关系

生产者和消费者,需要与RabbitMQ Broker 建立TCP连接,也就是Connection 。一旦TCP 连接建立起来,客户端紧接着创建一个AMQP 信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection 之上的虚拟连接, RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。

为什么不直接使用TCP连接,而是使用信道?

RabbitMQ 采用类似NIO的做法,复用TCP 连接,减少性能开销,便于管理。 当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。当信道本身的流量很大时,一个Connection 就会产生性能瓶颈,流量被限制。需要建立多个 Connection ,分摊信道。

img

RabbitMQ工作模式详解

工作队列模式Work Queue

生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的效果。

结果:生产者生产消息发送给交换器,交换器向绑定他的消费者分发消息,负载均衡。

img

发布订阅模式fanout

使用fanout类型交换器,routingKey忽略。每个消费者定义生成一个队列并绑定到同一个Exchange,每个消费者都可以消费到完整的消息。消息广播给所有订阅该消息的消费者。生产者将消息发送给交换器。交换器非常简单,从生产者接收消息,将消息推送给消息队列。交换器的类型: direct 、 topic 、 headers 和 fanout 四种类型。发布订阅使 用fanout。不指定交换器会使用默认交换器default。

实现RabbitMQ的消费者有两种模式,推模式(Push)和拉模式(Pull)。 实现推模式推荐的方式 是继承 DefaultConsumer 基类,也可以使用Spring AMQP的 SimpleMessageListenerContainer 。

结果:生产者往交换器发送消息,消费者会声明一个临时队列,绑定到交换器,当消息过来,交换器会复制N份发送给订阅的消费者。实现将消息广播到很多接收者。

img

路由模式direct

使用 direct 类型的Exchange,发N条消费并使用不同的 routingKey ,消费者定义队列并将队列、 routingKey 、Exchange绑定。此时使用 direct 模式Exchagne必须要 routingKey 完全匹配的情况下消息才会转发到对应的队列中被消费,通过路由模式实现让接收者只接收部分消息。

结果:生产者往交换器发送消息,消费者根据key绑定交换器,交换器根据匹配路由和key推送消息。实现通过 direct 类型的交换器做到了根据日志级别的不同,将消息发送给了不同队列的。

img

主题模式topic

使用 topic 类型的交换器,队列绑定到交换器、 bindingKey 时使用通配符,交换器将消息路由转发到具体队列时会根据消息 routingKey 模糊匹配,比较灵活。

结果:生产者往交换器发送消息,消费者根据key绑定交换器,通过绑定器里面的通配符让交换器发送消息时进行分类发送,最后达到定制分发效果。

img

SpringBoot整合RabbitMQ

\1. 添加starter依赖

org.springframework.boot spring-boot-starter-amqp

\2. application.properties中添加连接信息

spring.application.name=springboot_rabbit
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672

\3. 主入口类

img

4.RabbitConfig类

img

5.使用RestController发送消息

img

6.使用监听器,用于推消息

img

三、RabbitMQ特性

消息可靠性问题

支付平台必须保证数据正确性,保证数据并发安全性,保证数据最终一致性。 支付平台通过如下几种方式保证数据一致性:

\1. 分布式锁,用redis或zookeeper等常用框架来实现。 比如我们在修改账单时,先锁定该账单,如果该账单有并发操作,后面的操作只能等 待上一个操作的锁释放后再依次执行。

优点:能够保证数据强一致性。 缺点:高并发场景下可能有性能问题。

\2. 消息队列,保证最终一致性,我们需要确保消息队列有ack机制 客户端收到消 息并消费处理完成后,客户端发送ack消息给消息中间件 如果消息中间件超过指定时间还没收 到ack消息,则定时去重发消息。

优点:异步、高并发 缺点:有一定延时、数据弱一致性,并且必须能够确保该业务操作肯定能够成 功完成,不可能失败。

消息可靠性解决之异常捕获机制

先执行业务操作,业务操作成功后执行消息发送,消息发送过程通过try catch 方式捕获异常, 在异常处理理的代码块中执行回滚业务操作或者执行重发操作等。这是一种最大努力确保的方式,并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。

img

消息可靠性解决之AMQP/RabbitMQ的事务机制

没有捕获到异常并不能代表消息就一定投递成功了。一直到事务提交后都没有异常,确实就说明消息是投递成功了。这种方式在性能方面的开销比较大,不推荐使用。

img

消息可靠性解决之发送端确认机制

RabbitMQ后来引入了一种轻量量级的方式,叫发送方确认(publisher confirm)机制。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派 一个唯一的ID(从1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这样生产者就知道消息已经正确送达了。

img

消息可靠性解决之持久化存储机制

持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如:重启、断电、停机 等)数据将会丢失。主要从以下几个方面来保障消息的持久性:

\1. Exchange的持久化。通过定义时设置durable 参数为ture来保证Exchange相关的元数据不丢失。

\2. Queue的持久化。也是通过定义时设置durable 参数为ture来保证Queue相关的元数据不丢失。

3.消息的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2 即可实现消息的持久化,保证消息自身不丢失。

RabbitMQ中的持久化消息都需要写入磁盘(当系统内存不足时,非持久化的消息也会被刷盘处理),这些处理动作都是在“持久层”中完成的。

\1. 队列索引(rabbit_queue_index),rabbit_queue_index 负责维护Queue中消息的信息,包括 消息的存储位置、是否已交给消费者、是否已被消费及Ack确认等,每个Queue都有与之对应 的rabbit_queue_index。

\2. 消息存储(rabbit_msg_store),rabbit_msg_store 以键值对的形式存储消息,它被所有队列列 共享,在每个节点中有且只有一个。

消息可靠性解决之Consumer ACK

如何保证消息被消费者成功消费?

生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我 们又没有任何重试,那结果跟消息丢失没什么分别。 因此RabbitMQ在消费端会有Ack机制,即消费端消费消息后需要发送Ack确认报文给Broker端,告知自己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。

\1. 采用NONE模式,消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表, 再通过后台定时任务扫描异常恢复表尝试做重试动作。如果业务不自行处理则有丢失数据的风险

\2. 采用AUTO(自动Ack)模式,不主动捕获异常,当消费过程中出现异常时会将消息放回 Queue中,然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新 被消费,默认会一直重发消息并直到消费完成返回Ack或者一直到过期

\3. 采用MANUAL(手动Ack)模式,消费者自行控制流程并手动调用channel相关的方法返回 Ack

SpringBoot项目中支持如下的一些配置:

#最大重试次数

spring.rabbitmq.listener.simple.retry.max-attempts=5

#是否开启消费者重试(为false时关闭消费者重试,意思不是“不重试”,而是一直收到消息直到jack 确认或者一直到超时)

spring.rabbitmq.listener.simple.retry.enabled=true

#重试间隔时间(单位毫秒)

spring.rabbitmq.listener.simple.retry.initial-interval=5000

#重试超过最大次数后是否拒绝

spring.rabbitmq.listener.simple.default-requeue-rejected=false

#ack模式

spring.rabbitmq.listener.simple.acknowledge-mode=manual

消息可靠性解决之消费端限流

在电商的秒杀活动中,活动一开始会有大量并发写请求到达服务端,需要对消息进行削峰处理,如何削峰?

当消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一 定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩 溃,而分布式系统的故障往往会发生上下游传递,产生连锁反应。

\1. RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直 到对应项指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。

\2. RabbitMQ 还默认提供了一种基于credit flow 的流控机制,面向每一个连接进行流控。当单个队列达到最大流速时,或者多个队列达到总流速时,都会触发流控。

\3. RabbitMQ中有一种QoS保证机制,可以限制Channel上接收到的未被Ack的消息数量,如果 超过这个数量限制RabbitMQ将不会再往消费端推送消息。这是一种流控手段,可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)。比较值得注意的是 QoS机制仅对于消费端推模式有效,对拉模式无效。

提升下游应用的吞吐量和缩短消费过程的耗时,优化主要以下几种方式:

\1. 优化应用程序的性能,缩短响应时间(需要时间)

\2. 增加消费者节点实例(成本增加,而且底层数据库操作这些也可能是瓶颈)

\3. 调整并发消费的线程数(线程数并非越大越好,需要大量压测调优至合理值)

消息可靠性保障

消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障 分为三个层级:

\1. At most once:最多一次。消息可能会丢失,但绝不会重复传输

\2. At least once:最少一次。消息绝不会丢失,但可能会重复传输

\3. Exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次

RabbitMQ 支持其中的“最多一次”和“最少一次”。

其中“最少一次”投递实现需要考虑以下这个几个方面的内容:

\1. 消息生产者需要开启事务机制或者publisher confirm 机制,以确保消息可以可靠地传输到 RabbitMQ 中。

\2. 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。

\3. 消息和队列都需要进行持久化处理,以确保RabbitMQ 服务器在遇到异常情况时不会造成消息丢失。

\4. 消费者在消费消息的同时需要将autoAck 设置为false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。

“最多一次”的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样很难确 保消息不会丢失。

消息幂等性处理

一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方 法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。 对于幂等的方法,不用担心重复执行会对系统造成任何改变。

对于幂等性的一些常见做法:

\1. 借助数据库唯一索引,重复插入直接报错,事务回滚。

\2. 前置检查机制。为了防止并发问题,我们通常需要借助“排他锁”来完成。在支付宝有一条铁律叫:一锁、二判、三操作。当然,我们也可以使用乐观锁或CAS机制,乐观锁一般会使用扩展一个版本号字段做判断条件。

\3. 唯一Id机制,比较通用的方式。

消息可靠性分析

在使用任何消息中间件的过程中,难免会出现消息丢失等异常情况,这个时候就需要有一个良好的机制来跟踪记录消息的过程(轨迹溯源),帮助我们排查问题。 在RabbitMQ 中可以使用Firehose 功能来实现消息追踪,Firehose 可以记录每一次发送或者消费 消息的记录,方便RabbitMQ 的使用者进行调试、排错等。Firehose 的原理是将生产者投递给RabbitMQ 的消息,或者RabbitMQ 投递给消费者的消息按照指定的格式发送到默认的交换器上。

开启Firehose命令: rabbitmqctl trace_on [-p vhost]

关闭命令为:rabbitmqctl trace_off [-p vhost]

Firehose 默认情况下处于关闭状态,并且Firehose 的状态是非持久化的,会在RabbitMQ服务重启的时候还原成默认的状态。Firehose 开启之后多少会影响RabbitMQ 整体服务性能,因为它会引起额 外的消息生成、路由和存储。

TTL机制

在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内 用户没有支付,则默认订单取消。

常用实现办法:

\1. 定期轮询检查(数据库等)

\2. Timer定时器

\3. ScheduledExecutorService多线程

\4. RabbitMQ消息队列

TTL,Time to Live 的简称,即过期时间。 RabbitMQ 可以对消息和队列两个维度来设置TTL。

两种方法可以设置消息的TTL:

\1. 通过Queue属性设置,队列中所有消息都有相同的过期时间。

\2. 对消息自身进行单独设置,每条消息的TTL可以不同。

默认规则:

\1. 如果不设置TTL,则表示此消息不会过期;

\2. 如果TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃;

通过命令行方式设置全局TTL,执行如下命令:

rabbitmqctl set_policy TTL “.*” ‘{“message-ttl”:30000}’ –apply-to queues

死信队列

用户下单,调用订单服务,然后订单服务调用派单系统通知外卖人员送单,这时候订单系统与派单系统采用 MQ异步通讯。在定义业务队列时可以考虑指定一个 死信交换机,并绑定一个死信队列。当消息变成死信时,该消 息就会被发送到该死信队列上,这样方便我们查看消息失败的原因。 DLX,全称为Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter) 之后,被重新发送到一个特殊的交换器(DLX)中,同时,绑定DLX的队列就称为“死信队列”。

以下几种情况导致消息变为死信:

\1. 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;

\2. 消息过期;

\3. 队列达到最大长度。

延迟队列

延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消费。

\1. 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上

\2. 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列 (queue)并把消息给它 3. 队列(queue)再把消息发送给监听它的消费者(customer)

img

一、RocketMQ架构

RocketMQ使用场景

\1. 应用解耦:系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、 物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

\2. 流量削峰:缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。

\3. 数据分发:通过消息队列可以让数据在多个系统之间进行流通。数据的产生方不需要关心谁来使用数据,只需 要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可

RocketMQ 部署架构

RocketMQ的角色:

Producer:消息的发送者;举例:发信者

Consumer:消息接收者;举例:收信者

Broker:暂存和传输消息;举例:邮局

NameServer:管理Broker;举例:各个邮局的管理机构

Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者 可以订阅一个或者多个Topic消息

Message Queue:相当于是Topic的分区;用于并行发送和接收消息

img

执行流程:

\1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

\2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前 Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic 跟Broker的映射关系。

\3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

\4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从 NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列, 然后与队列所在的Broker建立长连接从而向Broker发消息。

\5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在 哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

RocketMQ特性

\1. 订阅与发布:消息的发布是指某个生产者向某个topic发送消息;消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息。

\2. 消息顺序:消息有序指的是一类消息消费时,能按照发送的顺序来消费。RocketMQ可以严格的保证消息有序。

\3. 消息过滤:RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。

\4. 消息可靠性:RocketMQ支持消息的高可靠,影响消息可靠性的几种情况: 1)Broker非正常关闭 2)Broker异常 Crash 3)OS Crash 4)机器掉电,但是能立即恢复供电情况 5)机器无法开机(可能是cpu、主板、内存等 关键设备损坏) 6)磁盘设备损坏,RocketMQ通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,但是性能会下降。

\5. 至少一次:指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。

\6. 回溯消费:回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能, Broker在向Consumer投递成功消息后,消息仍然需要保留。

\7. 事务消息:RocketMQ事务消息是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。

\8. 定时消息:定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的 topic。

\9. 消息重试:Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。

\10. 消息重投:消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是 无法避免的问题。

\11. 流量控制:生产者流控,因为broker处理能力达到瓶颈,不会尝试消息重投;消费者流控,因为消费能力达到瓶颈。

\12. 死信队列:死信队列用于处理无法被正常消费的消息。 当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

消费模式Push or Pull

RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到MQ Server拉取。但在具体实现时,Push和Pull模式本质都是采用消费端主动拉取的方式,即consumer轮询从 broker拉取消息。RocketMQ使用长轮询机制来模拟Push效果,算是兼顾了二者的优点。

Push模式

实时性高,但是消费端的处理能力有限,当瞬间推送很多消息给消费端时,容易造成消费端的消息积压,严重时会压垮客户端,Push方式里,consumer把长轮询的动作封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

Pull模式

主动权掌握在消费端自己手中,根据自己的处理能力量力而行。但是Pull的频率,定时间隔太久担心影响时效性,间隔太短担心做太多“无用功”浪费资源。比较折中的办法就是长轮询。Pull方式里,取消息的过程需要用户自己主动调用,首先通过打算消费的Topic拿到 MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

RocketMQ核心概念

\1. 消息模型:RocketMQ主要由Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息, Consumer 负责消费消息,Broker 负责存储消息。

\2. Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息。

\3. Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。

\4. PushConsumer:Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端。应用通常向 Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法。该 消费模式一般实时性较高。

\5. PullConsumer:Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、 主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

\6. ProducerGroup:同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息 且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消 费。

\7. ConsumerGroup:同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在 消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订 阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费和广播消费。

\8. Broker:消息中转角色,负责存储消息,转发消息,一般也称为 Server。

\9. 一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义。

\10. 集群消费:一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其 中一个 Consumer Group 有 3 个实例,那举每个实例只消费其中的 3 条消息。

\11. 顺序消息:消费消息的顺序要同发送消息的顺序一致,在RocketMQ 中主要指的是局部顺序,即一类消息为满足顺序性,必须Producer单线程顺序发送,且发送到同一个队列,这样Consumer 就可以按照 Producer发送的顺序去消费消息

\12. 普通顺序消息:顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker 重启, 由于队列总数发生发化,哈希取模后定位的队列会发化,产生短暂的消息顺序不一致。

\13. 严格顺序消息:顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover特性,即Broker集 群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。

\14. Message Queue:在 RocketMQ 中,所有消息队列都是持久化的,长度无限的数据结构,所谓长度无限是指队列中 的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,offset 为 java long 类型,64 位, 理论上在 100 年内不会溢出,所以认为为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。

\15. 标签(Tag):为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不 同业务目的在同一主题下设置不同标签。

RocketMQ环境搭建

第一步:安装unzip,解压zip

yum install -y unzip zip

第二步:下载rocket包并解压

wget https://archive.apache.org/dist/rocketmq/4.5.1/rocketmq-all4.5.1-bin-release.zip

unzip unzip rocketmq-all-4.5.1-bin-release.zip

第三步:环境变量配置,配套jdk8以上

vim /etc/profile #修改配置

export ROCKET_HOME=/usr/local/rocketmq/rocket

export PATH=$PATH:$ROCKET_HOME/bin

source /etc/profile #生效

第四步:修改启动、关闭配置文件,更改占用内存 64m 128m

vim bin/runserver.sh

vim bin/runbroker.sh

vim conf/broker.conf

第五步:启动NameServer

sh bin/mqnamesrv -n 117.50.5.252:9876 &

第六步:启动Broker

sh bin/mqbroker -n 117.50.5.252:9876 autoCreateTopicEnable=true -c /usr/local/rocketmq/rocket/conf/broker.conf &

第七步:停止命令

mqshutdown borker

mqshutdown namesrv

img

二、RocketMQ特性

消息发送机制

生产者向消息队列里写入消息,不同的业务场景需要生产者采用不同的写入策略。比如同步发送、异步发送、OneWay发送、延迟发送、发送事务消息等。 默认使用的是DefaultMQProducer类,发送消息要经过五个步骤:

1)设置Producer的GroupName。

2)设置InstanceName,当一个Jvm需要启动多个Producer的时候,通过设置不同的 InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。

3)设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证不丢消息,可以设置多重试几次。

4)设置NameServer地址

5)组装消息并发送。

提升写入的性能 发送一条消息出去要经过三步:

\1. 客户端发送请求到服务器。 2. 服务器处理该请求。 3. 服务器向客户端返回应答

Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果。

另一种提高发送速度的方法是增加Producer的并发量,使用多个Producer同时发送,RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞的数据刷入文件系统当中,写入性能达到90万+的TPS。

消息消费机制

消费的几个要点:

\1. 消息消费方式(Pull和Push)

\2. 消息消费的模式(广播模式和集群模式)

\3. 流量控制(可以结合sentinel来实现)

\4. 并发线程数设置

\5. 消息的过滤(Tag、Key) TagA||TagB||TagC * null

三种提高Consumer的处理能力的方法:

\1. 提高消费并行度,在同一个ConsumerGroup下(Clustering方式),可以通过增加Consumer实例的数量来提 高并行度。 通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer实例数。

\2. 以批量方式进行消费,某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中,涉及update某个数据库,一次update10条的时间会大大小于十次update1条数据的时间。

\3. 检测延时情况,跳过非重要消息,Consumer在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆 积,这个时候可以选择丢弃不重要的消息,使Consumer尽快追上Producer的进度。

消息存储机制

消息存储

目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。 但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍! 因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。 RocketMQ的消息用顺序写,保证了消息存储的速度。

存储结构

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件 是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储 的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消 息内容不是定长的。

2) ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能。

3) IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。

消息过滤机制

RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。 RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容。

Tag过滤方式:

Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多 个TAG,可以用||分隔。

\1. Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给 Broker端。

\2. Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个 MessageFilter,然后传给Store。

\3. Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤。

\4. 在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,在消息消费端拉 取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消 息消费。

SQL92的过滤方式:

仅对push的消费者起作用。 Tag方式虽然效率高,但是支持的过滤逻辑比较简单。 SQL表达式可以更加灵活的支持复杂过滤逻辑。

\1. 数字比较: >, >=, <=, BETWEEN, =

\2. 字符串比较: =, <>, IN; IS NULL或者IS NOT NULL;

\3. 逻辑比较: AND, OR, NOT;

\4. Constant types are: 数字如:123, 3.1415; 字符串如:’abc’,必须是单引号引起来 NULL,特 殊常量 布尔型如:TRUE or FALSE;

零拷贝原理

cache和buffer的区别

Cache:缓存区,是高速缓存,是位于CPU和主内存之间的容量较小但速度很快的存储器,因 为CPU的速度远远高于主内存的速度,CPU从内存中读取数据需等待很长的时间,而 Cache 保存着CPU刚用过的数据或循环使用的部分数据,这时从Cache中读取数据会更快,减少了 CPU等待的时间,提高了系统的性能。

Buffer:缓冲区,用于存储速度不同步的设备或优先级不同的设备之间传输数据;通过buffer 可以减少进程间通信需要等待的时间,当存储速度快的设备与存储速度慢的设备进行通信时, 存储慢的数据先把数据存放到buffer,达到一定程度存储快的设备再读取buffer的数据,在此 期间存储快的设备CPU可以干其他的事情。

HeapByteBuffer和DirectByteBuffer

HeapByteBuffer,是在jvm堆上面一个buffer,底层的本质是一个数组,用类封装维护了很多的 索引(limit/position/capacity等)。

DirectByteBuffer,底层的数据是维护在操作系统的内存中,而不是jvm里,DirectByteBuffer里维 护了一个引用address指向数据,进而操作数据。

HeapByteBuffer优点:内容维护在jvm里,把内容写进buffer里速度快;更容易回收。

DirectByteBuffer优点:跟外设(IO设备)打交道时会快很多,因为外设读取jvm堆里的数据时, 不是直接读取的,而是把jvm里的数据读到一个内存块里,再在这个块里读取的,如果使用 DirectByteBuffer,则可以省去这一步,实现zero copy(零拷贝)

缓冲IO和直接IO

缓存I/O又被称作标准I/O,大多数文件系统的默认I/O操作都是缓存I/O。

\1. 在一定程度上分离了内核空间和用户空间,保护系统本身的运行安全;

\2. 可以减少读盘的次数,从而提高性能。

缓存I/O数据在传输过程中就需要在应用程序地址空间(用户空间)和缓存(内核空间)之间进行多次数据拷贝操作, 这些数据拷贝操作所带来的CPU以及内存开销是非常大的。

直接IO就是应用程序直接访问磁盘数据,而不经过内核缓冲区,这样做的目的是减少一次从内核缓 冲区到用户程序缓存的数据复制。

如果访问的数据不在应用程序缓存中,那么每次数据都会直接从磁盘加载,这种直接加载会非常缓慢。通常直接IO与异步IO结合使用,会得到比较好的性能。

总结

\1. 虽然叫零拷贝,实际上sendfile有2次数据拷贝的。第1次是从磁盘拷贝到内核缓冲区,第二次是从内核缓冲区拷贝到网卡(协议引擎)。如果网卡支持 SG-DMA技术,就无需从PageCache拷贝至 Socket 缓冲区;

\2. 之所以叫零拷贝,是从内存角度来看的,数据在内存中没有发生过拷贝,只是在内存和I/O设备之间传输。很多时候我们认为sendfile才是零拷贝,mmap严格来说不算;

\3. Linux中的API为sendfile、mmap,Java中的API为FileChanel.transferTo()、 FileChannel.map()等;

\4. Netty、Kafka(sendfile)、Rocketmq(mmap)、Nginx等高性能中间件中,都有大量利用操作系统零拷贝特性。

同步复制和异步复制

如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。

同步复制:

同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态; 在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会 增大数据写入延迟,降低系统吞吐量。

异步复制:

异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。 在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因 为没有被写 入Slave,有可能会丢失;

同步复制和异步复制是通过broker.conf 配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。

通常情况下,应该把Master和Save配置成ASYNC_FLUSH的 刷盘 方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢。

img

img

高可用机制

RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。

img

消息消费高可用:在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。

消息发送高可用:在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上,这样既可以在性能方面具有扩展性,也可以降低主节点故障 对整体上带来的影响,而且当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer 仍然可以发送消息的。

img

在需要保证消息严格顺序的场景下,由于在主题层面无法保证严格顺序,所以必须指定队列来发送消息,对于任何一个队列,它一定是落在一组特定的主从节点上,如果这个主节点宕机,其他的主节点是无法替代这个主节点的,否则就无法保证严格顺序。 在这种复制模式下,严格顺序和高可用只能选择一个。RocketMQ 在 2018 年底迎来了一次重大的更新,引入 Dledger,增加了一种全新的复制方式解决了这个问题。

刷盘机制

RocketMQ 的所有消息都是持久化的,先写入系统 PageCache,然后刷盘,可以保证内存与磁盘 都有一份数据, 访问时,直接从内存读取。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。

同步刷盘和异步刷盘差异:

同步刷盘与异步刷盘的唯一区别是异步刷盘写完 PageCache直接返回,而同步刷盘需要等待刷盘完成才返回, 同步刷盘流程如下: (1). 写入 PageCache后,线程等待,通知刷盘线程刷盘。 (2). 刷盘线程刷盘后,唤醒前端等待线程,可能是一批线程。 (3). 前端等待线程向用户返回成功

负载均衡

RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。

Producer的负载均衡:

img

Consumer的负载均衡:

img

在RocketMQ中,负载均衡或者消息分配是在Consumer端代码中完成的,Consumer从Broker处 获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。 Pull Consumer可以看到所有的Message Queue,而且从哪个Message Queue读取消息,读消息 时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。 DefaultMQPullConsumer有两个辅助方法可以帮助实现负载均衡,一个是 registerMessageQueueListener函数,一个是MQPullConsumerScheduleService。

DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个 DefaultMQPushConsumer启动后,会马上会触发一个doRebalance动作;而且在同一个 ConsumerGroup里加入新的DefaultMQPush-Consumer时,各个Consumer都会被触发 doRebalance动作。

消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在 同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。

消息重试

顺序消息的重试

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应 用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

无序消息的重试

对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回 状态达到消息重试的结果。无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消 息不再重试,继续消费新的消息。

消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:

img

如果消息重试 16 次后仍然失败,消息将不再投递。

注意:

1) 消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。

2) 如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。

3) 配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置

死信队列

RocketMQ中消息重试超过一定次数后(默认16次)就会被放到死信队列中,在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信 消息的特殊队列称为死信队列(Dead-Letter Queue)。

可视化工具:rocketmq-console下载地址:

https://github.com/apache/rocketmq-externals/archive/rocketmq-console-1.0.0.zip

死信消息特性:

1) 不会再被消费者正常消费。

2) 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

死信队列特征:

1) 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。

2) 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。

3) 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。

延迟消息

定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的 topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。

level有以下三种情况:

level == 0,消息为非延迟消息

1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s

level > maxLevel,则level== maxLevel,例如level==20,延迟2h

发消息时,设置delayLevel等级即可: msg.setDelayLevel(level)。

顺序消息

顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序。比如订单的生 成、付款、发货,这3个消息必须按顺序处理才行。

顺序消息分为全局顺序消息和部分顺序消息:

\1. 全局顺序消息指某个Topic下的所有消息都要保证顺序;

\2. 部分顺序消息只要保证每一组消息被顺序消费即可,比如上面订单消息的例子,只要保证同一个订单ID的三个消息能按顺序消费即可。

要保证部分消息有序,需要发送端和消费端配合处理。在发送端,要做到把同一业务ID的消息发送 到同一个Message Queue;在消费过程中,要做到从同一个Message Queue读取的消息不被并发处理,这样才能达到部分有序。消费端通过使用MessageListenerOrderly类来解决单Message Queue的消息被并发处理的问题。

要保证全局顺序消息,需要先把Topic的读写队列数设置为一,然后Producer和Consumer的并发设置也要是一。简单来说,为了保证整个Topic的全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理。

事务消息

RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。

具体流程如下:

1)发送方向RocketMQ发送“待确认”消息。

2)RocketMQ将收到的“待确认”消息持久化成功后,向发送方回复消息已经发送成功,此时第一阶段消息发送完成。

3)发送方开始执行本地事件逻辑。

4)发送方根据本地事件执行结果向RocketMQ发送二次确认(Commit或是Rollback)消息, RocketMQ收到Commit状态则将第一阶段消息标记为可投递,订阅方将能够收到该消息;收到 Rollback状态则删除第一阶段的消息,订阅方接收不到该消息。

5)如果出现异常情况,步骤4)提交的二次确认最终未到达RocketMQ,服务器在经过固定时间段 后将对“待确认”消息发起回查请求。

6)发送方收到消息回查请求后(如果发送一阶段消息的Producer不能工作,回查请求将被发送到 和Producer在同一个Group里的其他Producer),通过检查对应消息的本地事件执行结果返回Commit 或Roolback状态。

7)RocketMQ收到回查请求后,按照步骤4)的逻辑处理。

RocketMQ事务消息流程概要

事务消息发送及提交:(1) 发送消息(half消息)。 (2) 服务端响应消息写入结果。 (3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。 (4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可 见)

事务消息的补偿流程:(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查” (2) Producer收到回查消息,检查回查消息对应的本地事务的状态 (3) 根据本地事务状态,重新Commit或者Rollback

补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

RocketMQ事务消息设计

\1. 事务消息在一阶段对用户不可见

\2. Commit和Rollback操作以及Op消息的引入

\3. Op消息的存储和对应关系,Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到 Half消息进行后续的回查操作。

\4. Half消息的索引构建,在执行二阶段Commit操作时,需要构建出Half消息的索引。

\5. 处理二阶段失败的消息,如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致 Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制, 称为“回查”。

img

消息查询及优先级

消息查询

RocketMQ支持按照下面两种维度(“按照Message Id查询消息”、“按照Message Key查询消息”)进行消息查询。

按照MessageId查询消息:MsgId 总共 16 字节,包含消息存储主机地址(ip/port),消息 Commit Log offset。

按照Message Key查询消息:主要是基于RocketMQ的IndexFile索引文件来实现的。

img

消息优先级

有些场景,需要应用程序处理几种类型的消息,不同消息的优先级不同。RocketMQ是个先入先出的队列,不支持消息级别或者Topic级别的优先级。

1) 多个不同的消息类型使用同一个topic时,由于某一个种消息流量非常大,导致其他类型的消息无法及时消费,造成不公平,所以把流量大的类型消息在一个单独的 Topic,其他类型消息在另外一个 Topic,应用程序创建两个 Consumer,分别订阅不同的 Topic。创建一个 Topic, 设置Topic的 MessageQueue 数量超过 100 个,Producer根据订 单的门店号,把每个门店的订单写人 一 个 MessageQueue。 DefaultMQPushConsumer默认是采用 循环的方式逐个读取一个 Topic 的所有 MessageQueue,这样如果某家门店订单量大增,这家门店对 应的 MessageQueue 消息数增多,等待时间增长,但不会造成其他家门店等待时间增长。

2) 情况和第一种情况类似,但是不用创建大量的Topic。

3) 强制优先级 TypeA、 TypeB、 TypeC 三类消息 。TypeA 处于第一优先级,要确保只要有TypeA消息,必须优先处理; TypeB处于第二优先 级; TypeC 处于第三优先级 。

底层网络通信 – Netty

RocketMQ底层通信的实现是在Remoting模块里,因为借助了Netty而没有重复造轮子, RocketMQ的通信部分没有很多的代码,就是用Netty实现了一个自定义协议的客户端/服务器程序。

\1. 自定义ByteBuf可以从底层解决ByteBuffer的一些问题,并且通过“内存池”的设计来提升性能

\2. Reactor主从多线程模型

\3. 充分利用了零拷贝,CAS/volatite高效并发编程特性

\4. 无锁串行化设计

\5. 管道责任链的编程模型

\6. 高性能序列化框架的支持

\7. 灵活配置TCP协议参数

img

RocketMQ消息队列中支持通信的方式主要有同步(sync)、异步(async)、单向(oneway) 三种。

RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。

img

限流机制

RocketMQ消费端中我们可以:

\1. 设置最大消费线程数 2. 每次拉取消息条数等

同时:

\1. PushConsumer会判断获取但还未处理的消息个数、消息总大小、Offset的跨度, 2. 任何一个值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。

Sentinel 专门为这种场景提供了匀速器的特性,可以把突然到来的大量请求以匀速的形式均摊,以 固定的间隔时间让请求通过,以稳定的速度逐步处理这些请求,起到“削峰填谷”的效果,从而避免流量突刺造成系统负载过高。同时堆积的请求将会排队,逐步进行处理;当请求排队预计超过最大超时时长 的时候则直接拒绝,而不是拒绝全部请求。比如在 RocketMQ 的场景下配置了匀速模式下请求 QPS 为 5,则会每 200 ms 处理一条消息,多 余的处理任务将排队;同时设置了超时时间为 5 s,预计排队时长超过 5s 的处理任务将会直接被拒绝。

三、RocketMQ高级实战

生产者

Tags的使用:一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只 有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤: message.setTags(“TagA”)。

Keys的使用:每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每 个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由 于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。

日志的打印:

1) SEND_OK:消息发送成功。

2) FLUSH_DISK_TIMEOUT:消息发送成功但是服务器刷盘超时。

3) FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时。

4) SLAVE_NOT_AVAILABLE:消息发送成功,但是此时Slave不可用。

消息发送失败处理方式:Producer的send方法本身支持内部重试,至多重试2次,如果发送失败,则轮转到下一个Broker,如果本身向broker发送消息产生超时异常,就不会再重试。

选择oneway形式发送:oneway形式只发送请求 不等待应答,而发送请求在客户端实现层面仅仅是一个操作系统系统调用的开销,即将数据写入客户端 的socket缓冲区,此过程耗时通常在微秒级。

消费者

消费过程幂等:RocketMQ无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理,可以借助关系数据库进行去重。

消费速度慢的处理方:1. 提高消费并行度,2. 批量方式消费,3. 跳过非重要消息。

优化每条消息消费过程:把循环多次处理变为批量单次处理,减少IO次数。

消费打印日志:在消费入口方法打印消息,消费耗时等,方便后续排查问题。

其他消费建议:1. 确保同一组内的每个消费者订阅信息保持一致。2.使用有序消息,消费者将锁定每个消息队列,以确保他们被逐个消费。3. 并发消费不建议抛出异常,直接返回状态码。4. 不建议阻塞监听器,因为它会阻塞线程池,并最终可能会终止消费进程。

Broker

Broker 角色分为 ASYNC_MASTER(异步主机)、SYNC_MASTER(同步主机)以及SLAVE(从 机)。SYNC_FLUSH(同步刷新)相比于ASYNC_FLUSH(异步处理)会损失很多性能,但是也更可靠, 所以需要根据实际的业务场景做好权衡。

img

NameServer

NameServer的设计:

\1. NameServer互相独立,彼此没有通信关系,单台NameServer挂掉,不影响其他 NameServer。

\2. NameServer不去连接别的机器,不主动推消息。

\3. 单个Broker(Master、Slave)与所有NameServer进行定时注册,以便告知NameServer自 己还活着。

\4. Consumer随机与一个NameServer建立长连接,如果该NameServer断开,则从 NameServer列表中查找下一个进行连接。

\5. Producer随机与一个NameServer建立长连接,每隔30秒(此处时间可配置)从 NameServer获取Topic的最新队列情况,如果某个Broker Master宕机,Producer最多30秒 才能感知,在这个期间,发往该broker master的消息失败。Producer向提供Topic服务的 Master建立长连接,且定时向Master发送心跳。

RocketMQ为什么不使用ZooKeeper而自己开发NameServer?

zookeeper在粗粒度分布式锁,分布式选主,主备高可用切换等不需要高TPS支持的场景下有不可替代的作用,而这些需求往往多集中在大数据、离线任务等相关的业务领域,因为大数据领域,讲究分割数据集,并且大部分时间分任务多进程/线程并行处理这些数据集,但是总是有一些点上需要将这些任务和进程统一协调,这时候就是ZooKeeper发挥巨大作用的用武之地。 但是在交易场景交易链路上,在主业务数据存取,大规模服务发现、大规模健康监测等方面有天然 的短板,应该竭力避免在这些场景下引入ZooKeeper,在阿里巴巴的生产实践中,应用对ZooKeeper申 请使用的时候要进行严格的场景、容量、SLA需求的评估。

系统配置

设置Xms和Xmx一样大,防止JVM重新调整堆空间大小影响性能。

-server -Xms8g -Xmx8g -Xmn4g

设置DirectByteBuffer内存大小。当DirectByteBuffer占用达到这个值,就会触发Full GC。

-XX:MaxDirectMemorySize=15g

如果不太关心RocketMQ的启动时间,可以设置pre-touch,这样在JVM启动的时候就会分配完整的页空间。

-XX:+AlwaysPreTouch

禁用偏向锁可能减少JVM的停顿,在并发小的时候使用偏向锁有利于提升JVM效率,在高并发场合禁用掉。

-XX:-UseBiasedLocking

推荐使用JDK1.8的G1垃圾回收器。

RocketMQ集群

Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master 发送心跳。Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向 Master、Slave发送心跳。

单Master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。

多Master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性 会受到影响。

多Master多Slave模式(异步)

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟 (毫秒级),即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,但是会丢失少量消息。

多Master多Slave模式(异步)

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功, 才向应用返回成功,消息无延迟,服务可用性与数据可用 性都非常高;但是性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版 本在主节点宕机后,备机不能自动切换为主机。

------ 本文结束感谢您的阅读 ------
请我一杯咖啡吧!
itingyu 微信打赏 微信打赏