首頁/ 汽車/ 正文

Netty 使用 Google 的 ProtoBuf 來傳輸資料

使用 Google 的 ProtoBuf 來傳輸資料

ProtoBuf 是 google 的一個檔案傳輸的協議。與

平臺

語言

無關。

編寫 proto 檔案 (用來生成 Java 檔案 pojo 的)

syntax = “proto3”; // 表示協議的版本option java_outer_classname = “StudentPojo”; // 類名同時也是檔名字// ProtoBuf 是以 message 來管理資料的message Student {// 會在 java_outer_classname 的類中生成的內部類,他是真正的 傳輸的 pojo 物件 int32 id = 1; // int32 => proto 型別,對應 Java 的 int 型別。(Student 內有一個屬性 id,型別為 int32, 1 代表屬性序號,並不是值) string name = 2;}

生成 Java 的實體類

protoc。exe ——java_out=。 Student。proto

,執行這個命令以後就會生成一個指定的 Java 檔案。然後把這個檔案 copy 到自己的專案的工作路徑。

使用 Netty 來實現 ProtoBuf 的資料傳輸

引入 maven 的依賴

<!—— protoBuf ——> com。google。protobuf protobuf-java 3。21。5

新增 ProtoBuf 處理器到 server 和 client

pipeline。addLast(new ProtobufEncoder()); // ProtoBuf 的編碼器pipeline。addLast(new ProtobufDecoder(StudentPojo。Student。getDefaultInstance())); // ProtoBuf 的解碼器

傳送訊息進行通訊

StudentPojo。Student student = StudentPojo。Student。newBuilder()。setId(4)。setName(“孫悟空”)。build();log。info(“傳送的資料 => {}”, student);ctx。writeAndFlush(student);

這樣就是 netty 使用 ProtoBuf 的關鍵程式碼。

完整程式碼

服務端

package com。netty。codec;import com。utils。LoggerUtils;import io。netty。bootstrap。ServerBootstrap;import io。netty。channel。*;import io。netty。channel。nio。NioEventLoopGroup;import io。netty。channel。socket。SocketChannel;import io。netty。channel。socket。nio。NioServerSocketChannel;import io。netty。handler。codec。protobuf。ProtobufDecoder;import io。netty。handler。codec。protobuf。ProtobufEncoder;import org。slf4j。Logger;public class GoogleProtobufCodecServer { /** * 初始化服務 */ public void init() throws InterruptedException { EventLoopGroup boosGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); ChannelFuture channelFuture = serverBootstrap 。group(boosGroup, workGroup) 。channel(NioServerSocketChannel。class) 。childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { // 新增處理器 ChannelPipeline pipeline = ch。pipeline(); // ProtoBuf 編解碼器 pipeline。addLast(new ProtobufEncoder()); pipeline。addLast(new ProtobufDecoder(StudentPojo。Student。getDefaultInstance())); // 自定義處理器 pipeline。addLast(new ProtoBufHandler()); } }) // 繫結埠 。bind(6666)。sync(); channelFuture。channel()。closeFuture()。sync(); } finally { boosGroup。shutdownGracefully(); workGroup。shutdownGracefully(); } } /** * 自定義處理器 * * @author L */ private static class ProtoBufHandler extends SimpleChannelInboundHandler { Logger log = LoggerUtils。getLogger(ProtoBufHandler。class); /** * 通道初始化完成以後 */ @Override public void channelActive(ChannelHandlerContext ctx) { StudentPojo。Student student = StudentPojo。Student。newBuilder()。setId(4)。setName(“孫悟空”)。build(); log。info(“傳送的資料 => {}”, student); ctx。writeAndFlush(student); } /** * 接收到訊息以後 * * @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler} * belongs to * @param msg the message to handle */ @Override protected void channelRead0(ChannelHandlerContext ctx, StudentPojo。Student msg) { log。info(“客戶端傳送的資料 => id={},name={}”, msg。getId(), msg。getId()); } } /** * 程式碼允許 */ public static void main(String[] args) throws InterruptedException { new GoogleProtobufCodecServer()。init(); }}

客戶端

package com。netty。codec;import io。netty。bootstrap。Bootstrap;import io。netty。channel。*;import io。netty。channel。nio。NioEventLoopGroup;import io。netty。channel。socket。nio。NioSocketChannel;import io。netty。handler。codec。protobuf。ProtobufDecoder;import io。netty。handler。codec。protobuf。ProtobufEncoder;import org。slf4j。Logger;import org。slf4j。LoggerFactory;public class GoogleProtobufCodecClient { Logger log = LoggerFactory。getLogger(GoogleProtobufCodecClient。class); public void init() throws InterruptedException { EventLoopGroup clientGroup = new NioEventLoopGroup(); try { // 建立一個 bootstrap 而不是 serverBootstrap Bootstrap bootstrap = new Bootstrap(); // 設定相關管引數 bootstrap // 設定執行緒組 。group(clientGroup) // 設定客戶端通道的實現類(反射) 。channel(NioSocketChannel。class) // 設定處理器 。handler(new ChannelInitializer() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) { // 新增自己的 handler 處理器 ChannelPipeline pipeline = nioSocketChannel。pipeline(); pipeline。addLast(new ProtobufEncoder()); pipeline。addLast(new ProtobufDecoder(StudentPojo。Student。getDefaultInstance())); pipeline。addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log。info(“服務端訊息 => {}”, msg); } }); } }); log。info(“客戶端準備 OK”); // 啟動客戶端去連結服務端,涉及到 netty 的非同步模型 ChannelFuture channelFuture = bootstrap。connect(“127。0。0。1”, 6666)。sync(); // 給通道關閉進行監聽 channelFuture。channel()。closeFuture()。sync(); } finally { clientGroup。shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new GoogleProtobufCodecClient()。init(); }}

相關文章

頂部