I/O实际上是Input和Output,也就是输入和输出。而流量其实是一种抽象的概念,它表示的是数据的无结构化传递。会被当成无结构的字节序列或字符序列。流可以当作是磁盘与内存之间的一个管道。
在Java中I/O流操作很多,但是核心体系实际上就只有File(文件流)、InputStream(字节输入流)、OutputStream(字节输出流)、Reader(字符输入流)、Writer(字符输出流)。
File类是Java中为文件进行创建、删除、重命名、移动等操作而设计的一个类
public static void main(String[] args) {
File file = new File("D:\appdata\IODemo\Capture001.png");
try (
FileOutputStream fileOutputStream = new FileOutputStream("D:\appdata\IODemo\Capture002.png");
FileInputStream fileInputStream = new FileInputStream(file)) { // 1.7之后,将流写入try()中,代码执行完毕后,会自动关闭流
int len = 0;
byte[] buffer = new byte[1024];
long start = System.currentTimeMillis();
while ((len = fileInputStream.read(buffer)) != -1) {
fileOutputStream.write(buffer, 0, len);
}
long end = System.currentTimeMillis();
System.out.println((end - start) / 1000);
} catch (IOException e) {
e.printStackTrace();
}
}
流程一定要关闭,否则当前线程没执行完会一直使其被进程占用。
try (FileReader reader = new FileReader("/appdata/IODemo/IODemo");
FileWriter writer = new FileWriter("/appdata/IODemo/IODemo.txt")) {
int i = 0;
char[] chars = new char[1];
while ((i = reader.read(chars)) != -1) {
writer.write(new String(chars, 0, i));
}
} catch (Exception e) {
e.printStackTrace();
}
缓冲流是带缓冲区的处理流,他会提供一个缓冲区,缓冲区的作用主要目的是:避免每次和硬盘打交道,能够提高输入/输出的执行效率。
BufferedInputStream
private static int DEFAULT_BUFFER_SIZE = 8192; // 默认8Kb的缓冲区
private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8; // 最大缓冲区大小
// 每次读取的8Kb size的字节会存储在buf[]数组中
//每次调用read()方法时,会首先去尝试从这个数组中读取,如果读取失败,会从数据源(磁盘上)去读取
protected volatile byte buf[];
// 两种构造方法最终调用该方法,带int参数的会覆盖默认的8Kb size
public BufferedInputStream(InputStream in, int size) {
super(in);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size];
}
其实缓冲流原理上是帮我们封装了8Kb大小的数据,先从磁盘读8Kb到我们内存,后由我们自己去操作这8Kb的数据,当处理完8Kb缓冲区没有了,再加载数据到缓冲区,再读到内存去处理。当我们用普通流程去处理文件,将buffer[]设置得稍微大一点,一样可以达到提高效率的结果。
public static void main(String[] args) {
try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream("/appdata/IODemo/IODemo"));
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream("/appdata/IODemo/IODemo.txt"))) {
int len = 0;
byte[] bytes = new byte[1024];
while ((len = bufferedInputStream.read(bytes)) != -1) {
// System.out.println(new String(bytes, 0, len));
bufferedOutputStream.write(bytes, 0, len);
bufferedOutputStream.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}
将创建InputStream写入到try()中,可以帮我们实现close()关闭流的操作,这个close中包含了buffred的flush操作,如果没有关闭流,又没有手动flush(),将会丢失数据。
public void close() throws IOException {
try (OutputStream ostream = out) {
flush();
}
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("/appdata/IODemo/IODemo"), StandardCharsets.UTF_8))) {
String str;
while ((str = reader.readLine()) != null) {
System.out.println(str);
}
} catch (Exception e) {
e.printStackTrace();
}
try (InputStream inputStream = new FileInputStream("/appdata/IODemo/IODemo");
InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
char[] chars = new char[1024];
int i;
while ((i = reader.read(chars)) != -1) {
System.out.println(new String(chars, 0, i));
}
} catch (Exception e) {
e.printStackTrace();
}
在这个转换流中,是可以指定字符集编码的。
关于序列化和反序列化这个问题,我在18年参加工作的时候,遇到过一个项目,之后就再没有用过了。当时架构还是分布式dubbo+zookeeper,但是传输报文竟然用到这个我是没想到的。
什么是序列化和反序列化?
public class UserSerializable implements Serializable {
private static final long serialVersionUID = 8160464260217334369L;
private String name;
private int age;
public void setName(String name) {
this.name = name;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "UserSerializable{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
public static void main(String[] args) {
UserSerializable user = new UserSerializable();
user.setAge(26);
user.setName("Elian");
String fileName = "/appdata/IODemo/User";
try (FileInputStream fileInputStream = new FileInputStream(fileName);
FileOutputStream fileOutputStream = new FileOutputStream(fileName);
ObjectOutputStream outputStream = new ObjectOutputStream(fileOutputStream);
ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream)
) {
outputStream.writeObject(user);
outputStream.flush();
UserSerializable newUser = (UserSerializable) objectInputStream.readObject();
System.out.println(newUser);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class NIOFirstDemo {
public static void main(String[] args) {
bio();
bufferBio();
nio();
mmap();
zeroCopy();
}
private static void bio() {
try (FileInputStream bioInputStream = new FileInputStream("/appdata/IODemo/jdk api 1.8_google.CHM");
FileOutputStream bioOutputStream = new FileOutputStream("/appdata/IODemo/jdk_bio.CHM")) {
// bio实现copy
long bioStart = System.currentTimeMillis();
int len = 0;
byte[] buffer = new byte[1024];
while ((len = bioInputStream.read(buffer)) != -1) {
bioOutputStream.write(buffer, 0, len);
}
bioOutputStream.flush();
System.out.println(System.currentTimeMillis() - bioStart);
} catch (Exception e) {
e.printStackTrace();
}
}
private static void bufferBio() {
try (BufferedInputStream bioInputStream = new BufferedInputStream(new FileInputStream("/appdata/IODemo/jdk api 1.8_google.CHM"));
BufferedOutputStream bioOutputStream = new BufferedOutputStream(new FileOutputStream("/appdata/IODemo/jdk_bufferBio.CHM"))) {
// bio实现copy
long bioStart = System.currentTimeMillis();
int len = 0;
byte[] buffer = new byte[1024];
while ((len = bioInputStream.read(buffer)) != -1) {
bioOutputStream.write(buffer, 0, len);
}
bioOutputStream.flush();
System.out.println(System.currentTimeMillis() - bioStart);
} catch (Exception e) {
e.printStackTrace();
}
}
private static void nio() {
try (FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk_nio.CHM"),
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
// nio 实现copy
long nioStart = System.currentTimeMillis();
int len = 0;
ByteBuffer buffer = ByteBuffer.allocate(1024);
while ((len = inChannel.read(buffer)) != -1) {
buffer.flip();
outChannel.write(buffer);
buffer.clear();
}
System.out.println(System.currentTimeMillis() - nioStart);
} catch (Exception e) {
e.printStackTrace();
}
}
// 依然将用户空间的
private static void mmap() {
try (FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdb_mmap.CHM"),
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
long mmapStart = System.currentTimeMillis();
MappedByteBuffer inMappedBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMappedBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
byte[] bytes = new byte[inMappedBuffer.limit()];
inMappedBuffer.get(bytes);
outMappedBuffer.put(bytes);
System.out.println("mmap:" + (System.currentTimeMillis() - mmapStart));
} catch (Exception e) {
e.printStackTrace();
}
}
private static void zeroCopy() {
try(FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk_zeroCopy.CHM"),
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
long zeroCopyStart = System.currentTimeMillis();
inChannel.transferTo(0, inChannel.size(), outChannel);
System.out.println(System.currentTimeMillis() - zeroCopyStart);
} catch (Exception e) {
e.printStackTrace();
}
}
}
实验顺序(速度由快到慢排序)
zeroCopy(零拷贝) > mmap(内存映射) > bufferedInputStream > bio(基于channle) ~= nio
zerCopy无需将文件映射到内存,mmap会将buffer读进内存,关于Buffer继续往下看4.2。
// 服务端
final int DEFAULT_PORT = 9090;
try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
Socket socket = serverSocket.accept();// 阻塞操作,等待客户端的连接
System.out.println("Client port:" + socket.getPort() + " has been connected!");
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
String str = bufferedReader.readLine();
System.out.println("Client Content:" + str);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
writer.write(str + "
"); // 如果不换行,客户端会一直等待读取完
writer.flush();
bufferedReader.close();
writer.close();
} catch (Exception e) {
e.printStackTrace();
}
try (Socket socket = new Socket("localhost", 9090)) {
OutputStream outputStream = socket.getOutputStream();
outputStream.write("Hello Elian
".getBytes(StandardCharsets.UTF_8)); // 不换行服务端会一直等待读取完,进入阻塞
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
System.out.println(reader.readLine());
} catch (Exception e) {
e.printStackTrace();
}
客户端是怎样找到目标服务的呢?
客户端发起请求的时候,在不同的层去增加不同的协议头,在数据链路层组装目标机器的Mac地址,这个地址是通过ARP协议,我们已知目标的IP,需要获得目标的Mac地址,会发送一个广播消息,会在网段内去询问这个IP是谁,目标地址会发送自己Mac地址给到当前这个发送端,就可以去组装目标的Mac地址。那么在数据发送过程中,进入IP广播后,某个网卡就会发现,对应Mac的网卡就会把数据包收进来。
本地磁盘IO通信:
网络磁盘通信:
两者不同在于:本地磁盘要通过DMA(直接存储访问器)将磁盘上的内容读取到内核空间缓冲区,再从内核空间缓冲区读到用户空间缓冲区进行操作。而网络IO是通过网卡中的缓冲区读取到系统内核缓冲区,如果应用进程一直没有调用socket的read()方法读取数据将数据copy到用户缓冲区,数据会一直被缓存在内核缓冲区里面。
accept()每次只能接收一个并处理一个socket,这样只能等上一个socket处理完才能继续处理下一个请求。BIO每次阻塞两个位置,第一个阻塞位置是accept过程,另一个阻塞过程是I/O流读写的过程。
解决办法:通过线程池进行处理。
public static void main(String[] args) {
final int DEFAULT_PORT = 9090;
try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
while (true) {
Socket socket = serverSocket.accept();// 阻塞操作,等待客户端的连接
executorService.submit(new ServerSocketThread(socket));
}
} catch (Exception e) {
e.printStackTrace();
}
}
public class ServerSocketThread implements Runnable {
private Socket socket;
public ServerSocketThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
System.out.println("Client port:" + socket.getPort() + " has been connected");
try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
BufferedWriter writer=new BufferedWriter((new OutputStreamWriter(socket.getOutputStream())))){
String clinetStr = reader.readLine();
System.out.println("Client resived message: " + clinetStr);
Thread.sleep(15000);
writer.write("OK.
");
} catch(Exception e){
e.printStackTrace();
}
}
}
现在还有一个缺点:
线程数取决于计算机本身的线程数,但是线程数设置太大,又会造成线程之间切换造成的资源消耗。
RPC(Remote Procedure Call) 远程过程调用,是一种通过网络从计算机程序上请求服务,而不需要了解底层网络技术的协议。一般用来实现部署在不同机器上的系统之间的方法调用,使得程序能够像访问本地系统资源一样,通过网络传输去访问远端系统资源。
// 1. 公共类
// 接口
public interface IHelloWorld {
String sayHello(String content);
}
// Request
public class RpcRequest implements Serializable {
private static final long serialVersionUID = -7922155162004878476L;
private String className;
private String methodName;
private Object[] parameters;
private Class[] types;
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Object[] getParameters() {
return parameters;
}
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
public Class[] getTypes() {
return types;
}
public void setTypes(Class[] types) {
this.types = types;
}
}
// 2. provider
// impl
public class HelloWorldImpl implements IHelloWorld {
@Override
public String sayHello(String content) {
return "Hello " + content;
}
}
// Server
public class RpcProxyServer {
private final ExecutorService executorService = Executors.newCachedThreadPool();
public void publisher (int port) {
try (ServerSocket server = new ServerSocket(port)) {
while (true) {
final Socket socket = server.accept();
executorService.execute(new ProcessorHandler(socket));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ProcessorHandler implements Runnable {
private final Socket socket;
public ProcessorHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())) {
RpcRequest request = (RpcRequest)objectInputStream.readObject();
Object object = invoke(request);
objectOutputStream.writeObject(object);
objectOutputStream.flush();
} catch (Exception e) {
e.printStackTrace();
}
}
private Object invoke(RpcRequest request) throws Exception {
Class<?> clazz = Class.forName(request.getClassName());
Method method = clazz.getMethod(request.getMethodName(), request.getTypes());
if (request.getClassName().substring(request.getClassName().lastIndexOf('.') + 1).equals("IHelloWorld"))
return method.invoke(new HelloWorldImpl(), request.getParameters());
else
return null;
}
}
// 3.consumer
// client
public class App
{
public static void main( String[] args )
{
RpcProxyClient client = new RpcProxyClient();
IHelloWorld iHelloWorld = client.clientProxy(IHelloWorld.class, "localhost", 9090);
System.out.println(iHelloWorld.sayHello("Elian"));
}
}
// Client
public class RpcProxyClient {
public T clientProxy(final Class interfaceCls, final String host, final int port) {
return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port));
}
}
// 动态代理类
public class RemoteInvocationHandler implements InvocationHandler {
private String host;
private int port;
public RemoteInvocationHandler(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest request = new RpcRequest();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameters(args);
request.setTypes(method.getParameterTypes());
RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
Object object = rpcNetTransport.send(request);
return object;
}
}
// reader读取返回报文
public class RpcNetTransport {
private String host;
private int port;
public RpcNetTransport(String host, int port) {
this.host = host;
this.port = port;
}
public Object send( RpcRequest request ) {
try (Socket socket = new Socket(host, port);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())) {
objectOutputStream.writeObject(request);
objectOutputStream.flush();
return objectInputStream.readObject();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
阻塞IO,非阻塞IO,IO复用,信号驱动IO,异步IO,无论哪种IO,都是为了能够提高服务端能够并行处理的连接数量。
应用进程调用accept()时,触发系统把数据从网卡缓冲区复制到内核空间,再从内核空间复制到用户空间,如果这个过程中,数据没有准备好,到数据返回或发生错误返回之前,用户进程一直处于阻塞状态,这个就是阻塞IO。
非阻塞是指用户进程调用accept()后,如果数据没有准备好,会返回一个EWOULDBLOCK状态,并创建一个线程出来不断轮询返回结果。由此可见会增加CPU的消耗。
单个进程可以同时处理多个客户端的网络IO链接,我们可以把所有链接过来的客户端注册到select/poll复用器上,用一个线程或者进程来调用这个select/poll,调用这个select的时候会阻塞,阻塞的时候,内核会去监视所有select/poll所负责的socket,当其中一个socket准备好的时候,那么这个select/poll就会返回,如果再次调用这个select的时候,就会把数据从内核拷贝到用户空间。
select/poll模型最大的缺点是,他只能线性的轮询1024个链接,当然这1024个链接只有少数处于活跃状态,会导致网络的延迟。jdk1.5之前的NIO是使用这种模型。
这种模型处理的情况是:多个不同的监听,而且只是提高了并发连接数,并不是提高单个线程处理性能。连接数少的情况下,不一定比BIO效率更高。
对select/poll进行的优化:
总结:
相比较老的IO来说,所有操作都是基于Channel和Buffer来说的,可以将Channel看成是InputStream/OutputStream,应用程序与磁盘/网络缓冲区之间的一个通道,而所有数据操作都是通过缓冲区来实现的。
通道(Channel):Java NIO数据来源,可以是网络,也可以是本地磁盘
缓冲区(Buffer):数据读写的中转区
选择器(selectors):异步IO的核心类,可以实现异步非阻塞IO,一个selectors可以管理多个通道Channel
FileChannle:从文件中读取数据
DatagramChannel:通过UDP协议读写网络中的数据
SocketChannel:通过TCP协议读写网络中的数据
ServerSocketChannel:监听一个TCP连接,对于每一个新的客户端连接都会创建一个SocketChannel
缓冲区本质上是一块可以写入的数据,以及从中读取数据的内存,实际上也是一个byte[]数据,只是在NIO中被封装成了NIO Buffer对象,并提供了一组方法来访问这个内存块,要理解buffer的工作原理,需要知道几个属性:
private int position = 0; // 下一个位置
private int limit; //
private int capacity; // 容量,buffer数组初始化的最大容量
private int mark; // 标记
MappedByteBuffer outMappedBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
使用堆外内存的原因
使用堆外内存的问题
ByteBuffer模型
初始
read(), put()
position = n;
limit = capacity = 8;
mark = -1;
flip()
limit = position; // 用来设置限制
position = 0;
mark = -1;
mark()
mark = postion; // 标记
reset()
position = mark;
clear()实际上数据还在
position = 0;
limit = capacity;
mark = -1;
Linux支持的零拷贝方式:
server
public class ZeroCopyServer {
public static void main(String[] args) {
try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
RandomAccessFile writeFile = new RandomAccessFile("/appdata/IODemo/Capture001_zerCopy.png", "rw");
FileChannel fileChannel = writeFile.getChannel();
) {
long start = System.currentTimeMillis();
serverSocketChannel.bind(new InetSocketAddress(9090));
SocketChannel socketChannel = serverSocketChannel.accept();
ByteBuffer buffer = ByteBuffer.allocate(8192);
int i = 0;
int j = 0;
/*while ((i = socketChannel.read(buffer)) != -1) {
buffer.flip();
fileChannel.map(FileChannel.MapMode.READ_WRITE, j, i ).put(buffer);
buffer.clear();
j += i;
}*/ // 2527ms mmap()方式
while ((i = socketChannel.read(buffer)) != -1) {
buffer.flip();
fileChannel.write(buffer);
buffer.clear();
j += i;
} // 4462ms 普通写
System.out.println("传输大小:" + j + ";时间:" + (System.currentTimeMillis() - start));
} catch (Exception e) {
e.printStackTrace();
}
}
}
client
public class ZeroCopyClient {
public static void main(String[] args) {
try (SocketChannel socketChannel = SocketChannel.open();
FileChannel fileChannel = FileChannel.open(Paths.get("/appdata/IODemo/Capture001.png"))) {
socketChannel.connect(new InetSocketAddress("localhost", 9090));
int position = 0;
long size=fileChannel.size();
while (size > 0) {
long transfer = fileChannel.transferTo(position, fileChannel.size(), socketChannel); // 零拷贝,只从File Copy到缓冲区
position += transfer;
size -= transfer;
}
System.out.println("上传文件大小:" + position);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Selector(选择器,多路复用器)是Java NIO中能够检测一到多个NIO通道,是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。
服务端处理过程:
select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。
// selector.open();
private native int epollCreate();
// serverSocketChannel/socketChannle.register(selector, SelectionKey.OP_ACCEPT)
private native void epollCtl(int epfd, int opcode, int fd, int events);
// selector.select()
private native int epollwait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
客户端处理过程:
server
public class NIOSelectorServer {
public static void main(String[] args) {
try (Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
serverSocketChannel.configureBlocking(false); // 在多路复用器中,这个必须设置为非阻塞
serverSocketChannel.bind(new InetSocketAddress(9090));
// 监听连接事件
// 将serverSocketChannel注册到selector上
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 参数可以带时间:0:阻塞;有时间:设置一个超时时间
selector.select(); // 阻塞所有注册到多路复用器上的事件
Set selectionKeys = selector.selectedKeys(); // 对于连接的SocketChannel的selectKey的集合
Iterator iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove(); // 避免重复处理
// socket两种状态:listen 通信R/W
if (selectionKey.isAcceptable()) { // 是一个连接事件
acceptHandler(selectionKey);
} else if (selectionKey.isReadable()) { // 是一个读事件
readHandler(selectionKey);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void acceptHandler(SelectionKey key) {
try {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel client = ssc.accept(); // 目的是调用accept接收客户端,例如fd7
client.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
client.register(key.selector(), SelectionKey.OP_READ, buffer);
System.out.println("-------------------------------------------");
System.out.println("新客户端:" + client.getRemoteAddress());
System.out.println("-------------------------------------------");
} catch (IOException e) {
e.printStackTrace();
}
}
public static void readHandler(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
try {
channel.read(buffer);
buffer.flip();
System.out.println("Client Info: "+new String(buffer.array()));
buffer.clear();
buffer.put("Hello Client, i'm Server".getBytes());
buffer.flip();
channel.write(buffer);
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
client
public class NIOSelectorClient {
public static void main(String[] args) {
try (Selector selector = Selector.open()) {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("localhost", 9090));
socketChannel.register(selector, SelectionKey.OP_CONNECT);
while (true) {
selector.select();
Set selectionKeySet = selector.selectedKeys();
Iterator selectionKeyIterator = selectionKeySet.iterator();
while (selectionKeyIterator.hasNext()) {
SelectionKey selectionKey = selectionKeyIterator.next();
selectionKeyIterator.remove();
if (selectionKey.isConnectable()) {
connectHandler(selector, selectionKey);
} else if (selectionKey.isReadable()) {
readHandler(selectionKey);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static void connectHandler(Selector selector, SelectionKey selectionKey) throws IOException {
SocketChannel channel = (SocketChannel) selectionKey.channel();
if (channel.isConnectionPending()) {
channel.finishConnect();
}
channel.configureBlocking(false);
channel.write(ByteBuffer.wrap("Hello Server, I'm NIO Client".getBytes()));
channel.register(selector, SelectionKey.OP_READ);
}
private static void readHandler(SelectionKey selectionKey) throws IOException {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
channel.read(byteBuffer);
byteBuffer.flip();
System.out.println("client receive message: " + new String(byteBuffer.array()));
channel.close();
}
}
accpet() + new Thread(() -> {
// 业务处理
}).start();
new SocketServerChannel().registror(selector, SelectionKey.OP_ACCEPT);
while(true) {
selector.select();
seletor.selectedKeys().iterator();
while (iterator.hasnext()) {
// 业务处理
}
}
在上面业务处理部分加入多线程。
Netty,两个Grop,一个处理accept,一个处理业务
留言与评论(共有 0 条评论) “” |