1.消息队列 Message Queue,简称 MQ。是一种应用间的通信方式,主要由三个部分组成:
- 1.1 生产者:Producer:消息的产生者与调用端主要负责消息所承载的业务信息的实例化是一个队列的发起方
- 1.2 代理:Broker: 主要的处理单元,负责消息的存储、投递、及各种队列附加功能的实现,是消息队列最核心的组成部分
- 1.3 消费者:Consumer:一个消息队列的终端也是消息的调用端具体是根据消息承载的信息,处理各种业务逻辑。
- 消息队列的应用场景较多,常用的可以分为三种:
- 2.1 异步处理: 主要应用于对实时性要求不严格的场景,比如用户注册发送验证码、下单通知、发送优惠券等等
- 2.2 应用解耦 :把相关但耦合度不高的系统联系起来。 有关联但不哪么紧密的系统,,每个系统之间只需要把约定的消息发送到 MQ,另外的系统去消费即可。解决了各个系统可以采用不同的架构、语言来实现,从而大大增加了系统的灵活性。
- 2.3 流量削峰 : 在大流量入口且短时间内业务需求处理不完的服务中心,为了权衡高可用,把大量的并行任务发送到 MQ 中,依据 MQ 的存储及分发功能,平稳的处理后续的业务,起到一个大流量缓冲的作用。
- 常见的消息队列中间件主要有: ActiveMQ、RabbitMQ、Kafka、RocketMQ 这几种,
在架构技术选型的时候一般根据业务的需求选择合适的中间件:比如中小型公司,低吞吐量的一般用 ActiveMQ、RabbitMQ 较为合适,大数据高吞吐量的大型公司一般选用 Kafka 和 RocketMQ。
- 应用把磁盘中的某个文件内容发送到远程服务器上:
- 磁盘 - 内核 buffer - 用户 buffer - socket buffer - 网卡缓冲区 - 目标服务器
- 零拷贝通过DMA(Direct Memory Access)技术把文件内容复制到内核空间中的ReadBuffer,接着把包含数据位置和长度信息的文件描述符加载到 Socket Buffer 中,DMA 引擎直接可以把数据从内核空间中传递给网卡设备。
- 所谓零拷贝,并不是完全没有数据赋值,只是相对于用户空间来说,不再需要进行数据拷贝。对于前面说的整个流程来说,零拷贝只是减少了不必要的拷贝次数而已。
- 程序中如何实现零拷贝:
- 4.1 Linux 中,零拷贝技术依赖于底层的 sendfile()方法实现
- 4.2 Java 中,FileChannal.transferTo() 方法的底层实现就是 sendfile() 方法。
- 4.3 还有一个 mmap 的文件映射机制,原理是:将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件。使用这种方式可以获取很大的 I/O 提升,省去了用户空间到内核空间复制的开销
- Producer 端,需要确保消息能够到达 Broker 并实现消息存储:
- 1.1 Producer 默认是异步发送消息,这种情况下要确保消息发送成功,有两个方法
- a. 把异步发送改成同步发送,这样 producer 就能实时知道消息发送的结果。
- b. 添加异步回调函数来监听消息发送的结果,如果发送失败,可以在回调中重试。
- 1.2 Producer 本身提供了一个重试参数 retries,如果因为网络问题或者 Broker 故障导致发送失败,Producer 会自动重试。
- Broker 端,Broker 需要确保 Producer 发送过来的消息不会丢失,也就是只需要把消息持久化到磁盘就可以了.
- 2.1 但是,Kafka 为了提升性能,采用了异步批量刷盘的实现机制,也就是说按照一定的消息量和时间间隔来刷盘,而最终刷新到磁盘的这个动作,是由操作系统来调度的,所以如果在刷盘之前系统崩溃,就会导致数据丢失。
- 2.2 Kafka 并没有提供同步刷盘的实现,所以针对这个问题,需要通过 Partition的副本机制和 acks 机制来一起解决。
- 2.3 Partition 副本机制,它是针对每个数据分区的高可用策略,每个partition 副本集包含唯一的一个 Leader 和多个 Follower,Leader 专门处理事务类的请求,Follower 负责同步 Leader 的数据”。在这样的一种机制的基础上,kafka 提供了一个 acks 的参数,Producer 可以设置 acks参数再结合 Broker 的副本机制来个共同保障数据的可靠性.
- 2.4 acks 有几个值的选择:
- a. acks=0, 表示 producer 不需要等 Broker 的响应,就认为消息发送成功,这种情况会存在消息丢失。
- b. acks=1,表示 Broker 中的 Leader Partition 收到消息以后,不等待其他 FollowerPartition 同步完,就给 Producer 返回确认,这种情况下 Leader Partition 挂了,会存在数据丢失。
- c. acks=-1,表示 Broker 中的 Leader Parititon 收到消息后,并且等待 ISR 列表中的 follower 同步完成,再给 Producer 返回确认,这个配置可以保证数据的可靠性
- 最后,就是 Consumer 必须要能消费到这个消息,实际上,我认为,只要 producer和 broker 的消息可靠的到了保障,那么消费端是不太可能出现消息无法消费的问题,除非是 Consumer 没有消费完这个消息就直接提交了,但是即便是这个情况,也可以通过调整 offset 的值来重新消费。
- Kafka Broker 上存储的消息,都有一个 Offset 标记。然后 kafka 的消费者是通过 offSet 标记来维护当前已经消费的数据,每消费一批数据,Kafka Broker 就会更新 OffSet 的值,避免重复消费。
- 默认情况下,消息消费完以后,会自动提交 Offset 的值,避免重复消费。Kafka 消费端的自动提交逻辑有一个默认的 5 秒间隔,也就是说在 5 秒之后的下一次向Broker 拉取消息的时候提交。所以在 Consumer 消费的过程中,应用程序被强制 kill 掉或者宕机,可能会导致 Offset没提交,从而产生重复提交的问题。除此之外,还有另外一种情况也会出现重复消费。在 Kafka 里面有一个 Partition Balance 机制,就是把多个 Partition 均衡的分配给多个消费者。就会触发 Kafka 的 Rebalance 机制,从而导致 Offset 自动提交失败。
- 基于这样的背景下,我认为解决重复消费消息问题的方法有几个:
- 3.1 提高消费端的处理性能避免触发 Balance,比如可以用异步的方式来处理消息,缩短单个消息消费的市场。或者还可以调整消息处理的超时时间。还可以减少一次性从 Broker 上拉取数据的条数。
- 3.2 可以针对消息生成 md5 然后保存到 mysql 或者 redis 里面,在处理消息之前先去mysql 或者 redis 里面判断是否已经消费过。这个方案其实就是利用幂等性的思想。
- kafka 在 Partition 多副本设计的方案里面,有两个很关键的需求。副本数据的同步和新 Leader 的选举这两个需求都需要涉及到网络通信,Kafka 为了避免网络通信延迟带来的性能问题,以及尽可能的保证新选举出来的 Leader Partition 里面的数据是最新的,所以设计了ISR 这样一个方案。
- ISR 全称是 in-sync replica,它是一个集合列表,里面保存的是和 Leader Parition 节点数据最接近的 Follower Partition
- 如果某个 Follower Partition 里面的数据落后 Leader 太多,就会被剔除 ISR 列表。简单来说,ISR 列表里面的节点,同步的数据一定是最新的,所以后续的 Leader 选举,只需要从 ISR 列表里面筛选就行了。
- 引入 ISR 这个方案的原因有两个:
- 4.1 尽可能的保证数据同步的效率,因为同步效率不高的节点都会被踢出 ISR 列表。
- 4.2 避免数据的丢失,因为 ISR 里面的节点数据是和 Leader 副本最接近的。
- 首先,在 kafka 的架构里面,用到了 Partition 分区机制来实现消息的物理存储,在同一个 topic 下面,可以维护多个 partition 来实现消息的分片。生产者在发送消息的时候,会根据消息的 key 进行取模,来决定把当前消息存储到哪个 partition 里面。消息是按照先后顺序有序存储到 partition 里面的。
- 假设有一个 topic 存在三个 partition,而消息正好被路由到三个独立的 partition 里面。然后消费端有三个消费者通过 balance 机制分别指派了对应消费分区。因为消费者是完全独立的网络节点,所有可能会出现,消息的消费顺序不是按照发送顺序来实现的,从而导致乱序的问题。
- 针对这个问题,一般的解决办法就是自定义消息分区路由的算法,然后把指定的 key都发送到同一个 Partition 里面。接着指定一个消费者专门来消费某个分区的数据,这样就能保证消息的顺序消费了。
- 有些设计方案里面,在消费端会采用异步线程的方式来消费数据来提高消息的处理效率,那这种情况下,因为每个线程的消息处理效率是不同的,所以即便是采用单个分区的存储和消费也可能会出现无序问题,针对这个问题的解决办法就是在消费者这边使用一个阻塞队列,把获取到的消息先保存到阻塞队列里面,然后异步线程从阻塞队列里面去获取消息来消费。
- 消息积压的原因是生产者的消息生产速度大于消费者的消费速度,遇到这个问题的时候,需要排查具体的原因再提出解决方案。
- 如果当前不是因为系统 bug 导致的,那我们可以优化消费端的逻辑,比如通过异步的方式来处理消息、或者通过批量处理的方式来消费。如果通过这两种优化方式还没有缓解,可以考虑对消费端进行水平扩容,从而扩大消费端的消费能力。
- 如果是因为系统 bug 导致大量消息堆积,那么首先需要解决系统 bug,然后临时做紧急扩容来完成大量消息的消费。
- 3.1 首先解决消费端的 bug,来保证消费端的正常消息处理工作。
- 3.2 接着把现在所有的消费端停止,然后新建一个 Topic,然后把 Partition 分区数量调整成原来的 10 倍。
- 3.3 接着写一个用来实现数据分发的 Consumer 程序,这个程序专门去消费现在积压的数据,消费后不做处理,而是直接再把这些数据写入临时建立的 Topic 的 10 个Partition 中。
- 3.4 然后临时增加 10 倍的消费者节点来部署 Consumer,专门来消费临时的 Partition分区数据。
- 通过上面这种方法,可以快速把现在堆积的消息处理完。等积压的消息处理结束后,再把恢复成原来的部署架构,把临时的 Topic 和临时申请的机器释放掉。
- 一个 Topic 逻辑主题,可以分成多个 Partition 分区实现消息内容的物理存储。同时,为了保证 Partition 分区的可靠性,Kafka 设计了分区副本的概念,也就是一个Partition 可以设置多个副本。在多个副本中,由于设计到数据的同步,所以 Kafka 针对 Partition 分区副本集,Leader 副本负责处理所有的读写请求,Follower 负责从 Leader 副本同步数据。
- Kafka 首先会选择一个具有最新数据的副本作为新的 Leader,也就是 ISR 集合中的副本。
- 其中,ISR(In-Sync Replica)是指与 Leader 同步的副本集合,它们的数据同步状态与 Leader 最接近,并且它们与 Leader 副本的网络通信延迟最小。
- 如果 ISR 集合中没有可用的副本,Kafka 会从所有副本中选择一个具有最新数据的副本作为新的 Leader。在这种情况下选举出来的 Leader,由于和原来老的 Leader 节点的数据存在较大的延迟,会造成数据丢失的情况
- 功能开关的选择交通过 unclean.leader.election.enable 参数来设置。开启之后虽然会造成数据丢失,但是至少可以保证依然能对外提供服务,保证了可用性。
kafka leader的选举:
broker启动的时候,都会创建KafkaController对象,但是集群中只能有一个leader对外提供服务,这些每个节点上的KafkaController会在指定的zookeeper路径下创建临时节点,只有第一个成功创建的节点的KafkaController才可以成为leader,其余的都是follower。当leader故障后,所有的follower会收到通知,再次竞争在该路径下创建节点从而选举新的leader
- 多线程和 MQ 虽然在特性上都支持程序的异步操作,但是在实现本质上区别比较大,
- 处理任务的维度不同,多线程是同一个进程中的多个线程并行处理任务,MQ 是通过把消息发送到不同应用节点的不同进程来
- 数据可靠性不同,多线程异步处理任务时,数据是基于共享内存来交互的,一旦程序崩溃,内存中的数据会丢失;使用 MQ 时,可以通过消息队列的持久化机制来保证消息的可靠性。
- 分布式能力方面,MQ 具备分布式能力,可以把消息分发到不同的节点存储和消费、多线程只能在一个进程中处理任务。