在我们使用 netty 的过程中,有时候为了高效的传输数据,经常使用 protobuf 进行数据的传输,netty默认情况下为我们实现的 protobuf 的编解码,但是默认的只能实现单个对象的编解码,但是我们在使用 netty 的过程中,可能需要传输的对象有各种各样的,那么该如何实现对protobuf多协议的解码呢?
在 protobuf 中有一种类型的字段叫做 oneof , 被 oneof 声明的字段就类似于可选字段,在同一时刻只有一个字段有值,并且它们会共享内存。
有了上述基础知识,我们来实现一个简单的功能。
需求:
客户端在连接上服务器端后,每隔 1s 向服务器端发送一个 protobuf 类型的对象(比如登录报文、创建任务报文、删除任务报文等等),服务器端接收到这个对象并打印出来。
protobuf文件的编写:
在protobuf 文件中,我们申明一个 枚举类型的字段,用来标识当前发送的 protobuf 对象的类型,比如是登录报文、创建任务报文还是别的,然后在 oneof 字段中,申明所有可能需要传递的 报文实体。
一、protobuf-java jar包的引入
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.6.1</version> </dependency>
二、proto 文件的编写
注意:
1、定义的枚举是为了标识当前发送的是什么类型的消息
2、需要发送的多个消息统一放入到 oneof 中进行申明
3、到时候给 netty 编解码的时候就编解码 TaskProtocol 对象
三、使用 protoc 命令根据 .proto 文件生成 对应的 java 代码
四、netty服务器端的编写
/** * netty protobuf server * * @author huan.fu * @date 2019/2/15 - 11:54 */ @Slf4j public class NettyProtobufServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup parentGroup = new NioEventLoopGroup(1); EventLoopGroup childGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(parentGroup, childGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) // 连接超时 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000) .handler(new LoggingHandler(LogLevel.TRACE)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(TaskProtobufWrapper.TaskProtocol.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) .addLast(new ServerProtobufHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture future = bootstrap.bind(9090).sync(); log.info("server start in port:[{}]", 9090); // 等待服务端链路关闭后,main线程退出 future.channel().closeFuture().sync(); // 关闭线程池资源 parentGroup.shutdownGracefully(); childGroup.shutdownGracefully(); } }
注意:
1、注意一下 netty 是如何使用那些编解码器来编解码 protobuf 的。
五、服务器端接收到客户端发送过来的消息的处理
/** * 服务器端接收到客户端发送的请求,然后随机给客户端返回一个对象 * * @author huan.fu * @date 2019/2/15 - 14:26 */ @Slf4j public class ServerProtobufHandler extends SimpleChannelInboundHandler<TaskProtobufWrapper.TaskProtocol> { @Override protected void channelRead0(ChannelHandlerContext ctx, TaskProtobufWrapper.TaskProtocol taskProtocol) { switch (taskProtocol.getPackType()) { case LOGIN: log.info("接收到一个登录类型的pack:[{}]", taskProtocol.getLoginPack().getUsername() + " : " + taskProtocol.getLoginPack().getPassword()); break; case CREATE_TASK: log.info("接收到一个创建任务类型的pack:[{}]", taskProtocol.getCreateTaskPack().getTaskId() + " : " + taskProtocol.getCreateTaskPack().getTaskName()); break; case DELETE_TASK: log.info("接收到一个删除任务类型的pack:[{}]", Arrays.toString(taskProtocol.getDeleteTaskPack().getTaskIdList().toArray())); break; default: log.error("接收到一个未知类型的pack:[{}]", taskProtocol.getPackType()); break; } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); log.error("发生异常", cause); } }
注意:
1、服务器端根据 packType 字段来判断客户端发送的是什么类型的消息
六、netty 客户端的编写
/** * netty protobuf client * * @author huan.fu * @date 2019/2/15 - 11:54 */ @Slf4j public class NettyProtobufClient { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(TaskProtobufWrapper.TaskProtocol.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) .addLast(new ClientProtobufHandler()); } }); ChannelFuture future = bootstrap.connect("127.0.0.1", 9090).sync(); log.info("client connect server."); future.channel().closeFuture().sync(); group.shutdownGracefully(); } }
七、客户端连接到服务器端时的处理
/** * 客户端连接到服务器端后,每隔1s发送一个报文到服务器端 * * @author huan.fu * @date 2019/2/15 - 14:26 */ @Slf4j public class ClientProtobufHandler extends ChannelInboundHandlerAdapter { private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); private AtomicInteger atomicInteger = new AtomicInteger(1); @Override public void channelActive(ChannelHandlerContext ctx) { executor.scheduleAtFixedRate(() -> { // 产生的pack类型 int packType = new Random().nextInt(3); switch (TaskProtobufWrapper.PackType.forNumber(packType)) { case LOGIN: TaskProtobufWrapper.LoginPack loginPack = TaskProtobufWrapper.LoginPack.newBuilder().setUsername("张三[" + atomicInteger.getAndIncrement() + "]").setPassword("123456").build(); ctx.writeAndFlush(TaskProtobufWrapper.TaskProtocol.newBuilder().setPackType(TaskProtobufWrapper.PackType.LOGIN).setLoginPack(loginPack).build()); break; case CREATE_TASK: TaskProtobufWrapper.CreateTaskPack createTaskPack = TaskProtobufWrapper.CreateTaskPack.newBuilder().setCreateTime(System.currentTimeMillis()).setTaskId("100" + atomicInteger.get()).setTaskName("任务编号" + atomicInteger.get()).build(); ctx.writeAndFlush(TaskProtobufWrapper.TaskProtocol.newBuilder().setPackType(TaskProtobufWrapper.PackType.CREATE_TASK).setCreateTaskPack(createTaskPack).build()); break; case DELETE_TASK: TaskProtobufWrapper.DeleteTaskPack deleteTaskPack = TaskProtobufWrapper.DeleteTaskPack.newBuilder().addTaskId("1001").addTaskId("1002").build(); ctx.writeAndFlush(TaskProtobufWrapper.TaskProtocol.newBuilder().setPackType(TaskProtobufWrapper.PackType.DELETE_TASK).setDeleteTaskPack(deleteTaskPack).build()); break; default: log.error("产生一个未知的包类型:[{}]", packType); break; } }, 0, 1, TimeUnit.SECONDS); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); log.error("发生异常", cause); } }
注意:
1、客户端在连接服务器端时每隔1s发送不同的消息到服务器端
八、运行结果
九、完整代码
完成代码如下:https://gitee.com/huan1993/netty-study/tree/master/src/main/java/com/huan/netty/protobuf
相关推荐
springboot集成netty,使用protobuf作为数据交换格式,可以用于智能终端云端服务脚手架。
java快速上手的网络IO框架,基于netty, google protobuf 数据传输协议.zip
客户端与服务端通信,协议用protoBuf。maven项目,其中有startClient与startServer两个mainClass。不懂的可以留言
在使用netty进行网络通信协议传输使用protobuf时protobuf编译.proto文件生成JAVA类.zip 包括测试proto3.proto文件,自动protobuf编译.proto文件生成JAVA类
通过学习netty + protobuf 开发小项聊天程序的例子,可以掌握 netty 的开发 和 协议栈的设计等
写了一个简单的netty server和client,传输协议是google protobuf。上传文件主要包括源码以及转换proto文件的工具.
通信与协议Netty+Protobuf-游戏设计与开发(1)配套代码
netty服务器源代码,通信协议使用protobuf
基于Netty+TCP+Protobuf实现的Android IM库,包含Protobuf序列化、TCP拆包与粘包、长连接握手认证、心跳机制、断线重连机制、消息重发机制、读写超时机制、离线消息、线程池等功能
车硕一个轻量级的游戏服务器框架,基于Springboot和Netty,使用Protobuf作为客户端和服务器之间的通信。功能逐步完善中... :face_blowing_a_kiss:目前支持协议: TCP协议Websocket项目依赖:名称为链接JDK1.8 Java...
在上一篇文章中,并且完成了Netty 和 Protobuf协议整合实战。具体的文章为: Netty+Protobuf 整合一:实战案例,带源码 并且,专门开出一篇文章,介绍了通讯消息数据包的几条设计准则。具体的文章为: Netty +...
15_Protobuf集成Netty与多协议消息传递;16_Protobuf多协议消息支援与工程最佳实践;17_Protobuf使用最佳实践与Apache Thrift介绍;18_Apache Thrift应用详解与实例剖析;19_Apache Thrift原理与架构解析;20_通过...
15_Protobuf集成Netty与多协议消息传递 16_Protobuf多协议消息支援与工程最佳实践 17_Protobuf使用最佳实践与Apache Thrift介绍 18_Apache Thrift应用详解与实例剖析 19_Apache Thrift原理与架构解析 20_通过Apache ...
第15讲:Protobuf集成Netty与多协议消息传递 第16讲:Protobuf多协议消息支援与工程最佳实践 第17讲:Protobuf使用最佳实践与Apache Thrift介绍 第18讲:Apache Thrift应用详解与实例剖析 第19讲:Apache Thrift...
- 使用netty4.X实现的手机游戏服务器,支持tcp,udp,http,websocket链接,采用protobuf自定义协议栈进行网络通信,支持rpc远程调用,使用mybatis3支持db存储分库分表,支持异步mysql存储,db保存时同步更新reids缓存。...
在IM即时通讯项目上,用到的消息处理机制Netty。Netty框架,TCP长连接,心跳,阻塞消息队列,线程池处理消息发送, 基于Google ProtoBuf自定义的消息协议。
14_Protobuf完整实例详解 15_Protobuf集成Netty与多协议消息传 递 16_Protobuf多协议消息支援与工程最佳实践 17_Protobuf使用最佳实践与Apache Thrift介绍 18_Apache Thrift应用详解与实例剖析 19_Apache Thrift原理...
1、netty宏观理解 2、netty的scoket编程详解 3、netty多客户端连接与通信 4、netty读写检测机制与长连接要素 5、protobuf多协议消息支援与工程最佳实践 6、grpc通信示例与数据通信详解 .........
3.netty支持的各协议,包含messagepack、protobuf以及私有协议 - netty-protocol 4.netty开发httpserver服务 - netty-httpserver 5.netty开发websocket服务 - netty-websocket 6.netty生产级心跳学习,利用...
3.日志消息队列使用多线程消费,当到达警告数量,持久化部分日志到本地,定时扫描本地日志目录,将日志载入到消息队列中 3.存储端可自定义可配置,使用MongoDb可应对实时性的高并发写入需求 4.使用Zookeeper搭建...