1、Netty如何解析多协议
前提:
项目地址:https://gitee.com/q529075990qqcom/NB-IOT.git
我们需要创建一个Maven项目。下面使用的是我已经写好的项目,项目结构图如下:
创建公共模块
创建子模块,并准备好Netty 4.1版本等依赖
<dependencies>
    <!-- Netty: networking framework used by the codecs, server and clients -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.72.Final</version>
    </dependency>
    <!-- SLF4J API plus the simple console binding -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.28</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.28</version>
    </dependency>
    <!-- Lombok: pinned to a concrete version. The RELEASE meta-version is deprecated
         in Maven 3 and makes the build non-reproducible. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
        <scope>compile</scope>
    </dependency>
    <!-- Kryo: binary object serialization used for the protocol payload -->
    <dependency>
        <groupId>com.esotericsoftware</groupId>
        <artifactId>kryo</artifactId>
        <version>5.3.0</version>
    </dependency>
</dependencies>
Maven依赖
序列化的定义是:将一个对象编码成一个字节流(I/O);而与之相反的操作被称为反序列化。
package serializer;

/**
 * Strategy interface for converting objects to bytes and back again.
 *
 * <p>Created 2022-10-20 15:16.
 *
 * @author quliang
 */
public interface Serializer {

    /**
     * Serializes the given object.
     *
     * @param obj the object to encode
     * @return the encoded bytes
     * @throws Exception if the object cannot be serialized
     */
    byte[] serialize(Object obj) throws Exception;

    /**
     * Deserializes bytes into an instance of the requested class.
     *
     * @param bytes the encoded bytes
     * @param clazz the class of the object to rebuild
     * @param <T>   the type of the object to rebuild
     * @return the rebuilt object
     * @throws Exception if the bytes cannot be deserialized
     */
    <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception;
}
自定义序列化接口
package serializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
import org.objenesis.strategy.StdInstantiatorStrategy;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * {@link Serializer} implementation backed by Kryo.
 *
 * <p>Each thread gets its own {@link Kryo} instance through a {@link ThreadLocal}.
 * The instance is kept for the lifetime of the thread so that it is actually reused
 * between calls instead of being rebuilt every time.
 *
 * <p>Created 2022-10-20 15:18.
 *
 * @author quliang
 */
public class KryoSerializer implements Serializer {

    /** Per-thread Kryo instance, created lazily on first use by each thread. */
    private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        kryo.setReferences(true);
        kryo.setRegistrationRequired(false);
        // Fall back to Objenesis for classes without a usable no-arg constructor.
        ((DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
                .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
        return kryo;
    });

    /**
     * Serializes {@code obj} with the calling thread's Kryo instance.
     *
     * @param obj the object to serialize
     * @return the complete serialized form
     * @throws Exception if closing the underlying stream fails
     */
    @Override
    public byte[] serialize(Object obj) throws Exception {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             Output output = new Output(baos)) {
            kryoThreadLocal.get().writeObject(output, obj);
            // Output#toBytes() only returns what is still sitting in Output's internal
            // buffer; anything already flushed to the stream would be lost for large
            // objects. Flush and read the full result from the stream instead.
            output.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new Exception("序列化失败", e);
        }
    }

    /**
     * Deserializes {@code bytes} into an instance of {@code clazz} with the calling
     * thread's Kryo instance.
     *
     * @param bytes the serialized form
     * @param clazz the class to instantiate
     * @param <T>   the result type
     * @return the deserialized object
     * @throws Exception if closing the underlying stream fails
     */
    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
             Input input = new Input(bais)) {
            return kryoThreadLocal.get().readObject(input, clazz);
        } catch (IOException e) {
            // Keep the original failure as the cause so it is not lost.
            throw new Exception("反序化失败", e);
        }
    }
}
Kryo实现序列化接口
我们需要解析两种协议,那我们就要提前定义好两种协议,分别是消息协议、登录协议
消息协议相关
package protocol.msg;

import lombok.Data;
import lombok.Getter;

/**
 * Base class of the message protocol.
 *
 * <p>Frame layout as written by {@code MsgEncoder}:
 * {@code |magic|version|length|data|}.
 *
 * <p>Created 2022-12-10 20:46.
 *
 * @author quliang
 */
@Data
public class MsgProtocol {

    /** First header byte of a frame. */
    @Getter
    private byte magic = 0;

    /** Second header byte of a frame. */
    @Getter
    private byte version = 1;
}
消息协议基类
package protocol.msg.request;

import lombok.Data;
import protocol.msg.MsgProtocol;

/**
 * Request of the message protocol; carries a single text message.
 *
 * <p>Created 2022-12-10 20:58.
 *
 * @author quliang
 */
@Data
public class MsgRequest extends MsgProtocol {

    /** Text sent by the client. */
    private String msg;
}
消息请求子类
package protocol.msg.response;

import lombok.Data;
import protocol.msg.MsgProtocol;

/**
 * Response of the message protocol; carries a status code.
 *
 * <p>Created 2022-12-10 20:41.
 *
 * @author quliang
 */
@Data
public class MsgResponse extends MsgProtocol {

    /** Status code returned to the client. */
    private int statCode;
}
消息响应子类
package encoder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import protocol.msg.MsgProtocol; import serializer.KryoSerializer; /** * @description: * @author: quliang * @create: 2022-12-10 20:53 **/ public class MsgEncoder extends MessageToByteEncoder<MsgProtocol> { @Override protected void encode(ChannelHandlerContext ctx, MsgProtocol msgProtocol, ByteBuf in) throws Exception { in.writeByte(msgProtocol.getMagic()); // in.writeByte(msgProtocol.code()); in.writeByte(msgProtocol.getVersion()); byte[] data = new KryoSerializer().serialize(msgProtocol); in.writeShort(data.length); in.writeBytes(data); } }
消息协议编码
package decoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import protocol.msg.MsgProtocol;
import serializer.KryoSerializer;

import java.util.List;

/**
 * Decodes frames of the form {@code |magic|version|length|data|} into message
 * protocol objects.
 *
 * <p>If a frame cannot be decoded, the untouched bytes are forwarded as a
 * {@link ByteBuf} so that a later decoder in the pipeline can try them.
 *
 * <p>Created 2022-12-10 20:52.
 *
 * @author quliang
 */
@Slf4j
public class MsgDecoder extends ByteToMessageDecoder {

    /** Concrete message class the payload is deserialized into. */
    private final Class<? extends MsgProtocol> msgClass;

    /**
     * @param clazz the concrete {@link MsgProtocol} subclass to deserialize into
     */
    public MsgDecoder(Class<? extends MsgProtocol> clazz) {
        this.msgClass = clazz;
    }

    /**
     * Tries to decode one frame from {@code in}.
     *
     * @param ctx the handler context
     * @param in  the accumulated inbound bytes
     * @param out receives either the decoded message or, on failure, the raw bytes
     * @throws Exception never thrown by the visible code path; failures are caught
     *                   and turned into a pass-through
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Remember where this frame starts. resetReaderIndex() would rewind to the
        // buffer's mark, which may lie before frames that were already decoded.
        final int frameStart = in.readerIndex();
        try {
            in.readByte(); // magic (not validated)
            in.readByte(); // version (not validated)
            short dataSize = in.readShort();
            byte[] data = new byte[dataSize];
            in.readBytes(data);
            MsgProtocol baseProtocol = new KryoSerializer().deserialize(data, msgClass);
            out.add(baseProtocol);
        } catch (Exception e) {
            // Decoding failed: hand the data to the next decoder instead.
            log.error("msg decoder {}", e.getMessage());
            // Rewind to the start of this frame, because the header/payload reads
            // above already moved the reader index.
            in.readerIndex(frameStart);
            // Take a retained duplicate so the forwarded bytes stay readable after
            // the skip below.
            ByteBuf buff = in.retainedDuplicate();
            // ByteToMessageDecoder rejects a decode() call that emits output without
            // consuming input, so mark everything as read.
            in.skipBytes(in.readableBytes());
            // Pass the raw bytes on to the next handler.
            out.add(buff);
        }
    }
}
消息协议解码
登录协议相关
package protocol.system;

import lombok.Getter;

/**
 * Base class of the login protocol.
 *
 * <p>Frame layout as written by {@code LoginEncoder} and read by
 * {@code LoginDecoder}: {@code |magic|code|version|length|data|}.
 *
 * <p>Created 2022-12-09 18:10.
 *
 * @author quliang
 */
public class LoginProtocol {

    /** First header byte of a frame. */
    @Getter
    private byte magic = 0;

    /** Third header byte of a frame (written after {@link #code}). */
    @Getter
    private byte version = 1;

    /** Second header byte of a frame; no visible code assigns it, so it keeps its default of 0. */
    @Getter
    public byte code;
}
登录协议基类
package protocol.system.request;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import protocol.system.LoginProtocol;

/**
 * Request of the login protocol; carries the user's id and name.
 *
 * <p>Created 2022-12-06 18:17.
 *
 * @author quliang
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LoginRequest extends LoginProtocol {

    /** Identifier of the user logging in. */
    private String userId;

    /** Name of the user logging in. */
    private String userName;
}
登录请求子类
package protocol.system.response;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import protocol.system.LoginProtocol;

/**
 * Response of the login protocol; carries a message and an optional data string.
 *
 * <p>Created 2022-12-06 18:22.
 *
 * @author quliang
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LoginResponse extends LoginProtocol {

    /** Result message returned to the client. */
    private String msg;

    /** Additional response data; may be {@code null}. */
    private String data;
}
登录响应子类
package encoder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import protocol.system.LoginProtocol; import serializer.KryoSerializer; /** * @description: * @author: quliang * @create: 2022-12-06 22:11 **/ public class LoginEncoder extends MessageToByteEncoder<LoginProtocol> { @Override protected void encode(ChannelHandlerContext ctx, LoginProtocol baseProtocol, ByteBuf in) throws Exception { in.writeByte(baseProtocol.getMagic()); in.writeByte(baseProtocol.getCode()); in.writeByte(baseProtocol.getVersion()); byte[] data = new KryoSerializer().serialize(baseProtocol); in.writeShort(data.length); in.writeBytes(data); } }
登录协议编码
package decoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import protocol.system.LoginProtocol;
import serializer.KryoSerializer;

import java.util.List;

/**
 * Decodes frames of the form {@code |magic|code|version|length|data|} into login
 * protocol objects.
 *
 * <p>If a frame cannot be decoded, the untouched bytes are forwarded as a
 * {@link ByteBuf} so that a later decoder in the pipeline can try them.
 *
 * <p>Created 2022-12-06 17:59.
 *
 * @author quliang
 */
@Slf4j
public class LoginDecoder extends ByteToMessageDecoder {

    /** Concrete login class the payload is deserialized into. */
    private final Class<? extends LoginProtocol> clazz;

    /**
     * @param clazz the concrete {@link LoginProtocol} subclass to deserialize into
     */
    public LoginDecoder(Class<? extends LoginProtocol> clazz) {
        this.clazz = clazz;
    }

    /**
     * Tries to decode one frame from {@code in}.
     *
     * @param ctx the handler context
     * @param in  the accumulated inbound bytes
     * @param out receives either the decoded object or, on failure, the raw bytes
     * @throws Exception never thrown by the visible code path; failures are caught
     *                   and turned into a pass-through
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Remember where this frame starts. resetReaderIndex() would rewind to the
        // buffer's mark, which may lie before frames that were already decoded.
        final int frameStart = in.readerIndex();
        try {
            in.readByte(); // magic (not validated)
            in.readByte(); // code (not validated)
            in.readByte(); // version (not validated)
            short dataSize = in.readShort();
            byte[] data = new byte[dataSize];
            in.readBytes(data);
            LoginProtocol baseProtocol = new KryoSerializer().deserialize(data, clazz);
            out.add(baseProtocol);
        } catch (Exception e) {
            // Decoding failed: hand the data to the next decoder instead.
            log.error("login decoder {}", e.getMessage());
            // Rewind to the start of this frame, because the header/payload reads
            // above already moved the reader index.
            in.readerIndex(frameStart);
            // Take a retained duplicate so the forwarded bytes stay readable after
            // the skip below.
            ByteBuf buff = in.retainedDuplicate();
            // ByteToMessageDecoder rejects a decode() call that emits output without
            // consuming input, so mark everything as read.
            in.skipBytes(in.readableBytes());
            // Pass the raw bytes on to the next handler.
            out.add(buff);
        }
    }
}
登录协议解码
这样公共模块就创建完成了
创建服务端
package com.ql; import com.ql.handler.MsgHandler; import decoder.LoginDecoder; import decoder.MsgDecoder; import com.ql.handler.LoginHandler; import encoder.LoginEncoder; import encoder.MsgEncoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import protocol.system.request.LoginRequest; import protocol.msg.request.MsgRequest; /** * @author quliang * @description 服务端 * @date 2022-12-06 17:39:14 */ @Slf4j public class IotServer { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap().group( bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new LoggingHandler(LogLevel.INFO)); /** * 心跳机制 */ //pipeline.addLast(new IdleStateHandler(5, 10, 5, TimeUnit.SECONDS)); /** * 消息、登录解码器 */ pipeline.addLast(new LoginDecoder(LoginRequest.class)); pipeline.addLast(new MsgDecoder(MsgRequest.class)); /** * 消息、登录处理器 */ pipeline.addLast(new MsgHandler()); pipeline.addLast(new LoginHandler()); /** * 消息、登录编码器 */ pipeline.addLast(new MsgEncoder()); pipeline.addLast(new LoginEncoder()); } }) .option(ChannelOption.SO_BACKLOG, 1024); ChannelFuture cf = bootstrap.bind(8849).sync(); log.info("socket服务端启动成功 {}", cf.channel().localAddress().toString()); cf.channel().closeFuture().sync(); 
} finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
服务端代码
package com.ql.handler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; import protocol.msg.request.MsgRequest; import protocol.msg.response.MsgResponse; /** * @description: 消息处理器 * @author: quliang * @create: 2022-12-10 20:57 **/ @Slf4j @ChannelHandler.Sharable public class MsgHandler extends SimpleChannelInboundHandler<MsgRequest> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("上线{}", ctx.channel().remoteAddress().toString()); } @Override protected void channelRead0(ChannelHandlerContext ctx, MsgRequest request) throws Exception { log.info("服务端读取消息体数据为{}", request.toString()); MsgResponse response = new MsgResponse(); response.setStatCode(200); ctx.channel().writeAndFlush(response); } }
服务端消息处理器
package com.ql.handler;

import io.netty.channel.*;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import protocol.system.request.LoginRequest;
import protocol.system.response.LoginResponse;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Server-side handler for login protocol requests and idle-state events.
 *
 * <p>Created 2022-12-06 18:14.
 *
 * @author quliang
 */
@Slf4j
@ChannelHandler.Sharable
public class LoginHandler extends SimpleChannelInboundHandler<LoginRequest> {

    /**
     * Number of reader-idle events seen so far.
     * NOTE(review): this counter is static, so it is shared by all channels and is
     * never reset; a per-channel counter may be what was intended - confirm.
     */
    private static final AtomicInteger READER_COUNT = new AtomicInteger(0);

    /** Logs that the channel became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("服务端:{} 通道开启!", ctx.channel().localAddress().toString());
    }

    /** Logs that the channel became inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("服务端: {} 通道关闭!", ctx.channel().localAddress().toString());
    }

    /**
     * Logs the login request and answers with a "success" response.
     *
     * @param ctx          the handler context
     * @param loginRequest the decoded login request
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequest loginRequest) throws Exception {
        log.info("读取数据 {} ", loginRequest.toString());
        LoginResponse response = new LoginResponse("success", null);
        ctx.channel().writeAndFlush(response);
    }

    /** Logs that the current read batch is complete. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("...............数据接收-完毕...............");
    }

    /**
     * Logs the failure with its stack trace and closes the connection.
     *
     * @param ctx   the handler context
     * @param cause the exception that reached this handler
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Pass the message for the placeholder and the throwable separately, so the
        // placeholder is filled and the stack trace is still logged.
        log.error("...............业务处理异常...............{}", cause.getMessage(), cause);
        ctx.close();
    }

    /**
     * Handles idle-state events: counts reader-idle events and disconnects once
     * the count exceeds 3. Every other event is passed on to the next handler.
     *
     * @param ctx the handler context
     * @param evt the user event
     * @throws Exception if propagation fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            // Not ours: do not swallow events meant for other handlers.
            super.userEventTriggered(ctx, evt);
            return;
        }

        IdleStateEvent event = (IdleStateEvent) evt;
        Channel channel = ctx.channel();
        switch (event.state()) {
            case READER_IDLE:
                log.info("读空闲");
                READER_COUNT.incrementAndGet();
                break;
            case WRITER_IDLE:
                log.info("写空闲");
                break;
            default:
                break;
        }

        // Disconnect only after the threshold is exceeded, matching the log message;
        // previously the disconnect ran on every idle event.
        if (READER_COUNT.get() > 3) {
            log.info("close this channel {}", channel.remoteAddress().toString());
            ctx.disconnect();
        }
    }
}
服务端登录处理器
服务端其实很多都是直接引用公共模块的,代码也并不复杂
创建消息客户端
package com.ql;

import com.ql.handler.ClientMsgHandler;
import decoder.MsgDecoder;
import encoder.MsgEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import protocol.msg.response.MsgResponse;

import java.net.InetSocketAddress;

/**
 * Client entry point for the message protocol: connects to 169.254.190.154:8849.
 *
 * <p>Created 2022-12-06 17:37:56.
 *
 * @author quliang
 */
@Slf4j
public class IotClientMsg {

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while connecting, waiting for
     *                              close, or shutting down
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup clientGroup = new NioEventLoopGroup();
        try {
            Bootstrap bs = new Bootstrap();
            bs.group(clientGroup);
            bs.channel(NioSocketChannel.class);
            bs.remoteAddress(new InetSocketAddress("169.254.190.154", 8849));
            bs.handler(new MsgPipelineInitializer());

            ChannelFuture connected = bs.connect().sync();
            log.info("启动成功{}", connected.channel().localAddress().toString());
            // Block until the connection is closed.
            connected.channel().closeFuture().sync();
        } finally {
            clientGroup.shutdownGracefully().sync();
        }
    }

    /** Builds the client pipeline: logging, message decoder, handler, message encoder. */
    private static final class MsgPipelineInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new LoggingHandler(LogLevel.INFO));
            // Message decoder
            pipeline.addLast(new MsgDecoder(MsgResponse.class));
            // Client-side message handler
            pipeline.addLast(new ClientMsgHandler());
            // Message encoder
            pipeline.addLast(new MsgEncoder());
        }
    }
}
客户端代码
package com.ql.handler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; import protocol.msg.request.MsgRequest; import protocol.msg.response.MsgResponse; /** * @description: * @author: quliang * @create: 2022-12-10 20:40 **/ @Slf4j @ChannelHandler.Sharable public class ClientMsgHandler extends SimpleChannelInboundHandler<MsgResponse> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { MsgRequest request = new MsgRequest(); request.setMsg("hello"); ctx.channel().writeAndFlush(request); } @Override protected void channelRead0(ChannelHandlerContext ctx, MsgResponse response) throws Exception { int code = response.getStatCode(); log.info("消息处理器读取响应对象数据为{}", code); } }
客户端消息处理器
消息客户端代码也并不复杂
创建登录客户端
package com.ql;

import com.ql.handler.ClientLoginHandler;
import decoder.LoginDecoder;
import encoder.LoginEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import protocol.system.response.LoginResponse;

import java.net.InetSocketAddress;

/**
 * Client entry point for the login protocol: connects to 169.254.190.154:8849.
 *
 * <p>Created 2022-12-06 17:37:56.
 *
 * @author quliang
 */
@Slf4j
public class IotClientLogin {

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while connecting, waiting for
     *                              close, or shutting down
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup clientGroup = new NioEventLoopGroup();
        try {
            Bootstrap bs = new Bootstrap();
            bs.group(clientGroup);
            bs.channel(NioSocketChannel.class);
            bs.remoteAddress(new InetSocketAddress("169.254.190.154", 8849));
            bs.handler(new LoginPipelineInitializer());

            ChannelFuture connected = bs.connect().sync();
            log.info("启动成功{}", connected.channel().localAddress().toString());
            // Block until the connection is closed.
            connected.channel().closeFuture().sync();
        } finally {
            clientGroup.shutdownGracefully().sync();
        }
    }

    /** Builds the client pipeline: logging, login decoder, handler, login encoder. */
    private static final class LoginPipelineInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new LoggingHandler(LogLevel.INFO));
            // Only the login codec is installed; the message codec and handler are
            // not part of this client.
            pipeline.addLast(new LoginDecoder(LoginResponse.class));
            pipeline.addLast(new ClientLoginHandler());
            pipeline.addLast(new LoginEncoder());
        }
    }
}
客户端代码
package com.ql.handler;

import io.netty.channel.*;
import lombok.extern.slf4j.Slf4j;
import protocol.system.request.LoginRequest;
import protocol.system.response.LoginResponse;

import java.util.Scanner;

/**
 * Client-side handler for the login protocol: sends a login request when the
 * connection opens and logs each login response.
 *
 * <p>{@code channelRead} is deliberately not overridden. The inherited
 * implementation is what dispatches matching messages to
 * {@link #channelRead0(ChannelHandlerContext, LoginResponse)} and releases them
 * afterwards; overriding it meant {@code channelRead0} was never called.
 *
 * <p>Created 2022-12-06 22:16.
 *
 * @author quliang
 */
@Slf4j
@ChannelHandler.Sharable
public class ClientLoginHandler extends SimpleChannelInboundHandler<LoginResponse> {

    /** Console scanner; not used by any method in this class. */
    private final Scanner scanner = new Scanner(System.in);

    /**
     * Logs that the channel is open and sends the login request.
     *
     * @param ctx the handler context
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端:{} 通道开启!", ctx.channel().localAddress().toString());
        login(ctx);
    }

    /**
     * Sends a login request with fixed credentials.
     *
     * @param ctx the handler context
     */
    private void login(ChannelHandlerContext ctx) {
        LoginRequest request = new LoginRequest("123", "123");
        ctx.channel().writeAndFlush(request);
    }

    /**
     * Logs the login response and its message.
     *
     * @param ctx      the handler context
     * @param response the decoded login response
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginResponse response) throws Exception {
        log.info("客户端: {} 读取数据 {}", ctx.channel().localAddress().toString(), response.toString());
        String msg = response.getMsg();
        log.info("========{}", msg);
    }

    /** Logs that the current read batch is complete. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("...............数据接收-完毕...............");
    }
}
客户端登录处理器
我们是怎么通过这个项目来实现不同协议编解码?
其实并不难。仔细看MsgDecoder和LoginDecoder中任意一个类的代码,就会发现其中有个巧妙的做法:使用try-catch,
只要当前解码器无法解码并抛出异常,就重置读取索引,把原始数据传递给下一个解码器,直到遇到能够正确解码的解码器为止。不过,这种兼容多种协议的方式是有代价的:
每一次解码失败所产生的异常都会让服务端性能有所下降,取舍之间必有得失。