差不多有一个多月没有更新了,因为这段时间在做一件事情:学习netty,当然并没有学完现在相当于是一个学习前的一个前序。我觉得是学习netty之前必须要先了解的底层知识点。一下就是我的一些学习记录。后续还会继续更新。
记录
- jdk动态代理为什么要实现接口:因为底层jdk动态代理生成的代理类需要继承Proxy类来实现,而java只能单继承 所以需要实现接口才能达到目的(字节码实现)
- mysql 数据库的底层数据结构分析
- 底层数据结构采用btree的变种 B+Tree实现 mysql的表引擎分为 mysiam(离散) 和 innodb(聚集)
- innodb创建时为什么一定需要主键:因为innodb索引和数据是在一起的,而在创建之初 默认是采用B+Tree的结构来存储数据,如果没 有主键会报错
- 联合索引最左原则
- 负载均衡算法
- 随机
- 加权
- 平滑加权
Netty
学习记录
-
git常用命令需要学习
-
Netty大致的执行流程
- 创建两个NioEventLoopGroup线程组 bossGroup(接收请求) workerGroup(处理请求)
- 创建ServerBootstrap用于启动服务类 并将两个线程组放入group中
- 定义NioServerSocketChannel 通道(里面通过反射生成实例)
- 构造我们的子处理器childHandler
- 子处理器继承ChannelInitializer<SocketChannel>
- 在pipeline加入HttpServerCodec(这个是对请求做编解码工作完成HttpRequestDecoder, HttpResponseEncoder 所做的事情)
- 并在pipeline管道中加入 自定义的处理器并继承SimpleChannelInboundHandler<HttpObject>
- 重写channelRead0方法 这里主要工作是 读取客户端发过来的请求 并返回响应
- 后面主要是对返回内容的解析并包装
- 启动绑定端口
- 最后优雅关闭
-
Netty sockte编程
- 简易聊天室的编程 主要是对应 handler的对应事件处理方法做加深
- channelRead0 收到客户端任何一个消息时 这里得到调用
- handlerAdded 服务端与客户端建立连接 触发事件
- handlerRemoved 客户端与服务器连接断开 触发事件
- channelActive 表示连接处于活动状态 触发事件
- channelInactive 表示连接不处于 活动状态 触发事件
- exceptionCaught 触发异常情况 触发的事件
- 空闲事件监听 IdleStateHandler(可以用作心跳检测)
- 读空闲
- 写空闲
- 读写空闲
- 简易聊天室的编程 主要是对应 handler的对应事件处理方法做加深
-
WebSockte实现与分析
- 解决http1.0 1.1产生的问题
- http的无状态问题(http虽然可以采用cookie session来解决)
- http是基于请求和响应模式的(一定是先有客户端先发请求)
- http1.0协议 一次请求响应之后会断开 http1.1 可以通过keepActlive设置(需要重复建立新的连接)
- 轮询的方式 可以解决长连接的问题(产生的问题:消息时效性问题、资源网络带宽浪费、每次轮询包含大量无效请求信息)
- webSockte 可以建立客户端与服务器端真正的长连接,一旦建立服务器端和客户端就是一个平等关系(双向数据传递)
- webSockte 是构建在http协议之上的协议 (属于html5规范的一部分)
- 需要服务器端支持
- Netty实现WebSockte
-
大部分的和以前的实现其实是一样的主要有以下注意:
/ 这里因为webSockte 本身也是基于http 所以需要加上http的编解码器 pipeline.addLast(new HttpServerCodec()); // 以块的方式来写的处理器 pipeline.addLast(new ChunkedWriteHandler()); // http 聚合处理器 netty对http请求采取分段的方式(特别重要的处理器) pipeline.addLast(new HttpObjectAggregator(8192)); // 处理http webSockte处理器(用于处理webSockte的握手以及processing of control frames (Close, Ping, Pong)) // 注意这里的数据是以 frames 方式来传递 pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); ** * 需要注意 这里由于是处理 TextFrame 所以泛型中需要加入 TextWebSocketFrame */ public class MyWebSockteServerTextFramesHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { System.out.println("收到的消息:"+msg.text()); // 注意由于我们传递的是TextWebSocketFrame 如果用普通的字符串 handler是无法解析的 ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间:"+ LocalDate.now())); }
-
客户端主要采用js实现
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>WebSockte客户端</title> </head> <body> <script type="text/javascript"> var sockte; if(window.WebSocket){ sockte = new WebSocket("ws://localhost:8899/ws"); sockte.onmessage = function (ev) { var ta = document.getElementById("responseText"); ta.value = ta.value + "\n" + ev.data; } sockte.onopen = function (ev) { var ta = document.getElementById("responseText"); ta.value = "连接开启"; } sockte.onclose = function (ev) { var ta = document.getElementById("responseText"); ta.value = ta,value + "\n" + "连接关闭"; } } else { alert('浏览器不支持'); } function send(msg) { if(!window.WebSocket){ return; } if(sockte.readyState == WebSocket.OPEN){ sockte.send(msg); }else{ alert("连接尚未开启"); } } </script> <form onsubmit="return false"> <textarea name="message" style="width: 400px;height: 200px"></textarea> <input type="button" value="发送数据" onclick="send(this.form.message.value)"> <h3>服务端输出:</h3> <textarea id="responseText" style="width: 400px;height: 300px"></textarea> <input type="button" onclick="javascript:document.getElementById('responseText').value='' " value="清空内容"> </form> </body> </html>
-
- 解决http1.0 1.1产生的问题
-
google protobuf 使用方式分析 (推荐 Effective Java 这本书)
自定义协议 体积更小的 序列化与反序列化
-
RPC库使用原理 :RMI(remote method invocation 远程方法调用)
- client:(序列化字节码传输给server)
- server: (反序列化--一系列业务逻辑--序列化传输给client)
- 代码生成的概念 client==> stub(桩) server==> skeleton(骨架)
- 序列化(对象转字节)与反序列化(字节转对象) RPC的重要机制 也叫做编码与解码
- RPC:Remote Procedure Call(远程过程调用)(大部分RPC框架是跨语言)(使用步骤:)
- 定义接口说明文件:描述对象(结构体)、对象成员、接口方法等一系列信息
- 通过RPC框架所提供的编译器,将接口说明文件编译成具体语言文件
- 在客户端与服务器端分别引入RPC编译器所生成的文件,即可像调用本地方法一样调用远程方法
- webService广义上来讲也是一种RPC(与真正的RPC的区别)
- 编解码效率
- 传输压缩比
- 传输方式RPC一般是基于sockte传输 webService一般是基于http
-
引入 google protobuf
- 需要安装 protobuf编译器并配置环境变量 https://github.com/protocolbuffers/protobuf/releases
- 引入相应的jar
- 'com.google.protobuf:protobuf-java:3.8.0',
- 'com.google.protobuf:protobuf-java-util:3.8.0'
-
protobuf可以解决的问题
- 本身java自带的序列化机制只是针对java而言如果其他的语言是实现不了的
- 对于简单的数据我们可以自定义编码格式,但是针对运行时解码对性能有一定的影响
- 采用xml的方式 虽然xml标记语言是人类可读的并且各种语言针对xml都有对应的解析库,但是xml是非常占据空间的并且对于xml的变量dom树也是非常耗费性能的
-
关于 .proto文件解析说明
syntax = "proto2"; // 默认包 package tutorial; // 如果显示的定义了java_package 那么就以定义的为准 option java_package = "com.example.tutorial"; // java_outer_classname 定义生成文件的类名 如果没有定义就以 文件名转驼峰的方式:my_proto.proto=>MyProto option java_outer_classname = "AddressBookProtos"; message Person { // 这里的 “=1、=2“ 这里并不是赋值而是一种标记 表示在二进制编码中解码中 // 1-15可以用在 标记常用的字符 因为这里会少一个字节表示 16以及之后的可以用作不常用的做标记 这样可以提高性能 // 这里需要注意标记 与之对应的同一层次上的是不能重复的 // required 表示这个必须提供的 否则在初始化的时候将抛出RuntimeException 在解析的时候将抛出IOException required string name = 1; required int32 id = 2; // optional表示可以不提供的 如果没有将取设置的默认值 如:optional PhoneType type = 2 [default = HOME]; // 否则 zero for numeric types, the empty string for strings, false for bools //**注意:在使用required时我们应该非常小心 因为一旦后面这个字段变成不必要时 前面定义将会出现不兼容问题 optional string email = 3; enum PhoneType { MOBILE = 0; HOME = 1; WORK = 2; } message PhoneNumber { required string number = 1; optional PhoneType type = 2 [default = HOME]; } // repeated 表示可以重复出现 也就是说这是一个list repeated PhoneNumber phones = 4; } message AddressBook { repeated Person people = 1; }
如果对表量消息类型还有不理解的可以参考这个地址:https://developers.google.com/protocol-buffers/docs/proto
-
protobuf 实战
- proto文件(这里先贴代码 后续需要注意的地方将后面写出)
syntax = "proto2"; package com.lc.protobuf; option optimize_for = SPEED; option java_package = "com.lc.netty.sixthexample"; option java_outer_classname = "MyMessage"; message MyMessageInfo{ enum DataType{ Person = 1; Dog = 2; Cat = 3; } required DataType date_type = 1; //oneof的意思:如果有多个可选字段,在某一个时刻只能只有一个值被设置,可以节省内存空间 oneof dataBody{ Person person = 2; Cat cat = 3; Dog dog = 4; } } message Person{ required string name = 1; optional int32 age = 2; optional string address = 3; } message Cat{ required string name = 1; optional int32 age = 2; } message Dog{ required string name = 1; optional int32 age = 2; }
-
服务端
- 编写初始化服务端(由于基本上是差不多的 代码就不贴了)
- 编写 initiailzer pipline初始化文件(说明一下几个处理器的作用)
- ProtobufEncoder:用于对Probuf类型序列化。
- ProtobufVarint32LengthFieldPrepender:用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。
- ProtobufVarint32FrameDecoder:用于decode前解决半包和粘包问题(利用包头中的包含数组长度来识别半包粘包)
- ProtobufDecoder:反序列化指定的Probuf字节数组为protobuf类型
- MyProtoBufServerHandler是我们自己需要编写的handler
public class MyProtoBufInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); // 解码器 将字节数组转化成真正的对象 pipeline.addLast(new ProtobufDecoder(MyMessage.MyMessageInfo.getDefaultInstance())); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new MyProtoBufServerHandler()); } }
- 编写对应的handler
- 这里这样写的原因是因为netty默认只是支持一种协议 这里采用将 实体数据加一个描述字段来对类型做区分以用不同的处理方式
public class MyProtoBufServerHandler extends SimpleChannelInboundHandler<MyMessage.MyMessageInfo> { @Override protected void channelRead0(ChannelHandlerContext ctx, MyMessage.MyMessageInfo msg) throws Exception { System.out.println("收到客户端的消息:"); switch (msg.getDateType()){ case Cat: System.out.println(msg.getCat().getAge()); System.out.println(msg.getCat().getName()); break; case Dog: System.out.println(msg.getDog().getAge()); System.out.println(msg.getDog().getName()); break; case Person: System.out.println(msg.getPerson().getAddress()); System.out.println(msg.getPerson().getAge()); System.out.println(msg.getPerson().getName()); break; default: } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客户端由于是基本上一样的所以不贴代码了
-
Netty 传输和springMVC的区别、
- 使用springMVC 通过@GetMapping(...="/users/user/123") @PostMapping(...="/users/user/")
- springMvc 通过DisPatcherServlet 控制器 分发到不同的controller
- Netty本质上其实也是一样 只不过netty没有框架的支持路由分发
- 使用springMVC 通过@GetMapping(...="/users/user/123") @PostMapping(...="/users/user/")
-
使用 Netty+protobuf 如何进行合理的工程创建(前提:使用Git作为版本控制)
- git submodule(是什么):git仓库里的一个仓库
- ServerProject:项目开发的工程
- Protobuf-java:生成protojava文件工程
- data.proto 源代码文件
- ClientProject:项目开发的工程
- 通过gitsubmodule方式将Protobuf-java引入到ServerProject工程中
- 这种方式的缺点
- git的会采用分支的方式进行 分支切换没有跟上
- git subtree(思路和 git submodule一样 只是只有一个仓库)
- git submodule(是什么):git仓库里的一个仓库
-
Apache Thrift
-
Thrift传输格式
- TBinaryProtocol 二进制格式
- TCompactProtocol 压缩格式
- TJSONProtocol JSON格式
- TSimpleJSONProtocol 提供JSON只写协议 生成的文件很容易通过脚本语言解析
- TDebugProtocol 使用易懂的可读文本格式 以便于debug
-
Thrift 数据传输方式
- TSocket 阻塞式socket
- TFramedTransport 以frame为单位进行传输,非阻塞式服务中使用
- TFileTransport 以文件形式进行传输
- TMemoryTransport 将内存用于I/O java实现时内部实际使用简单的ByteArrayOutputStream
- TZlibTransport 使用zlib进行压缩,与其他的传输方式联合使用 当前无java实现
-
Thrift 支持的服务模型
- TSimpleServer 简单的单线程服务模型,常用与测试
- TThreadPoolServer 多线程服务模型,使用标准的阻塞式IO
- TNonblockingServer 多线程服务模型,使用非阻塞式IO(需要使用TFramedTransport数据传输方式)
- THsHaServer THsHa引入了线程池去处理,其模型吧读写任务放到线程池去处理;Half-aysnc是在处理IO事件上(accept/read/write io),Half-sync用于handler对rpc的同步处理
-
使用:
- IDL文件
namespace java thrift.generated typedef i16 short typedef i32 int typedef i64 long typedef bool boolean typedef string String struct Person { 1: optional String username, 2: optional int age, 3: optional boolean married } exception DataException { 1: optional String message, 2: optional String callStack, 3: optional String date } service PersonService { Person getPersonByUsername(1: required String username) throws(1: DataException dataException), void savePerson(1: required Person person) throws(1:DataException dataException) }
- 实现类
package com.lc.thrift; import org.apache.thrift.TException; import thrift.generated.DataException; import thrift.generated.Person; import thrift.generated.PersonService; public class ServicePersonImpl implements PersonService.Iface { @Override public Person getPersonByUsername(String username) throws DataException, TException { System.out.println("Go Client:"+username); Person person = new Person(); person.setUsername(username); person.setAge(10); person.setMarried(false); return person; } @Override public void savePerson(Person person) throws DataException, TException { System.out.println("Go client params:"); System.out.printf(person.getUsername()); System.out.println(person.getAge()); System.out.println(person.isMarried()); } }
- 服务端
package com.lc.thrift; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.server.THsHaServer; import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import thrift.generated.PersonService; public class ThriftServer { public static void main(String[] args) throws Exception{ TNonblockingServerSocket socket = new TNonblockingServerSocket(8899); THsHaServer.Args arg = new THsHaServer.Args(socket).minWorkerThreads(2).maxWorkerThreads(4); PersonService.Processor<ServicePersonImpl> processor = new PersonService.Processor<>(new ServicePersonImpl()); // 协议工厂 压缩 arg.protocolFactory(new TCompactProtocol.Factory()); // 传输层 arg.transportFactory(new TFramedTransport.Factory()); arg.processorFactory(new TProcessorFactory(processor)); TServer server = new THsHaServer(arg); System.out.println("Thrift Server Started"); server.serve(); } }
- 客户端
package com.lc.thrift; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import thrift.generated.Person; import thrift.generated.PersonService; public class ThriftClient { public static void main(String[] args) { TTransport transport = new TFramedTransport(new TSocket("localhost",8899),600); TProtocol protocol = new TCompactProtocol(transport); PersonService.Client client = new PersonService.Client(protocol); try{ transport.open(); Person person = client.getPersonByUsername("张三"); System.out.println(person.getUsername()); System.out.println(person.getAge()); System.out.println(person.isMarried()); System.out.println("=============="); Person person1 = new Person(); person1.setUsername("李四"); person1.setAge(10); person1.setMarried(true); client.savePerson(person1); }catch (Exception e){ throw new RuntimeException(e.getMessage(),e); }finally { transport.close(); } } }
-
-
gradlew介绍
- gradlew => gradle wrapper
- 目的
本地没有安装gradle的情况下依然可以通过一条很简单的命令就可以构建项目
使用gradlew clean build构建
gradle wrapper 第一次构建会生成几个重要的文件 gradlew、gradlew.bat 以及一个gradle文件
-
我们可以在 build.gradle自定义配置设置属性
task wrapper (type:Wrapper){ gradleVersion = '3.x' // 版本号 distributionType ='all' //有all bin ..... }
- vi命令的小技巧
- set nu 打开显示行
- Ctrl + F 向上翻屏
- Ctrl + B 向下翻屏
- Shift + A 到行最后
-
gRPC
grpc和thrift之间有一些区别,不管在消息文件的编写还是 对消息的实现
-
grpc的消息编写
syntax = "proto3"; package com.lc.proto; option java_package = "com.lc.proto"; option java_outer_classname = "StudentProto"; option java_multiple_files = true; service StudentService { // 客户端 发出一个请求 服务器端返回一个响应 rpc GetRealNameByUsername(MyRequest) returns (MyResponse){} // 客户端 发出一个请求 服务器端返回一个流式的响应 rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse){} // 客户端 发出一个流式请求 服务器返回一个响应List rpc GetStudentWrapperByAges(stream StudentRequest) returns (StudentResponseList){} // 客户端 发出一个流式请求 服务器响应一个流式请求 rpc BiTalk(stream StreamRequest) returns (stream StreamResponse) {} } message MyRequest { string username = 1; } message MyResponse { string realname = 2; } message StudentRequest{ int32 age = 1; } message StudentResponse { string name = 1; int32 age = 2; string city = 3; } message StudentResponseList { repeated StudentResponse studentResponse = 1; } message StreamRequest { string request_info = 1; } message StreamResponse { string response_info = 1; }
-
服务端实现
package com.lc.grpc; import io.grpc.Server; import io.grpc.ServerBuilder; import java.io.IOException; public class GrpcServer { private Server server; private void start() throws IOException { // 初始化server对象 this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl()).build().start(); System.out.println(" server started!"); // 回调钩子 // Runtime 运行时对象 可以获取有关环境相关信息 Runtime.getRuntime().addShutdownHook(new Thread(()->{ System.out.println("关闭jvm"); GrpcServer.this.stop();// 关闭sockte })); System.out.println("执行到这里"); } private void stop(){ if(null != this.server){ this.server.shutdown(); } } private void awaitTermination() throws InterruptedException { if (null != this.server) { this.server.awaitTermination(); } } public static void main(String[] args) throws InterruptedException, IOException { GrpcServer server = new GrpcServer(); server.start(); server.awaitTermination(); } }
-
service实现类的实现
package com.lc.grpc; import com.lc.proto.*; import io.grpc.stub.StreamObserver; public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase { @Override public void getRealNameByUsername(MyRequest request, StreamObserver<MyResponse> responseObserver) { System.out.println("接收到客户端的请求:"+request.getUsername()); responseObserver.onNext(MyResponse.newBuilder().setRealname("张三").build()); responseObserver.onCompleted(); } @Override public void getStudentsByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) { System.out.println("接收到客户端的请求:"+request.getAge()); responseObserver.onNext(StudentResponse.newBuilder().setAge(10).setCity("北京").setName("张三").build()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } responseObserver.onNext(StudentResponse.newBuilder().setAge(10).setCity("南京").setName("李四").build()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } responseObserver.onNext(StudentResponse.newBuilder().setAge(10).setCity("上海").setName("王五").build()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } responseObserver.onNext(StudentResponse.newBuilder().setAge(10).setCity("深圳").setName("杨柳").build()); responseObserver.onCompleted(); } // 客户端发送一个流 服务端返回一个响应 @Override public StreamObserver<StudentRequest> getStudentWrapperByAges(StreamObserver<StudentResponseList> responseObserver) { System.out.println("接收到客户端的流式请求:----------"); return new StreamObserver<StudentRequest>() { @Override public void onNext(StudentRequest value) { System.out.println("接收到客户端的请求:"+value.getAge()); } @Override public void onError(Throwable t) { } @Override public void onCompleted() { StudentResponseList build = StudentResponseList.newBuilder().build(); StudentResponseList build1 = StudentResponseList.newBuilder() .addStudentResponse(StudentResponse.newBuilder().setName("张三").setCity("南京").setAge(10).build()) .addStudentResponse(StudentResponse.newBuilder().setName("李四").setCity("北京").setAge(10).build()) .addStudentResponse(StudentResponse.newBuilder().setName("王五").setCity("上海").setAge(10).build()) .build(); responseObserver.onNext(build1); responseObserver.onCompleted(); } }; } @Override public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) { return new StreamObserver<StreamRequest>() { @Override public void onNext(StreamRequest value) { System.out.println("onNext:"+value.getRequestInfo()); responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo("你好:"+value.getRequestInfo()).build()); } @Override public void onError(Throwable t) { System.out.println(t.getMessage()); } @Override public void onCompleted() { responseObserver.onCompleted(); } }; } }
-
客户端实现
package com.lc.grpc; import com.lc.proto.*; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; import java.time.LocalDateTime; import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class GrpcClient { public static void main(String[] args) { ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost",8899) .usePlaintext(true) //普通文本的连接方式 .build(); StudentServiceGrpc.StudentServiceBlockingStub blockingStub = StudentServiceGrpc.newBlockingStub(channel); StudentServiceGrpc.StudentServiceStub studentServiceFutureStub = StudentServiceGrpc.newStub(channel); /* MyResponse my = blockingStub.getRealNameByUsername(MyRequest.newBuilder().setUsername("lisi").build()); System.out.println(my.getRealname()); System.out.println("----------------------------"); Iterator<StudentResponse> iter = blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(10).build()); while (iter.hasNext()){ StudentResponse next = iter.next(); System.out.println(next.getAge()+":"+next.getCity()+":"+next.getName()); } System.out.println("-----------------------------"); System.out.println("客户端发送流到服务端"); StreamObserver<StudentResponseList> streamObserver = new StreamObserver<StudentResponseList>() { @Override public void onNext(StudentResponseList value) { System.out.println("接收到服务端的请求"); List<StudentResponse> studentResponseList = value.getStudentResponseList(); for (StudentResponse studentResponse : studentResponseList) { System.out.println(studentResponse.getAge()+":"+studentResponse.getCity()+":"+studentResponse.getName()); } } @Override public void onError(Throwable t) { System.out.printf(t.getMessage()); } @Override public void onCompleted() { System.out.println("------"); } }; StreamObserver<StudentRequest> studentWrapperByAges = studentServiceFutureStub.getStudentWrapperByAges(streamObserver); studentWrapperByAges.onNext(StudentRequest.newBuilder().setAge(10).build()); studentWrapperByAges.onNext(StudentRequest.newBuilder().setAge(20).build()); studentWrapperByAges.onNext(StudentRequest.newBuilder().setAge(30).build()); studentWrapperByAges.onCompleted(); */ StreamObserver<StreamResponse> streamResponseStreamObserver = new StreamObserver<StreamResponse>() { @Override public void onNext(StreamResponse value) { System.out.println("接收到请求:"); System.out.println(value.getResponseInfo()); System.out.println("**************"); } @Override public void onError(Throwable t) { System.out.println(t.getMessage()); } @Override public void onCompleted() { System.out.println("---------end---------"); } }; StreamObserver<StreamRequest> streamRequestStreamObserver = studentServiceFutureStub.biTalk(streamResponseStreamObserver); for(int i=0;i<10;i++) { streamRequestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } streamRequestStreamObserver.onCompleted(); try { channel.awaitTermination(1, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } }
-
I/O 与NIO
-
java.io
- 最核心的概念(Stream)流,面向流的编程。在java中一个流 要么是输入流,要么是输出流
-
java.nio
最核心的概念有三个 Selector,Channel,Buffer。在java.nio中是面向块(Block)或者是缓冲区(Buffer)编程
buffer本身就是一块内存,底层实现上它实际上是一个数组。数据的读和写都是通过buffer实现的
除了数组之外 buffer还提供了数据的结构化访问方式,并且可以追踪到系统的读写过程
java中的8种原生数据类型都有对应的buffer类型 IntBuffer LongBuffer ....
channel 指的是可以向其读取或者写入数据的对象 java中类似于Stream
注意所有数据的读写都是通过buffer来进行的,永远不会直接向channle写入或者读取数据的情况
与Stream不同的是 channel是双向的,一个流只能是InputStream 或者OutputStream,Channel打开后则可以进行读取或者写入、读取操作。由于channel是双向的,因此他能更好的反应出操作系统的真实情况:在linux系统中,底层底层操作系统的通道就是双向的
-
演示用例:
public static void main(String[] args) { // 分配一个大小为10的缓冲区 里面只能放置整数 IntBuffer intBuffer = IntBuffer.allocate(10); // 生成随机数 for(int i=0;i<intBuffer.capacity();++i){ // SecureRandom 生成的随机数 更加具有随机性 int randomNumer = new SecureRandom().nextInt(20); intBuffer.put(randomNumer); } // 翻转 这里的作用是 在放入数据之后 如果需要进行读 则需要进行翻转 读写切换 intBuffer.flip(); // intBuffer 还有没有元素 while (intBuffer.hasRemaining()){ System.out.println(intBuffer.get()); } }
// 传统io 如何切换到nio public static void main(String[] args) throws Exception{ FileInputStream fileInputStream = new FileInputStream("niotest.txt"); FileChannel channel = fileInputStream.getChannel(); // 构造ByteBuffer对象并分配大小 ByteBuffer byteBuffer = ByteBuffer.allocate(512); // 将文件对象读入到 byteBuffer中 channel.read(byteBuffer); byteBuffer.flip(); while (byteBuffer.remaining() >0) { byte b = byteBuffer.get(); System.out.println("char:"+(char)b); } fileInputStream.close(); }
// 输出流 public static void main(String[] args) throws Exception{ try( FileOutputStream fileOutputStream = new FileOutputStream("niotest2.txt"); FileInputStream fileInputStream = new FileInputStream("niotest.txt"); ){ FileChannel channel1 = fileInputStream.getChannel(); FileChannel channel = fileOutputStream.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(512); channel1.read(byteBuffer); byteBuffer.flip(); while (byteBuffer.hasRemaining()){ channel.write(byteBuffer); } } }
public static void main(String[] args) throws Exception { FileOutputStream fileOutputStream = new FileOutputStream("niotest3.txt"); FileChannel channel = fileOutputStream.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(512); // 将数据写入buffer byte[] message = "Hello,message!".getBytes(); for(int i=0;i<message.length;i++){ byteBuffer.put(message[i]); } byteBuffer.flip(); channel.write(byteBuffer); fileOutputStream.close(); }
-
关于nio buffer中的三个重要的状态属性的含义:position limit capacity 一下是javadoc文档中的内容:
A container for data of a specific primitive type.
<p> A buffer is a linear, finite sequence of elements of a specific
primitive type. Aside from its content, the essential properties of a
buffer are its capacity, limit, and position: </p>
<blockquote>
<p> A buffer's <i>capacity</i> is the number of elements it contains.
capacity of a buffer is never negative and never changes. </p>
<p> A buffer's <i>limit</i> is the index of the first element that sho
not be read or written. A buffer's limit is never negative and is nev
greater than its capacity. </p>
<p> A buffer's <i>position</i> is the index of the next element to be
read or written. A buffer's position is never negative and is never
greater than its limit. </p>
</blockquote>
<p> There is one subclass of this class for each non-boolean primitive t
<h2> Transferring data </h2>
<p> Each subclass of this class defines two categories of <i>get</i> and
<i>put</i> operations: </p>
<blockquote>
<p> <i>Relative</i> operations read or write one or more elements star
at the current position and then increment the position by the number
elements transferred. If the requested transfer exceeds the limit the
relative <i>get</i> operation throws a {@link BufferUnderflowException
and a relative <i>put</i> operation throws a {@link
BufferOverflowException}; in either case, no data is transferred. </p
<p> <i>Absolute</i> operations take an explicit element index and do n
affect the position. Absolute <i>get</i> and <i>put</i> operations th
an {@link IndexOutOfBoundsException} if the index argument exceeds the
limit. </p>
</blockquote>
<p> Data may also, of course, be transferred in to or out of a buffer by
I/O operations of an appropriate channel, which are always relative to t
current position.
<h2> Marking and resetting </h2>
<p> A buffer's <i>mark</i> is the index to which its position will be re
when the {@link #reset reset} method is invoked. The mark is not always
defined, but when it is defined it is never negative and is never greate
than the position. If the mark is defined then it is discarded when the
position or the limit is adjusted to a value smaller than the mark. If
mark is not defined then invoking the {@link #reset reset} method causes
{@link InvalidMarkException} to be thrown.
<h2> Invariants </h2>
<p> The following invariant holds for the mark, position, limit, and
capacity values:
<blockquote>
<tt>0</tt> <tt><=</tt>
<i>mark</i> <tt><=</tt>
<i>position</i> <tt><=</tt>
<i>limit</i> <tt><=</tt>
<i>capacity</i>
</blockquote>
<p> A newly-created buffer always has a position of zero and a mark that
undefined. The initial limit may be zero, or it may be some other value
that depends upon the type of the buffer and the manner in which it is
constructed. Each element of a newly-allocated buffer is initialized
to zero.
<h2> Clearing, flipping, and rewinding </h2>
<p> In addition to methods for accessing the position, limit, and capaci
values and for marking and resetting, this class also defines the follow
operations upon buffers:
<ul>
<li><p> {@link #clear} makes a buffer ready for a new sequence of
channel-read or relative <i>put</i> operations: It sets the limit to t
capacity and the position to zero. </p></li>
<li><p> {@link #flip} makes a buffer ready for a new sequence of
channel-write or relative <i>get</i> operations: It sets the limit to
current position and then sets the position to zero. </p></li>
<li><p> {@link #rewind} makes a buffer ready for re-reading the data t
it already contains: It leaves the limit unchanged and sets the positi
to zero. </p></li>
</ul>
<h2> Read-only buffers </h2>
<p> Every buffer is readable, but not every buffer is writable. The
mutation methods of each buffer class are specified as <i>optional
operations</i> that will throw a {@link ReadOnlyBufferException} when
invoked upon a read-only buffer. A read-only buffer does not allow its
content to be changed, but its mark, position, and limit values are muta
Whether or not a buffer is read-only may be determined by invoking its
{@link #isReadOnly isReadOnly} method.
<h2> Thread safety </h2>
<p> Buffers are not safe for use by multiple concurrent threads. If a
buffer is to be used by more than one thread then access to the buffer
should be controlled by appropriate synchronization.
<h2> Invocation chaining </h2>
<p> Methods in this class that do not otherwise have a value to return a
specified to return the buffer upon which they are invoked. This allows
method invocations to be chained; for example, the sequence of statement
<blockquote><pre>
b.flip();
b.position(23);
b.limit(42);</pre></blockquote>
can be replaced by the single, more compact statement
<blockquote><pre>
b.flip().position(23).limit(42);</pre></blockquote> -
通过nio读取文件的3个步骤
- 从FileInputStream 获取 channel对象
- 创建buffer对象
- 将数据从channel读取到buffer中
-
绝对方法和相对方法的含义:
- 相对方法:limit和position值在操作时会被考虑到(随着读取或者是filp操作时 他们的值可能会发生相应的变化)
- 绝对方法:是完全忽略掉limit和position值(根据buffer的索引来直接get或者put)
-
割或者分片buffer(slice)左闭右开 (注意这里分片之后的数据 还是和原来的数据共享一份 修改数据也会影响原来的数据)
// 分割或者分片buffer public static void main(String[] args) { ByteBuffer byteBuffer = ByteBuffer.allocate(10); for (int i=0;i<byteBuffer.capacity();i++){ byteBuffer.put((byte) i); } byteBuffer.position(2); byteBuffer.limit(6); ByteBuffer slice = byteBuffer.slice(); for(int i=0;i<slice.capacity();i++){ byte b = slice.get(i); b *=2; slice.put(i,b); } byteBuffer.clear(); while (byteBuffer.hasRemaining()){ System.out.println(byteBuffer.get()); } }
只读buffer
-
对外内存(直接内存)
-
直接缓冲
public static void main(String[] args) throws Exception{ try( FileOutputStream fileOutputStream = new FileOutputStream("niotest2.txt"); FileInputStream fileInputStream = new FileInputStream("niotest.txt"); ){ FileChannel inputChannel = fileInputStream.getChannel(); FileChannel outputChannel = fileOutputStream.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocateDirect(512); while (true){ byteBuffer.clear(); int read = inputChannel.read(byteBuffer); System.out.println("read:"+read); if(-1 == read){ break; } byteBuffer.flip(); outputChannel.write(byteBuffer); } } }
-
文件映射
// 内存映射文件 一个文件的内存映射区域 public static void main(String[] args) throws Exception { // rw 表示可读写 RandomAccessFile randomAccessFile = new RandomAccessFile("NIOTest9.txt","rw"); // 获取文件通道对象 FileChannel fileChannel = randomAccessFile.getChannel(); // 获取MappedBuffer对象 MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE,0,5); // 通过MappedByteBuffer 修改文件信息 mappedByteBuffer.put(0,(byte)'a'); mappedByteBuffer.put(3,(byte)'b'); randomAccessFile.close(); }
-
文件锁
// 文件锁的概念 public static void main(String[] args) throws Exception { RandomAccessFile randomAccessFile = new RandomAccessFile("NIOTest9.txt","rw"); FileChannel fileChannel = randomAccessFile.getChannel(); FileLock lock = fileChannel.lock(2, 4, true); System.out.println(lock.isValid()); System.out.println(lock.isShared()); lock.release(); randomAccessFile.close(); }
-
-
关于buffer的Scattering(分散) 和Gathering(合并)
- Scattering:来自channel的数据可以按顺序读入一个buffer的数组之中 (按照第一个buffer读满接着往下一个读取)
- Gathering:可以将来自多个buffer的数据按顺序写入(和上面相反)
- 使用场景:自定义协议时 一个协议可能有 一个header还有一个标识体最后一个body,因为一般头部或者标识这里的字节长度一般是固定的 可变的是body:这里就可以采用三个buffer来接收,可以达到天然的区分数据,不用后期手动的去做拆分了。
-
网络编程
-
关于NIO编程实例:
public static void main(String[] args) throws Exception { int[] ports = new int[5]; ports[0] = 5000; ports[1] = 5001; ports[2] = 5002; ports[3] = 5003; ports[4] = 5004; // 构造一个selector Selector selector = Selector.open(); // 将 Selector 注册到对应的监听端口上 for(int i=0;i<ports.length;++i){ ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 配置是否阻塞 true 阻塞 false 不阻塞 serverSocketChannel.configureBlocking(false); // 构造sockte ServerSocket serverSocket = serverSocketChannel.socket(); // 绑定 InetSocketAddress socketAddress = new InetSocketAddress(ports[i]); serverSocket.bind(socketAddress); // 注册 通道和选择器之间的关联关系 将当前的selector注册到通道上 并且对应的感兴趣的key是 接受连接 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("监听端口:"+ports[i]); } while (true) { // 表示返回的键的数量 int numbers = selector.select(); System.out.println("numbers:"+numbers); // 一旦有返回 获取相应的事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); System.out.println("selectionKeys:"+selectionKeys); // 获取他的迭代器 Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); if( selectionKey.isAcceptable() ) { // 获取对应的 真正所关联的channel对象 ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); // 获取对应的 SockteChannel 这时表示的是真正的连接对象通道 SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); // 连接建立后之后 需要将新的连接注册到 selector当中 感兴趣的事件是 读 socketChannel.register(selector,SelectionKey.OP_READ); // 以上调用完之后 需要调用迭代器的 remove 将其从 selectionKeys 集合中移除(这里特别重要,不然一直还是会监听) iterator.remove(); System.out.println("获取到客户端的连接:"+socketChannel); } else if (selectionKey.isReadable()) { // 进行数据的读取 // 通过 selectionKey 获取对应的 channel SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); int byteRead = 0; while (true) { // 读取数据 ByteBuffer byteBuffer = ByteBuffer.allocate(512); int read = socketChannel.read(byteBuffer); if ( read <=0 ) { break; } // 往会写 byteBuffer.flip(); socketChannel.write(byteBuffer); byteRead += read; } System.out.println("读取的数据:"+byteRead+" 来自于:"+socketChannel); // 特别注意 这里处理完 一定要把当前的 事件rremove iterator.remove(); } } } }
-
实例2:
// 维护所有客户端的连接信息 private static Map<String,SocketChannel> clientMap = new HashMap(); public static void main(String[] args) throws Exception{ // 创建一个serverSockteChannel对象 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 配置成非阻塞的 serverSocketChannel.configureBlocking(false); // 获取serverSocket对象 ServerSocket serverSocket = serverSocketChannel.socket(); // 绑定端口 serverSocket.bind(new InetSocketAddress(8899)); // 创建selector对象 Selector selector = Selector.open(); // 将serverSocketChannel对象注册到selector上 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 注册完成之后 进行相应的事件处理 // 进行服务器的监听 while (true) { try { // 这个方法将阻塞 一直到他所监听的 感兴趣的事件发生 返回他所关注的事件数量 // 当这个方法返回之后 就可以获取他的selectionKey 所构成的集合 selector.select(); // 获取返回的集合对象 Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 遍历SelectionKey集合 取出对应的每种SelectionKey 判断对应的是什么事件 并进行相应的处理 selectionKeys.forEach(selectionKey -> { try { // 表示对应客户端的 channel对象 final SocketChannel client; if (selectionKey.isAcceptable()) { // 可以通过 selectionKey 来获取与之关联的 serverSocketChannel对象 ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel(); // 服务端 真正接收了客户端的连接会返回一个 SocketChannel对象 client = server.accept(); // client 连接真正建立之后 将客户端的channel对象 配置成非阻塞的并 注册到selector上 client.configureBlocking(false); client.register(selector,SelectionKey.OP_READ); // 将客户端的连接信息 记录到服务端 这样才可以实现服务端实现消息的分发 String key = "["+ UUID.randomUUID().toString()+"]"; clientMap.put(key,client); }else if (selectionKey.isReadable()) { // 判断是否有新进来的数据 // 获取 socketChannel client = (SocketChannel) selectionKey.channel(); // 定义byteBuffer对象 ByteBuffer readBuffer = ByteBuffer.allocate(512); // 将数据读入 buffer int count = client.read(readBuffer); if(count > 0){ // 进行写操作 readBuffer.flip(); // 进行字符集编码 Charset charset = Charset.forName("utf-8"); // 将buffer对象进行解码成字符串 String receiveMessage = String.valueOf(charset.decode(readBuffer).array()); System.out.println(client + ":"+receiveMessage); // 获取到 sendKey String sendKey = null; for(Map.Entry<String,SocketChannel> entry : clientMap.entrySet()) { if ( client == entry.getValue()) { sendKey = entry.getKey(); break; } } // 进行分发 for (Map.Entry<String,SocketChannel> entry : clientMap.entrySet()) { // 获取每一个与服务端连接的 sockteChannel对象 SocketChannel socketChannel = entry.getValue(); // 进行数据的写入 ByteBuffer writeBuffer = ByteBuffer.allocate(512); // 将需要发送的数据 写入 writeBuffer.put((sendKey+":"+receiveMessage).getBytes()); writeBuffer.flip(); socketChannel.write(writeBuffer); } } } } catch (Exception e ) { e.printStackTrace(); } }); selectionKeys.clear(); } catch (Exception e) { e.printStackTrace(); } } }
public static void main(String[] args) throws Exception{ try { // 建立连接 SocketChannel socketChannel = SocketChannel.open(); // 配置非阻塞模式 socketChannel.configureBlocking(false); // 定义 selector 并注册 Selector selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_CONNECT); // 连接 socketChannel.connect(new InetSocketAddress("127.0.0.1",8899)); while (true) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 遍历 for (SelectionKey selectionKey : selectionKeys) { if (selectionKey.isConnectable()) { // 表示 连接上了 获取 socketChannel对象 SocketChannel client = (SocketChannel) selectionKey.channel(); // 连接是否处在是否进行的状态 if ( client.isConnectionPending()) { // 完成连接 client.finishConnect(); // 表示连接真正的建立好了 ByteBuffer wirteBuffer = ByteBuffer.allocate(512); wirteBuffer.put((LocalDateTime.now()+"连接成功").getBytes()); wirteBuffer.flip(); client.write(wirteBuffer); ExecutorService executorService = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory()); executorService.submit(()->{ while (true) { try { // 键盘输入 wirteBuffer.clear(); InputStreamReader inputStreamReader = new InputStreamReader(System.in); BufferedReader bufferedReader = new BufferedReader(inputStreamReader); String sendMessage = bufferedReader.readLine(); wirteBuffer.put(sendMessage.getBytes()); wirteBuffer.flip(); client.write(wirteBuffer); } catch (Exception e){ e.printStackTrace(); } } }); } // 注册读取事件 client.register(selector,SelectionKey.OP_READ); }else if( selectionKey.isReadable()){ SocketChannel client = (SocketChannel) selectionKey.channel(); // 读取服务器端发送过来的数据 ByteBuffer readBuffer = ByteBuffer.allocate(512); int read = client.read(readBuffer); if( read > 0) { String receivedMessage = new String(readBuffer.array(),0,read); System.out.println(receivedMessage); } } } selectionKeys.clear(); } } catch (Exception e) { e.printStackTrace(); } }
-
-
编解码
1. 实例:```java // java编解码 public static void main(String[] args) throws Exception{ // 定义一个输入文件 String inputFile = "NIOTest13_in.txt"; String outputFile = "NIOTest13_out.txt"; // 将 NIOTest13_in.txt文件内容拷贝到 NIOTest13_out中 使用内存映射 RandomAccessFile inputRandomAccessFile = new RandomAccessFile(inputFile,"r"); RandomAccessFile outputRandomAccessFile = new RandomAccessFile(outputFile,"rw"); // 获取输入文件的长度 long fileLong = new File(inputFile).length(); // 获取输入和输出的文件channle FileChannel inputChannel = inputRandomAccessFile.getChannel(); FileChannel outputChannel = outputRandomAccessFile.getChannel(); // 通过内存映射文件 修改内存内容 直接反应在文件上 MappedByteBuffer mappedByteBuffer = inputChannel.map(FileChannel.MapMode.READ_ONLY,0,fileLong); // 指定字符集 Charset charset = Charset.forName("utf-8"); // decoder 将字节数组转化成字符串 CharsetDecoder decoder = charset.newDecoder(); // encoder 将字符串转化成字节数组 CharsetEncoder encoder = charset.newEncoder(); // 将内存映射的buffer 解码成一个 charbuffer CharBuffer charBuffer = decoder.decode(mappedByteBuffer); // 将 charbuffer 编码成 bytebuffer ByteBuffer outputData = encoder.encode(charBuffer); // 将 outputData 输出到文件通道 outputChannel.write(outputData); // 关闭 inputRandomAccessFile.close(); outputRandomAccessFile.close(); } ```
-