图解 Kafka 网络层实现机制之 Selector 多路复用器

网站建设3年前发布
37 0 0

20230306104330622dc020679021eca74199345b7b9cab501a57485,大家好,我是 华仔, 又跟大家见面了。,在​​上一篇​​中,主要带大家深度剖析了「Kafka 对 NIO SocketChannel、Buffer」的封装全过程,今天我们接着聊聊 Kafka 是如何封装 Selector 多路复用器的,本系列总共分为3篇,今天是中篇,主要剖析4、5两个问题:,认真读完这篇文章,我相信你会对 Kafka 封装 Java NIO 源码有更加深刻的理解。,这篇文章干货很多,希望你可以耐心读完。,20230306104119413be8969a48a24893250198416cdeb74177db517,大家都知道在 Java NIO 有个三剑客,即「SocketChannel通道」、「Buffer读写」、「Selector多路复用器」,上篇已经讲解了前2个角色,今天我们来聊聊最后一个重要的角色。,Kafka Selector 是对 Java NIO Selector 的二次封装,主要功能如下:,为了方便大家理解,所有的源码只保留骨干。,github 源码地址如下:,org.apache.kafka.common.network.Selector,该类是 Kafka 网络层最重要最核心的实现,也是非常经典的工业级通信框架实现,为了简化,这里称为 Kselector, 接下来我们先来看看该类的重要属性字段:,重要字段如下所示:,介绍完字段后,我们来看看该类的方法。方法比较多,这里深度剖析下其中几个重要方法,通过学习这些方法的不仅可以复习下 Java NIO 底层组件,另外还可以学到 Kafka 封装这些底层组件的实现思想。,NetworkClient 的请求一般都是交给 Kselector 去处理并完成的。而 Kselector 使用 NIO 异步非阻塞模式负责具体的连接、读写事件等操作。,我们先看下连接过程,客户端在和节点连接的时候,会创建和服务端的 SocketChannel 连接通道。Kselector 维护了每个目标节点对应的 KafkaChannel。,如下图所示:,20230306104331e9e38e73389a698da605099ce511980ab1afcd508,该方法主要是用来发起网络连接,连接过程大致分为如下六步:,这里需要注意下: 因为是非阻塞方式,所以 channel.connect() 发起连接,「可能在正式建立连接前就返回了」,为了确定连接是否建立,需要再调用 「finishConnect」 确认完全连接上了。,该方法主要用来注册和绑定连接的,过程如下:,讲解完建立连接后,我们来看看消息发送的相关方法。,KSelector.send() 方法是将之前创建的 RequestSend 对象先缓存到 KafkaChannel 的 send 字段中,并关注此连接的 OP_WRITE 事件,并没有真正发生网络 I/O 操作。会在下次调用 KSelector.poll() 时,才会将 RequestSend 对象发送出去。,如果此 KafkaChannel 的 send 字段上还保存着一个未完全发送成功的 RequestSend 请求,为了防止覆盖,会抛出异常。每个 KafkaChannel 一次 poll 过程中只能发送一个 Send 请求。,客户端的请求 Send 会被设置到 KafkaChannel 中,KafkaChannel 的 TransportLayer 会为 SelectionKey 注册 OP_WRITE 事件。,此时 Channel 的 SelectionKey 就有了 OP_CONNECT、OP_WRITE 事件,在 Kselector 的轮询过程中当发现这些事件准备就绪后,就开始执行真正的操作。,基本流程就是:,2023030610412037c4b3311006addb5a595169e1f19a46c0a9bf487,该方法主要用来消息预发送,即在发送的时候把消息线暂存在 KafkaChannel 的 send 字段里,然后等着 poll() 执行真正的发送,过程如下:,讲完消息预发送,接下来我们来看看最核心的 poll 和 pollSelectionKeys 方法。,在 Kselector 的轮询中可以操作连接事件、读写事件等,是真正执行网络I/O事件操作的地方,它会调用 nioSelector.select() 方法等待 I/O 事件就绪。,当 Channel 可写时,发送 KafkaChannel.send 字段,「一次最多只发送一个 RequestSend,有时候一个 RequestSend 也发送不完,需要多次 poll 才能发送完成」。,当 Channel 可读时,读取数据到 KafkaChannel.receive,「当读取一个完整的 NetworkReceive ,并在一次 pollSelectionKeys() 完成后会将 NetworkReceive 中的数据转移到 completedReceives 集合中」。,最后调用 maybeCloseOldestConnection() 方法,根据 lruConnections 记录,设置 channel 状态为过期,并关闭长期空闲的连接。,该方法主要用来实现网络操作的,即收集准备就绪事件,并针对事件进行网络操作,具体的过程如下:,该方法是用来处理监听到的事件,包括连接事件、读写事件、以及立即完成的连接的。具体过程如下:,讲解完最核心的 poll() 和 pollSelectionKeys() 方法后,我们来看看「网络读写事件」的处理过程。,该方法用来判断尝试进行写操作,方法很简单,必须「同时满足4个条件」:,当满足以上4个条件后就可以进行写操作了,接下来我们看看写操作的过程。,该方法用来真正执行写操作,数据就是上面send()方法被填充的send字段。具体过程如下:,接下来我们来看看读操作过程。,该方法主要用来尝试读取数据并添加已经接收完毕的集合中。,接下来我们看看几个其他比较简单的方法。,该方法主要用来返回发送完成的Send集合数据。,该方法主要用来返回已经接收完毕的请求集合数据。,该方法主要用来返回断开连接的 broker 集合数据。,该方法主要用来返回连接成功的 broker 集合数据。,该方法主要用来判断对应的 Channel 是否准备好,参数是 channel id。,该方法主要用来将某个 channel 添加到已经接收完毕的网络请求集合中。,为什么会有这个管理器,大家都知道对于 TCP 大量连接或者重连是会对 Kafka 造成性能影响的,而 Kafka 客户端又不能同时连接过多的节点。因此设计这样一个 LRU 算法,每隔9分钟就删除一个空闲过期的连接,以保证已有连接的有效。,该类通过「LinkedHashMap 结构来实现一个 lru 连接集合」,最核心的方法就是 update() 来更新链接的活跃时间,remove() 来删除连接。,主要用在以下3个地方:,网络连接总共分为以下两个阶段:,2023030610412088a39200805cf5a17fd0767c78725e50b5bee2951,这里,我们一起来总结一下这篇文章的重点。,1、带你先整体的梳理了 Kafka 对 Java NIO 三剑客中的 Selector 的功能介绍。,2、又带你剖析了 Selector 的重要方法和具体的操作过程。,3、介绍空闲超时到期连接管理器是什么,有什么作用?,4、最后带你梳理了网络连接的全流程。

© 版权声明

相关文章