图解 Kafka 网络层实现机制(一)

网站建设3年前发布
61 0 0

今天我们就来聊聊 Kafka 是如何对 Java NIO 进行封装的,本系列总共分为3篇,主要剖析以下几个问题:,本篇只讨论前3个问题,剩余的放到后2篇中。,认真读完这篇文章,我相信你会对 Kafka 封装 Java NIO 源码有更加深刻的理解。,这篇文章干货很多,希望你可以耐心读完。,​​上篇​​剖析了「生产者元数据的拉取和管理的全过程」,此时发送消息的时候就有了元数据,但是还没有进行网络通信,而网络通信是一个相对复杂的过程,对于 Java 系统来说网络通信一般会采用 NIO 库来实现,所以 Kafka 对 Java NIO 封装了统一的框架,来实现多路复用的网络 I/O 操作。,为了方便大家理解,所有的源码只保留骨干。,如果大家对 Java NIO 不了解的话,可以看下这个文档,这里就不过多介绍了。,我们来看看 Kafka 对 Java NIO 组件做了哪些封装? 这里先说下结果,后面会深度剖析。,接下来我们挨个对上面组件进行剖析。,TransportLayer 接口是对 NIO 中 「SocketChannel」 的封装。它的实现类总共有 2 个:,本篇只剖析 PlaintextTransportLayer 的实现。,github 源码地址如下:,从上面代码可以看出,该类就是对底层 NIO 的 socketChannel 封装引用。将构造函数的 SelectionKey 类对象赋值给 key,然后从 key 中取出对应的 SocketChannel 赋值给 socketChannel,这样就完成了初始化工作。,接下来,我们看看几个重要方法是如何使用这2个 NIO 组件的。,该方法主要用来判断网络连接是否完成,如果完成就关注 「OP_READ」 事件,并取消 「OP_CONNECT」 事件。,这里通过「二进制位运算」巧妙的解决了网络事件的监听操作,实现非常经典。,通过 socketChannel 在 Selector 多路复用器注册事件返回 SelectionKey ,SelectionKey 的类型包括:,首先"~"符号代表按位取反,"&"代表按位取与,通过 key.interestOps() 获取当前的事件,然后和 OP_CONNECT事件取反「11110111」 后按位与操作。,所以,"& ~xx" 代表删除 xx 事件,有就删除,没有就不变;而 "| xx" 代表将 xx 事件添加进去。,该方法主要用来把 socketChannel 里面的数据读取缓冲区 ByteBuffer 里,通过调用 socketChannel.read() 实现。,该方法主要用来把缓冲区 ByteBuffer 的数据写到 SocketChannel 里,通过调用 socketChannel.write() 实现。,大家都知道在网络编程中,一次读写操作并一定能把数据读写完,所以就需要判断是否读写完成,势必会涉及数据的「拆包」、「粘包」操作。 这些操作比较繁琐,因此 Kafka 将 ByteBuffer 的读写操作进行重新封装,分别对应 NetworkReceive 读操作、NetworkSend 写操作,对于上层调用无需判断是否读写完成,更加友好。,接下来我们就来分别剖析下这2个类的实现。,从属性可以看出,包含2个 ByteBuffer,分别是 size 和 buffer。这里重点说下源码中的size字段的初始化。通过长度编码方式实现,上来就先分配了4字节大小的 ByteBuffer 来存储响应消息数据长度,即32位,与 Java int 占用相同的字节数,完全满足表示消息长度的值。,介绍完字段后,我们来深度剖析下该类的几个重要的方法。,该方法主要用来把对应 channel 中的数据读到 ByteBuffer 中,包括响应消息数据长度的 size 和响应消息数据体长度的 buffer,可能会被多次调用,每次都需要判断 size 和 buffer 的状态并读取。,在读取时,先读取4字节到 size 中,再根据 size 的大小为 buffer 分配内存,然后读满整个 buffer 时就表示读取完成了。,通过短短的30行左右代码就解决了工业级「拆包」 、「粘包」问题,相当的经典。,如果要解决「粘包」问题,就是在每个响应数据中间插入一个特殊的字节大小的「分隔符」,这里就在响应消息体前面插入4个字节,代表响应消息自己本身的数据大小,如下图所示:,具体「拆包」的操作步骤如下:,该方法主要用来判断是否都读取完成,即响应头大小和响应体大小都读取完。,该方法主要用来返回响应头和响应体还有多少数据需要读出。,此时已经剖析完读 Buffer 的封装,接下来我们看看写 Buffer。,github 源码地址如下:,调用关系图如下:,我们先看一下接口 Send 都定义了哪些方法。,Send 作为要发送数据的接口, 子类 ByteBufferSend 实现 complete() 方法用于判断是否已经发送完成,实现 writeTo() 方法来实现写入数据到Channel中。,ByteBufferSend 类实现了 Send 接口,即实现了数据从 ByteBuffer 数组发送到 channel:,我们来看下这个类中的几个重要字段:,介绍完字段后,我们来深度剖析下该类的几个重要的方法。,该方法主要用来把 buffers 数组写入到 SocketChannel里,因为在网络编程中,写一次不一定可以完全把数据都写成功,所以调用底层 channel.write(buffers) 方法会返回「已经写入成功多少字节」的返回值,这样调用一次后就知道已经写入多少字节了。,NetworkSend 类继承了 ByteBufferSend 类,真正用来写 Buffer。,该类相对简单些,就是构建一个发往 channel 对应的节点 id 的消息数据,它的实例化过程如下:,另外 ByteBuffer[] 为两个 buffer,可以理解为一个消息头 buffer 即 size,一个消息体 buffer。消息头 buffer 的长度为4byte,存放的是消息体 buffer 的长度。而消息体 buffer 是上层传入的业务数据,所以 send 就是持有一个待发送的 ByteBuffer。,接下来我们来看看 KafkaChannel 是如何对上面几个类进行封装的。,github 源码地址如下:,我们来看下这个类中的几个重要字段:,从属性可以看出,有3个最重要的成员变量:TransportLayer、NetworkReceive、Send。KafkaChannel 通过 TransportLayer 进行读写操作,NetworkReceive 用来读取,Send 用来写出。,为了封装普通和加密的Channel「TransportLayer根据网络协议的不同,提供不同的子类」而对于 KafkaChannel 提供统一的接口,「这是策略模式很好的应用」。,介绍完字段后,我们来深度剖析下其网络读写操作是如何实现的?,该方法主要用来预发送,即在发送网络请求前,将需要发送的ByteBuffer 数据保存到 KafkaChannel 的 send 中,然后调用传输层方法增加对这个 channel 上「OP_WRITE」事件的关注。当真正执行发送的时候,会从 send 中读取数据。,该方法主要用来把保存在 send 上的数据真正发送出去。,该方法主要用来把从网络I/O操作中读出的数据保存到 NetworkReceive 中。,该方法主要用来判断数据已经读取完毕了,而判断是否读完的条件是 NetworkReceive 里的 buffer 是否用完,包括上面说过的表示响应消息头 size ByteBuffer 和响应消息体本身的 buffer ByteBuffer。这两个都读完才算真正读完了。,该方法主要用来是否写数据完毕了,而判断的写数据完毕的条件是buffer 中没有剩余且pending为false。,最后我们来聊聊事件注册和取消的具体时机,以便更好的理解网络 I/O 操作。,我们知道 Java NIO 是基于 epoll 模型来实现的。所有基于 epoll 的框架,都有3个阶段:,这里我们来看下相关事件是何时被注册和取消的。,在 Selector 发起网络连接的时候进行「OP_CONNECT」事件注册。,在 PlainTransportLayer 明文传输层完成连接的时候取消 「OP_CONNECT」事件。,从上面也可以看出,「OP_READ」事件的注册和「OP_CONNECT」事件的取消是同时进行的。,由于 「OP_READ」事件是要一直监听是否有新数据到来,所以不会取消。并且因为是 Java NIO 使用的 「epoll 的 LT 模式」,只要「读缓冲区」有数据,就会一直触发。,在 KafkaChannel 真正发送网络请求之前注册「OP_WRITE」事件。,这里,我们一起来总结一下这篇文章的重点。,1、带你先整体的梳理了 Kafka 对 Java NIO 封装的组件以及调用关系图。,2、分别带你梳理了传输层 TransportLayer 的明文网络传输层的实现、网络读操作 NetworkReceive、网络写操作 NetworkSend 的实现、以及 KafkaChannel 是如何进一步对上面组件进行封装提供更加友好的网络连接、读写操作的。,3、最后剖析了网络 I/O 操作过程中的事件注册和取消时机。

© 版权声明

相关文章