【编码】如何实现一套自定义网络协议?
【编码】如何实现一套自定义网络协议?
前言
下文介绍的自定义协议仅作为学习示例,纯粹是玩具项目,没有实际可用性。无需过度关注和讨论其合理性
进行通信的双方是谁?
常见的模型
客户端-服务器,例如HTTP协议,浏览器<=>Web服务器。
中转站模型,如MQTT协议,应用服务<=>中转站<=>硬件客户端
对等模型,例如Thrift协议,应用服务<=>应用服务。
通用协议如此丰富,还需要自定义协议吗?
需要。许多中间件服务在构建集群时,服务节点之间需要进行高效的内部通信。
在这种场景下,自定义协议能发挥巨大的作用:
- 去除冗余字段:自定义协议能够减少无用字段,最大化优化通信吞吐量
- 灵活性:自定义协议可以根据需求进行灵活扩展,支持注入优先级控制,解压缩控制等特点。
自定义协议可以减少无用字段,最大限度地优化通信吞吐量;也更加灵活,可以进行优先级控制。
例如,Kafka 就使用了自定义协议来满足高效的消息传递需求。
自定义协议设计
所谓网络协议,就是传输的报文格式,以及收发双方处理报文的规则。
报文格式做如下设计:
- 固定头部(4字节)
- 字节1:消息类型
- 1=req,2=res, 3=pub, 4=sub, 5=msg
- 用一个字节来表示类型有点浪费了。
- 字节2~字节4:消息体长度
- 这三个字节能够表示最大值为 16777215,即最大消息体长度为 16MB。
- 字节1:消息类型
- 消息体(可变长度)
规则:
1.服务端收到req包,需返回res包
2.服务端收到sub包,需更新订阅情况
3.服务端收到pub包,需根据订阅情况发送msg包
粘拆包问题
在设计网络协议时,不可不谈粘拆包问题。
什么是粘包和拆包?
这两个都是接收端在接收数据时遇到的问题,其中
- 粘包:多个数据包合并成一个包接收
- 拆包:一个数据包被拆分成多个包接收
为什么会出现粘包与拆包?
根本原因就是传输层的TCP协议,是面向字节流的,它不知道数据边界。
此外,TCP根据网络情况(如最大传输单元MTU)动态调整报文大小,导致数据包的分段与合并。
从而产生粘包和拆包问题
传输流程:
1.发送缓冲区:当应用层产生数据后,这些数据会首先进入Socket连接的发送缓冲区
2.数据拆分:网卡根据缓冲区中的数据内容,将数据拆分成多个小的TCP数据报进行发送
3.接收与重组:接收端的TCP栈会将接收到的多个TCP数据包重新组装成完整的字节流(Socket接收缓冲区)
案例场景
一个常见的场景是,客户端连续发送多个消息(如 100 个字符串),而服务端接收到的数据可能并不完全是 100 条。
要复现这种问题也很简单,只要客户端连续发100个字符串,检查服务端收到的数据条数。
客户端代码:连接建立后,连续发送100次字符串
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); new Thread(() -> { for (int i = 0; i < 100; i++) { ctx.writeAndFlush(Unpooled.copiedBuffer("Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!", CharsetUtil.UTF_8)); } }).start(); }
服务端代码:每收到一个包,就打印一次。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in \= (ByteBuf) msg;
System.out.println("Server Receive:"+in.toString(CharsetUtil.UTF\_8));
ctx.write(in);
}
结果:仅收到两个包,同时存在粘包和拆包问题。一个Siuuuu被截断了
Server Receive:Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuu Server Receive:uuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!Siuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu!
如何处理粘包和拆包?
处理方式由消息格式决定
- 固定长度:每条消息的长度固定,不足部分使用填充
- 特殊分隔符:每条消息的末尾添加特定的分隔符
- 消息头+消息体:消息头长度固定,包含消息体长度信息
由于我们采用的时第三种方式,也是最复杂的一种。
处理的核心在于消息头,因为它携带了消息体的长度信息,是判断消息边界的关键。
粘包的处理
步骤如下:
- 提取消息头:首先提取消息头,从中获取消息体的长度信息
- 读取完整消息:根据消息体的长度,从数据流中读取完整的消息内容
- 重复执行:重复步骤1和步骤2,直到没有更多的数据,或当前数据不足以构成完整的消息
拆包的处理
拆包的处理方式与粘包类似:
- 缓存数据:如果接收到的数据不足一条完整消息,则将数据存入缓冲区。
- 合并新数据:在接收到新数据时,判断缓冲区和新数据是否可以组成完整消息,直到消息完整为止。
- 继续缓存:剩下的数据如果不足,则继续缓存
代码案例
1)客户端
根据上面的协议格式,构建消息。(这里的消息体内容是随机字符串,实际应用中通常是序列化后的POJO对象。)
连接建立后连续发送200条随机长度的消息。
public class EchoClientHandler extends SimpleChannelInboundHandler
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
System.out.println("断开连接");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new Thread(() -> {
//连续发送200条消息
for (int i = 0; i < 200; i++) {
try {
ctx.writeAndFlush(Unpooled.copiedBuffer(buildRandomMsg()));
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
System.out.println("Client receive:"+byteBuf.toString(CharsetUtil.UTF\_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
//构建消息,其中body内容为随机长度的随机字符串
public static byte\[\] buildRandomMsg() throws IOException {
int length = RandomUtil.randomInt(100, 200);
String body \= RandomUtil.randomString(length);
System.out.println("长度:"+length+"||内容:"+body);
byte type = 1;
byte\[\] lengthBytes = new byte\[3\];
lengthBytes\[0\] = (byte) (length >> 16);
lengthBytes\[1\] = (byte) (length >> 8);
lengthBytes\[2\] = (byte) length;
byte\[\] bodyBytes = body.getBytes(CharsetUtil.UTF\_8);
return concatByteArrays(new byte\[\]{type}, lengthBytes, bodyBytes);
}
//拼接字节数组
public static byte\[\] concatByteArrays(byte\[\]... byteArrays) throws IOException {
// 使用 ByteArrayOutputStream 来拼接字节数组
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
for (byte\[\] array : byteArrays) {
byteArrayOutputStream.write(array);
}
// 返回拼接后的字节数组
return byteArrayOutputStream.toByteArray();
}
}
2)服务端
在看代码前,先说明一下channelRead的调用流程
- Socket接收到TCP报文,将数据写入内核缓冲区
- NIO线程检测到此Socket有可读消息
- NIO线程从内核缓冲区读取消息,得到ByteBuf msg
- NIO线程调用channelRead
得到两个信息
- msg是从缓冲区读取的,它可能包含多条完整消息 + 一条残缺消息。
- msg已经从缓冲区读出,缓冲区数据已清空。对于不完整的消息需要自行缓存
下面代码是直接实现的,主要用来介绍完整的处理逻辑。
实际应用中推荐继承Netty提供的ByteToMessageDecoder,它帮你实现了缓存管理。
public class EchoServerHandler extends ChannelInboundHandlerAdapter { private static final int HEADER_LENGTH = 4; //消息头部长度 private ByteBuf buffer = Unpooled.buffer(1024); //缓存残缺消息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf income = (ByteBuf) msg;
//上一次有缓存存在,则本数据包不是消息头开头,
if(buffer.readableBytes() > 0) {
buffer.ensureWritable(income.readableBytes()); //进行必要的扩容
income.readBytes(buffer, income.readableBytes());
readMsgFromBuffer(buffer);
//剩下一点残缺消息
if(buffer.readableBytes() > 0) {
//保留剩下的数据,重置读索引为0
System.out.println("缓存剩余字节:"+buffer.readableBytes());
buffer.discardReadBytes();
} else { //刚刚好,则清空数据
buffer.clear();
}
} else {
readMsgFromBuffer(income);
//剩下的数据全部写入缓存
if (income.readableBytes() >0) {
System.out.println("剩余字节:"+income.readableBytes());
income.readBytes(buffer, income.readableBytes());
}
}
}
//从字节数组中读取完整的消息
private void readMsgFromBuffer(ByteBuf byteBuf) {
//剩余可读消息是否包含一个消息头
while(byteBuf.readableBytes() >= HEADER\_LENGTH) {
byteBuf.markReaderIndex(); //由于可能读不到完整的消息,所以读之前先标记索引位置,方便重置
//读取消息头
byte\[\] headerBytes = new byte\[4\];
byteBuf.readBytes(headerBytes);
//获取类型
int type = headerBytes\[0\] & 0xFF;
//获取消息体长度
int bodyLength = ((headerBytes\[1\] & 0xFF) << 16) |
((headerBytes\[2\] & 0xFF) << 8) |
(headerBytes\[3\] & 0xFF);
//不包含请求体
if (byteBuf.readableBytes() < bodyLength) {
byteBuf.resetReaderIndex(); //重置读索引到当前消息头位置
break;
}
// 完整消息体已经接收,处理消息
byte\[\] body = new byte\[bodyLength\];
byteBuf.readBytes(body);
System.out.println("type:"+type+"||length:"+bodyLength+"||body:"+new String(body, CharsetUtil.UTF\_8));
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); ctx.writeAndFlush(Unpooled.EMPTY_BUFFER); }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
服务端输出:服务端逐行打印出消息类型,长度,消息体。
... type:1||length:175||body:0cDDAkum0F9DNwF511AKitTe2zRoSc27IjBYwgoODkXxx78xp0cowcDDNWTZ6xjCZyn6wmI2UxXLYB25TjUnOG9ZyjiZ9Jge3kbxabRjZAo0qsCYFfKMyzxApp953z1N7uDbP9rmlxeyYbYiif3y3ybtnnaAkuKFcspje6SLRnY69Nz
消息体编解码(序列化)
在经过前面粘包和拆包处理后,我们已经能够成功地从数据流中分离并组装出完整的消息。然而,在实际应用中,消息体通常需要进一步转换为对象,才能提交给上层的业务逻辑。
这是传输层的关键职责之一。
常见序列化方法
常见的POJO对象序列化方式包括:
Java序列化(Serializable)
优点:内置,无需额外依赖。
缺点:
- 性能较差,序列化和反序列化速度较慢。
- 无法跨语言使用,限制了不同语言(如Java服务端和C++客户端)之间的数据交换。
JSON
优点:可读性好,方便调试,支持各种语言
缺点:相较于二进制格式,JSON的键(key)通常占用较多空间,大规模数据传输时,带宽开销大。
Protocol Buffers(ProtoBuf)
优势:
- 高效的二进制序列化,体积小,序列化和反序列化速度快。
- 支持跨语言使用,适用于不同编程语言之间的通信。
代码案例
这里我们使用ProtoBuf。
构建消息类
写一个.proto文件,定义消息格式。
hello_request.proto
option java_multiple_files = true; option java_package = "protocol"; option java_outer_classname = "Request";
message HelloRequest { required string requestId = 1; optional string content = 2; }
下载ProtoBuf编译工具包,protoc-{version}-win64.zip
https://github.com/protocolbuffers/protobuf/releases
编译,得到Java文件
protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/hello_request.proto
引入对应版本的Jar包。(jar包版本要和protoc版本一致,否则报错)
https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java
接着就可以使用类构建POJO对象和对象的编解码了。
客户端
其他地方不变,使用上面生成好的HelloRequest类,构建对象。通过setter塞入数据,然后通过toByteArray()得到序列化后的二进制数据。
注意:现在的length应该是整个消息体的字节数,不再是随机字符串的长度。
public static byte[] buildRandomMsg() throws IOException { int randomStrLength = RandomUtil.randomInt(100, 200); String msgId = UUID.randomUUID().toString(); String content = RandomUtil.randomString(randomStrLength); HelloRequest request = HelloRequest.newBuilder() .setRequestId(msgId) .setContent(content) .build(); byte[] bodyBytes = request.toByteArray(); int length = bodyBytes.length;
System.out.println("发送消息:"+request.toString());
byte type = 1;
byte\[\] lengthBytes = new byte\[3\];
lengthBytes\[0\] = (byte) (length >> 16);
lengthBytes\[1\] = (byte) (length >> 8);
lengthBytes\[2\] = (byte) length;
return concatByteArrays(new byte\[\]{type}, lengthBytes, bodyBytes);
}
服务端
其他地方不变,解析body的时候,使用HelloRequest.parseFrom(byte[] bytes)进行解码,得到HellpRequest对象。
//System.out.println("type:"+type+"||length:"+bodyLength+"||body:"+new String(body, CharsetUtil.UTF_8)); if(type == 1) { try { HelloRequest request = HelloRequest.parseFrom(body); System.out.println("收到消息:"+request.toString()); } catch (Exception e) { System.out.println("解析失败:"+new String(body, CharsetUtil.UTF_8)); } } else { System.out.println("消息类型未知:"+type); }
结果
客户端输出
... 发送消息:requestId: "ca9b3e07-0662-467c-9bed-843b519c2480" content: "q82EuHvGgMhwbHl1t0qfv4M2NCJLikxahpEc8q9ezpCWUbU9M1Oh6U6zfIOnBC50ex5BweYfZ2JB0NoLmP4hgIsNzZ8mtfFPayi8KlDWRQw3gj7ENRgxjbm4HxJgrdDNobuguc8EPQ3SccWXGTsZytLEeOHJXskiGlH4oEf"
服务端输出
....
收到消息:requestId: "ca9b3e07-0662-467c-9bed-843b519c2480" content: "q82EuHvGgMhwbHl1t0qfv4M2NCJLikxahpEc8q9ezpCWUbU9M1Oh6U6zfIOnBC50ex5BweYfZ2JB0NoLmP4hgIsNzZ8mtfFPayi8KlDWRQw3gj7ENRgxjbm4HxJgrdDNobuguc8EPQ3SccWXGTsZytLEeOHJXskiGlH4oEf"
实现异步请求
结构设计
底层Socket是天然支持异步的,因为发送和接收是可以同时进行的,不会互相影响。
要实现异步请求的效果,上层API只要做到以下几点:
- 请求发送后,不会阻塞当前执行线程
- 响应到达后可以触发回调
- 超时(指定时间内没有收到响应)也可以触发回调
实现方式
- 请求接口发送请求后返回Future对象,可选择同步等待
- 客户端保留请求和对应的callback
- 服务端响应的时候返回请求ID
- 客户端根据ID获取关联请求,执行callback。
首先,项目结构图如下:
1.划线部分是废弃类
2.【变更】解码方式修改,新增通用的MessageDecoder可供双方解码,其继承于ByteToMessageDecoder。
3.【新增】新增HelloResponse
4.【新增】新增通用MessageEncoder,继承于MessageToByteEncoder
代码实现
1. MessageDecoder.java
相比前面直接实现的,这里不用去管理缓存。另外,这里解析好的消息会写入List,但它其实是逐个传给下一个Handler。
public class MessageDecoder extends ByteToMessageDecoder { private static final int HEADER_LENGTH = 4; //消息头部长度 @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List