如何用Java与python代码解释IO模型

最近爷爷生病,我一直在陪床。陪床空隙,偶而研究一会资料。前天刚好看了点《UNIX网络编程》,比较头大。现在我来整理一下所学所得,并用于个人备忘。如果有不对,请批评。

想要解锁更多新姿势?请访问https://blog.tengshe789.tech/

IO模型介绍

IO模型是什么?很多书籍或者百度百度百科,都没有给出明确的解释,我也不敢乱下定义。以我愚见,IO模型,是通过根据前人主观意识的思考而构成客观阐述IO复杂操作逻辑的物件。

要知道,应用程序使用系统资源的一个过程,进程无法直接操作IO设备的,因为用户进程不能直接访问磁盘,所以要通过内核的系统调用读取,这个内核读取的过程就是用户进程等待的过程,等待内核读取后将数据从内核内存复制到进程内存。因此操作系统设立一个IO模型进行规范,就非常有必要了。

应用程序使用系统资源

为了更好地了解IO模型,我们需要事先回顾下:同步、异步、阻塞、非阻塞

同步与异步:描述的是用户线程与内核的交互方式,同步指用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;而异步是指用户线程发起IO请求后仍然继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。

阻塞与非阻塞:描述是用户线程调用内核IO操作的方式,阻塞是指IO操作需要彻底完成后才返回到用户空间;而非阻塞是指IO操作被调用后立即返回给用户一个状态值,无需等到IO操作彻底完成。

IO模型一共有5类:

  • blocking-IO BIO(阻塞IO)

  • non-blocking IO NIO(非阻塞IO)

  • IO multiplexing IO多路复用

  • signal driven IO 信号驱动IO

  • asynchronous IO AIO(异步IO)

    由于signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。

BIO(blocking io)

先来看看读操作流程

1535200521414

从图中可以看出,用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。

对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。

而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

也就是说,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

JAVA 阻塞 demo

下面的例子主要使用Socket通道进行编程。服务端如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* @program: socketTest
* @description: one thread demo for bio version
* @author: tEngSHe789
* @create: 2018-08-26 21:17
**/
public class Server {
public static void main(String[] args) {
try {
ServerSocket serverSocket=new ServerSocket(8888);
System.out.println("服务端Start....");
//等待客户端就绪 -> 堵塞
while (true){
Socket socket = serverSocket.accept();
System.out.println("发现客户端连接");
InputStream is=socket.getInputStream();
byte[] b =new byte[1024];
//等待客户端发送请求 -> 堵塞
while (true) {
int data = is.read(b);
String info=null;
if (data!=-1){
info=new String(b,0,data,"GBK");
}
System.out.println(info);
}

}
} catch (IOException e) {
}
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* @program: socketTest
* @description: one thread demo for bio version
* @author: tEngSHe789
**/
public class Client {
public static void main(String[] args) {
try {
Socket socket=new Socket("127.0.0.1",8888);
OutputStream os = socket.getOutputStream();
System.out.println("正在发送数据");
os.write("这是来自客户端的信息".getBytes());
os.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}

PY 阻塞 demo

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
import socket

s = socket.socket()
s.bind(('127.0.0.1',8888))
print('服务端启动....')
# 等待客户端就绪 -> 堵塞
s.listen()
# 等待客户端发送请求 -> 堵塞
conn,addr = s.accept()
msg = conn.recv(1024).decode('utf-8')
print(msg)
conn.close()
s.close()

客户端

1
2
3
4
5
6
7
import socket

s = socket.socket()
s.connect(('127.0.0.1',8888))
print('客户端已启动....')
s.send('正在发送数据'.encode('utf-8'))
s.close()

NIO(non blocking io)

NIO就不一样了,recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。

轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

1535201598609

JAVA 与NIO

Java NIO(New IO)是一个可以替代标准Java IO API的IO API(从Java 1.4开始),Java NIO提供了与标准IO不同的IO工作方式。

在java中,标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。

我们先看看Buffer类

Buffer类

Java NIO中的Buffer主要用于与NIO通道进行交互,数据是从通道读入到缓冲区,从缓冲区写入通道中的。概念上,缓冲区可以看成包在一个对象内的数组,下面看一个图

新创建的ByteBuffer

这是一个新创建的容量为10的ByteBuffer逻辑图,他有四个属性来提供关于其包含的数据元素信息,分别是:

1)容量(capacity):表示Buffer最大数据容量,缓冲区容量不能为负,并且建立后不能修改。

2)限制(limit):也叫上界。第一个不应该读取或者写入的数据的索引,即位于limit后的数据不可以读写。缓冲区的限制不能为负,并且不能大于其容量(capacity)。

3)位置(position):下一个要读取或写入的数据的索引。缓冲区的位置不能为负,并且不能大于其限制(limit)。

4)标记(mark)与重置(reset):标记是一个索引,通过Buffer中的mark()方法指定Buffer中一个特定的position,之后可以通过调用reset()方法恢复到这个position。

从这幅图可以看到,他的容量(capacity)和限制(limit)设置为10,位置设置为0,每个缓冲区容量是固定的,标记是未定义的,其他三个属性可以通过使用缓冲区解决。

缓冲区存储数据支持的数据类型

支持七种数据类型,他们是:
1.byteBuffer
2.charBuffer
3.shortBuffer
4.IntBuffer
5.LongBuffer
6.FloatBuffer
7.DubooBuffer

基本用法

使用Buffer读写数据一般遵循以下四个步骤:

(1) 写入数据到Buffer,一般有可以从Channel读取到缓冲区中,也可以调用put方法写入。

(2) 调用flip()方法,切换数据模式。

(3) 从Buffer中读取数据,一般从缓冲区读取数据写入到通道中,也可以调用get方法读取。

(4) 调用clear()方法或者compact()方法。

缓冲区API

首先,用allocate 指定缓冲区大小1024

1
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
存储或填充

我们可以用put 存入数据到缓冲区

1
byteBuffer.put("tengshe789".getBytes());

当调用put时,会指出下一个元素应当被插入的位置,位置(position)指向的是下一个元素。如果指向的位置超过限制(limit),则抛出BufferOverFlowException异常。

翻转

Flip将一个能够继续添加数据元素的填充状态的缓冲区翻转成一个准备读出元素的释放状态

1
byteBuffer.flip();

具体有什么用呢?

对于已经写满了缓冲区,如果将缓冲区内容传递给一个通道,以使内容能被全部写出。

但如果通道现在在缓冲区上执行get,那么它将从我们刚刚插入的有用数据之外取出未定义数据。通过翻转将位置值重新设为 0,通道就会从正确位置开始获取。

例如我们定义了一个容量是10的buffer,并填入hello,如下图所示

1535636182144

翻转后如下图所示

1535636206735

重读

Rewind与 flip相似,但不影响上界属性。它只是将位置值设回 0。可以使用 rewind()后退,重读已经被翻转的缓冲区中的数据。

1
byteBuffer.rewind();
获取

翻转完了,就可以用get获取缓冲区数据了

1
2
byte[] b= new byte[byteBuffer.limit()];
byteBuffer.get(b);

当调用get时,会指出下一个元素应当被索引的位置,位置(position)返回时会+1s。如果指向的位置超过限制(limit),则抛出BufferUnderFlowException异常。如果提供的索引超过范围,也会抛出IndexOutOfBoundsException异常

释放

remaining可以告诉你从当前位置(position)到限制(limit)还剩的元素数目

1
int count = byteBuffer.remaining();

clear将缓冲区重置为空状态

1
byteBuffer.clear();
压缩

如果我们只想从缓冲区中释放一部分数据,而不是全部,然后重新填充。为了实现这一点,未读的数据元素需要下移以使第一个元素索引为 0。尽管重复这样做会效率低下,但这有时非常必要,而 API 对此为您提供了一个 compact()函数。

1
byteBuffer.compact();
标记与重置

标记是一个索引,通过Buffer中的mark()方法指定Buffer中一个特定的position,之后可以通过调用reset()方法恢复到这个position。要知道缓冲区的标记在mark()函数被调用前时未定义的,如果标记未定义,调用reset()会导致InvalidMarkException异常

1
byteBuffer.position(2).mark().position(4).reset();

要注意,java.nio中的类特意被设计为支持级联调用,优雅的使用级联调用,可以产生优美易读的代码。

直接缓冲区与非直接缓冲区

非直接缓冲区

上面我们说了ByteBuffer,也就是缓冲区的用法,譬如用allocate() 方法指定缓冲区大小,然后进行填充或翻转操作等等等。我们所创建的缓冲区,都属于直接缓冲区。他们都是在JVM内存中创建,在每次调用基础操作系统的一个本机IO之前或者之后,虚拟机都会将缓冲区的内容复制到中间缓冲区(或者从中间缓冲区复制内容),缓冲区的内容驻留在JVM内,因此销毁容易,但是占用JVM内存开销,处理过程中有复制操作。

非直接缓冲区写入步骤:

1.创建一个临时的直接ByteBuffer对象。
2.将非直接缓冲区的内容复制到临时缓冲中。
3.使用临时缓冲区执行低层次I/O操作。
4.临时缓冲区对象离开作用域,并最终成为被回收的无用数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* @program: UndirectBuffer
* @description: 利用通道完成文件的复制(非直接缓冲区)
* @author: tEngSHe789
**/
public class UndirectBuffer {
public static void main(String[] args) throws IOException {
// 创建流
FileInputStream fis = new FileInputStream("d://blog.md");
FileOutputStream fos = new FileOutputStream("d://blog.md");
//获取管道
FileChannel in = fis.getChannel();
FileChannel out = fos.getChannel();
// 分配指定大小的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (in.read(buffer) !=-1){
buffer.flip();// 准备读数据了
out.write(buffer);
buffer.clear();
}
out.close();
in.close();
fis.close();
fos.close();
}
}
直接缓冲区

直接缓冲区,是通过 allocateDirect() 方法在JVM内存开辟内存,在每次调用基础操作系统的一个本机IO之前或者之后,虚拟机都会避免将缓冲区的内容复制到中间缓冲区(或者从中间缓冲区复制内容),缓冲区的内容驻留在物理内存内,会少一次复制过程,如果需要循环使用缓冲区,用直接缓冲区可以很大地提高性能。

虽然直接缓冲区使JVM可以进行高效的I/O操作,但它使用的内存是操作系统分配的,绕过了JVM堆栈,建立和销毁比堆栈上的缓冲区要更大的开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @program: DirectBuffer
* @description: 使用直接缓冲区完成文件的复制(内存映射文件)
* @author: tEngSHe789
**/
public class DirectBuffer {
public static void main(String[] args) throws IOException {
//创建管道
FileChannel in=FileChannel.open(Paths.get("d://blog.md"),StandardOpenOption.READ);
FileChannel out=FileChannel.open(Paths.get("d://blog.md"),StandardOpenOption.WRITE
,StandardOpenOption.READ,StandardOpenOption.CREATE);
// 拿到将管道内容映射到内存的直接缓冲区映射文件(一个位置在硬盘的基于内存的缓冲区)
MappedByteBuffer inMappedByteBuffer = in.map(FileChannel.MapMode.READ_ONLY, 0, in.size());
MappedByteBuffer outMappedByteBuffer = out.map(FileChannel.MapMode.READ_WRITE, 0, in.size());
// 对直接缓冲区进行数据读写操作
byte[] bytes=new byte[inMappedByteBuffer.limit()];
inMappedByteBuffer.get(bytes);
outMappedByteBuffer.put(bytes);
in.close();
out.close();
}
}
直接缓冲区与非直接缓冲区的区别
  1. 字节缓冲区要么是直接的,要么是非直接的。如果为直接字节缓冲区,则 Java 虚拟机会尽最大努力直接在此缓冲区上执行本机 I/O 操作。也就是说,在每次调用基础操作系统的一个本机 I/O 操作之前(或之后),虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)。
  2. 直接字节缓冲区可以通过调用此类的 allocateDirect() 工厂方法来创建。此方法返回的缓冲区进行分配和取消分配所需成本通常高于非直接缓冲区。直接缓冲区的内容可以驻留在常规的垃圾回收堆之外,因此,它们对应用程序的内存需求量造成的影响可能并不明显。所以,建议将直接缓冲区主要分配给那些易受基础系统的本机 I/O 操作影响的大型、持久的缓冲区。一般情况下,最好仅在直接缓冲区能在程序性能方面带来明显好处时分配它们。
  3. 直接字节缓冲区还可以通过 FileChannelmap() 方法 将文件区域直接映射到内存中来创建。该方法返回MappedByteBuffer 。 Java 平台的实现有助于通过 JNI 从本机代码创建直接字节缓冲区。如果以上这些缓冲区中的某个缓冲区实例指的是不可访问的内存区域,则试图访问该区域不会更改该缓冲区的内容,并且将会在访问期间或稍后的某个时间导致抛出不确定的异常。
  4. 字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其 isDirect() 方法来确定。提供此方法是为了能够在性能关键型代码中执行显式缓冲区管理。

Channel

通道是java.nio的第二个创新,表示提供 IO 设备(例如:文件、套接字)的直接连接。

若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。这其中,Channel负责传输, Buffer 负责存储。

通道是由java.nio.channels 包定义的,Channel 表示 IO 源与目标打开的连接。Channel 类似于传统的“流”。只不过 Channel本身不能直接访问数据, Channel 只能与Buffer 进行交互

接口

java.nio.channels.Channel 接口:

  • FileChannel
  • SocketChannel
  • ServerSocketChannel
  • DatagramChannel

与缓冲区不同,通道API主要由接口指定,不同操作系统上通道的实现会不一样

实现

直接缓冲区与非直接缓冲区的栗子

分散读取与聚集写入

通道可以有选择地实现两个新的接口: ScatteringByteChannelGatheringByteChannel

1535700194601

ScatteringByteChannel 有2个read方法,我们都叫她分散读取(scattering Reads),分散读取中,通道依次填充每个缓冲区。填满一个缓冲区后,它就开始填充下一个。在某种意义上,缓冲区数组就像一个大缓冲区。

1535700216940

GatheringByteChannel中有2个wirte方法,我们都叫她聚集写入(gathering Writes),他可以将多个缓冲区的数据聚集到通道中

分散读取与聚集写入的应用

分散读取/聚集写入对于将数据划分为几个部分很有用。例如,您可能在编写一个使用消息对象的网络应用程序,每一个消息被划分为固定长度的头部和固定长度的正文。您可以创建一个刚好可以容纳头部的缓冲区和另一个刚好可以容难正文的缓冲区。当您将它们放入一个数组中并使用分散读取来向它们读入消息时,头部和正文将整齐地划分到这两个缓冲区中。

我们从缓冲区所得到的方便性对于缓冲区数组同样有效。因为每一个缓冲区都跟踪自己还可以接受多少数据,所以分散读取会自动找到有空间接受数据的第一个缓冲区。在这个缓冲区填满后,它就会移动到下一个缓冲区。

Python与NIO

服务端(具体见注释)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from socket import *
import time
s=socket(AF_INET,SOCK_STREAM)
s.bind(('127.0.0.1',8888))
s.listen(5)
s.setblocking(False) #设置socket的接口为非阻塞
conn_l=[] # 存储和server的连接 的 连接
del_l=[] # 存储和和server的断开 的 连接
while True:
try:
# 这个过程是不阻塞的
conn,addr=s.accept() # 当没人连接的时候会报错,走exception(<- py中是except)
conn_l.append(conn)
except BlockingIOError:
print(conn_l)
for conn in conn_l:
try:
data=conn.recv(1024)
if not data:
del_l.append(conn)
# 这个过程是不阻塞的
data=conn.recv(1024) # 不阻塞
if not data: # 如果拿不到data
del_l.append(conn) # 在废弃列表中添加conn
continue
conn.send(data.upper())
except BlockingIOError:
pass
except ConnectionResetError:
del_l.append(conn)

for conn in del_l:
conn_l.remove(conn)
conn.close()
del_l=[]

客户端

1
2
3
4
5
6
7
8
9
10
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8888))

while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))

IO复用(IO multiplexing)

I/O多路复用实际上就是用select, poll, epoll监听多个io对象,当io对象有变化(有数据)的时候就通知用户进程。有些地方也称这种IO方式为事件驱动IO(event driven IO)。与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。当然具体的可以看看这篇博客,现在先来看下I/O多路复用的流程:

1535201615305

(1)当用户进程调用了select,那么整个进程会被block;

(2)而同时,kernel会“监视”所有select负责的socket;

(3)当任何一个socket中的数据准备好了,select就会返回;

(4)这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

这个图和BIO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),而BIO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection

JAVA实现IO复用

这里我们使用的是java.nio下模块来完成I/O多路复用的例子。我用到的Selector(选择器),是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

Selector的使用

Selector的创建

1
Selector selector = Selector.open();

向Selector注册通道

为了将Channel和Selector配合使用,必须将channel注册到selector上。通过SelectableChannel.register()方法来实现,如下:

1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector,Selectionkey.OP_READ);

register()方法的第二个参数是一个“interest集合”,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:Connect、Accept、Read、Write

通道触发了一个事件意思是该事件已经就绪。所以,某个channel成功连接到另一个服务器称为“连接就绪”。一个server socket channel准备好接收新进入的连接称为“接收就绪”。一个有数据可读的通道可以说是“读就绪”。等待写数据的通道可以说是“写就绪”。

这四种事件用SelectionKey的四个常量来表示:

  1. SelectionKey.OP_CONNECT可连接
  2. SelectionKey.OP_ACCEPT可接受连接
  3. SelectionKey.OP_READ可读
  4. SelectionKey.OP_WRITE可写

SelectionKey

当向Selector注册Channel时,register()方法会返回一个SelectionKey对象。它包含了:

  • interest集合
  • ready集合
  • Channel
  • Selector
  • 附加的对象(可选)
interest集合

interest集合是你所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合,像这样:

1
2
3
4
5
6
int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

可以看到,用“位与”操作interest 集合和给定的SelectionKey常量,可以确定某个确定的事件是否在interest 集合中。

ready集合

ready 集合是通道已经准备就绪的操作的集合。在一次选择(Selection)之后,你会首先访问这个ready set。Selection将在下一小节进行解释。可以这样访问ready集合:

1
int readySet = selectionKey.readyOps();

可以用像检测interest集合那样的方法,来检测channel中什么事件或操作已经就绪。但是,也可以使用以下四个方法,它们都会返回一个布尔类型:

1
2
3
4
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

从SelectionKey访问Channel和Selector

1
2
Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();

java代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* @program: NIOServer
* @description: 服务端
* @author: tEngSHe789
**/
public class NIOServer {
public static void main(String[] args) throws IOException {
System.out.println("服务端Start....");
// 创建通道
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
// 设置非阻塞
serverSocketChannel.configureBlocking(false);
// 绑定连接
serverSocketChannel.bind(new InetSocketAddress(8888));
// 获取选择器
Selector selector=Selector.open();
// 将通道注册到选择器
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 轮调式获取选择“已经准备就绪”的事件
while (selector.select() > 0){
// 获取当前选择器的左右已经准备就绪的监听事件(选择key)
Iterator<SelectionKey> iterator=selector.selectedKeys().iterator();
while (iterator.hasNext()){
// 获取准备就绪事件
SelectionKey selectionKey=iterator.next();
// 判断具体是什么事件
if (selectionKey.isAcceptable()){//如果是“接受就绪”
SocketChannel socketChannel=serverSocketChannel.accept();// 获取连接
socketChannel.configureBlocking(false); // 设置非阻塞
//将该通道注册到服务器上
socketChannel.register(selector, SelectionKey.OP_READ);
}else if (selectionKey.isReadable()){//如是“已经就绪”
SocketChannel socketChannel= (SocketChannel) selectionKey.channel();//获取连接
//读数据
ByteBuffer buffer=ByteBuffer.allocate(1024);
int len = 0;
//分散读取
len=socketChannel.read(buffer);
while (len > 0){
buffer.flip();
System.out.println(new String(buffer.array(),0,len));
buffer.clear();
}
}
iterator.remove();
}
}
}
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @program: NIOClient
* @description: 客户端
* @author: tEngSHe789
**/
public class NIOClient {
public static void main(String[] args) throws IOException {
System.out.println("客户端Start....");
// 创建通道
SocketChannel socketChannel=SocketChannel.open(new InetSocketAddress("127.0.0.1",8888));
// 设置SocketChannel接口为非阻塞
socketChannel.configureBlocking(false);
//指定缓冲区大小
ByteBuffer buffer=ByteBuffer.allocate(1024);
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext()){
String msg = scanner.next();
// 存储
buffer.put((new Date().toString()+"\n"+msg).getBytes());
// 翻转
buffer.flip();
// 聚集写入
socketChannel.write(buffer);
// 释放
buffer.clear();
}
socketChannel.close();
}
}

python实现IO复用

对比java用的是Selector,可以帮我们在默认操作系统下选择最合适的select, poll, epoll这三种多路复合模型,python是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from socket import *
import select

s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('127.0.0.1',8888))
s.listen(5)
s.setblocking(False) #设置socket的接口为非阻塞
read_l=[s,] # 数据可读通道的列表
while True:
# 监听的read_l中的socket对象内部如果有变化,那么这个对象就会在r_l
# 第二个参数里有什么对象,w_l中就有什么对象
# 第三个参数 如果这里的对象内部出错,那会把这些对象加到x_l中
# 1 是超时时间
r_l,w_l,x_l=select.select(read_l,[],[],1)
print(r_l)
for ready_obj in r_l:
if ready_obj == s:
conn,addr=ready_obj.accept() #此时的ready_obj等于s
read_l.append(conn)
else:
try:
data=ready_obj.recv(1024) #此时的ready_obj等于conn
if not data:
ready_obj.close()
read_l.remove(ready_obj)
raise Exception('连接断开')
ready_obj.send(data.upper())
except ConnectionResetError:
ready_obj.close()
read_l.remove(ready_obj)

客户端

1
2
3
4
5
6
7
8
9
10
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8888))

while True:
msg=input('>>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))

AIO(asynchronous io)

真正的异步I/O很牛逼,流程大概如下:

1535201637209

(1)用户进程发起read操作之后,立刻就可以开始去做其它的事。

(2)而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。

(3)然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

Java

Java中使用AIO需要用到java.nio.channels.AsynchronousChannelGroup和java.nio.channels.AsynchronousServerSocketChannel的包,由于实际项目鲜有人用,就不演示了

总结

回顾一下各个IO Model的比较,如图所示:

1535764374640

  • blocking io :阻塞型io,再熟悉不过,处理accept、read、write都会阻塞用户进程
  • non blocking io:当通过系统调用的时候,如果没有连接或者数据到达就直接返回一个错误,用户进程不阻塞但是不断的轮询。注意这个不是java nio框架中对应的网络模型
  • io multiplexing:io多路复用才是nio对应的网络io模型。该模型对于用户进程也是阻塞的,优点是可以同时支持多个connetciotn。前三种都属于同步模式,既然都是同步的,如果要做到看似非阻塞,那么就需要轮询机制。相对于上一种模型,这种只是将轮询从用户进程转移到了操作系统内核,通过调用select函数,不断轮询多个connection是否ready,如果有一种ready好的,就通过事件通知用户进程,用户进程再通过事件来处理。所以在java的nio中会看到一大堆事件处理。这种模型的阻塞不是在socket层面的阻塞,而是在调动select函数的阻塞。而且相对于blocking io,还多了一次select的系统调用,其实性能会更低,所以在低吞吐量下,这种io不见得比bio+线程池的模型优越。
  • sign driven:极少使用,不知道
  • async io :java7时候开始升级,也成为nio2。实现了异步的io。前三种都是通过用户进程在主动获取(bio的阻塞,nbio的轮询和iomult的按事件获取),而aio交互很简单,用户进程调用后立即返回,用户进程不阻塞,内核当完成网络io和数据复制后,主动通知用户进程。前面说到的系统内核做的操作,除了等待网络io就绪数据到达内核,还有从系统内核复制用户空间去的过程,异步io这两者对于用户进程而言都是非阻塞的,而前三种,在数据从内核复制到用户空间这个过程,都是阻塞的。

参考资料

前言说的那本书

Ron Hitchens于2002年 著的《java nio》

findumars

冬瓜蔡

彼岸船夫

NIO的/分散读取和聚集写入

并发编程网

感谢

续1s时间

全片结束,觉得我写的不错?想要了解更多精彩新姿势?赶快打开我的👉个人博客 👈吧!

本文地址https://blog.tengshe789.tech/2018/08/25/IO%E6%A8%A1%E5%9E%8B/#more,部分觉得比较用心的会同步到掘金,简书,谢谢你那么可爱,还一直关注着我~❤😝

-------------本稿が終わる感谢您的阅读-------------