1.缓冲区

1.1 Buffer类介绍

java在BIO中通常使用字节数组byte[]和字符数组char[]实现数据传输,在NIO中,引入了缓冲区Buffer进行数据传输,相对数组提供了更多的属性和API.Buffer在java.nio包中引入,Buffer对于常见的类型有对应的子接口:ByteBuffer,CharBuffer, FloatBuffer,DoubleBuffer, ShortBuffer,IntBuffer,LongBuffer. 基于使用频率,以下对ByteBuffer进行介绍。

构造方式:
ByteBuffer和Buffer是接口,不能直接通过new方式进行对象实例化;Buffer中提供了allocate和wrapper等静态方法用于构造缓冲区对象。如下所示:

// 方式1:
ByteBuffer buffer = ByteBuffer.allocate(1024);

// // 方式2: 对数组进行包装
byte[] byteArr = new byte[1024];
ByteBuffer buffer2 = ByteBuffer.wrap(byteArr);

属性:

Buffer定义了mark,position,limit,capacity四个属性,为读写提供支持。大小关系为:

0 <= mark <= position <= limit <= capacity

position: 当前位置,可进行读或写;
mark: 标记点,用于记录position位置-以切回原位置, 值为-1表示未设置标记;
limit: 边界,读或者写操作需要在limit范围内;
capacity:容量, 缓冲区的最大存储量,不可改变;
上述4个属性中capacity在缓冲区构造时确定(不可改变),其他三个可以在读写过程中发送变化;对应方法如下:

(1) postion
position(): 获取当前的位置信息; position(int): 设置位置;

public static void test() {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    buffer.position(10);
    System.out.println("position is: " + buffer.position());
}

得到结果如下:

position is: 10

注意:设置position时需要注意不能超过limit, 否则会报错。

(2) mark:
mark()记录当前的position位置;reset()将position切回到mark标记点;

public static void test() {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    buffer.position(10);
    System.out.println("position is: " + buffer.position());

    buffer.mark();
    buffer.position(100);
    System.out.println("position is: " + buffer.position());
    buffer.reset();
    System.out.println("position is: " + buffer.position());
}

得到结果如下:

position is: 10
position is: 100
position is: 10

注意:reset方法调用时position被切回到标记位置时,mark标记会被设置为-1;
未标记-调用reset方法会抛出InvalidMarkException异常;
当position被设置为小于mark值时,mark自动失效.

(3) limit
limit(): 获取位置信息; limit(int): 设置边界;

public static void test() {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    buffer.position(100);
    buffer.limit(100);
    System.out.println("position is: " + buffer.position() + "; " + "limit is: " + buffer.limit());

    buffer.limit(10);
    System.out.println("position is: " + buffer.position() + "; " + "limit is: " + buffer.limit());
}

得到结果如下:

position is: 100; limit is: 100
position is: 10; limit is: 10

注意:当limit被设置小于position时,position被压缩为limit值.

(4) capacity
capacity(): 获取容量;

public static void test() {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    System.out.println("capacity is: " + buffer.capacity());
}

得到结果如下:

capacity is: 1024

1.2 clear-flip-rewind方法

(1)clear方法:

public final Buffer clear() {
	position = 0;
	limit = capacity;
	
	mark = -1;
	return this;
}

将缓冲区重置为初始分配状态,此时limit等于capacity, position指向第一个位置, 清除标记.

(2)flip方法:

public final Buffer flip() {
	limit = position;
	position = 0;
	
	mark = -1;
	return this;
}

将缓冲区进行反转,此时limit设置为position,position设置为0,清除标记:
一般用于读写切换,如下所示:

public static void test() {
	ByteBuffer buffer = ByteBuffer.allocate(1024);
	// 写入数据到buffer
	buffer.put((byte)1);
	buffer.put(new byte[] {2, 3});
	
	// flip方法,准备从buffer读取数据
	buffer.flip();
	
	// 读取数据
	while (buffer.hasRemaining()) {
		byte b = buffer.get();
		System.out.println(b);
	}
}

(3)rewind方法:

public final Buffer rewind() {
	position = 0;
	
	mark = -1;
	return this;
}

此时position设置为0,清除标记;一般用于重写或者重写。

1.3 常用API

(1) 剩余元素
根据limit和position的定义,position到limit之前为可用空间,因此有以下两个API:

// 是否还有可用空间,即是否还有数据
public final boolean hasRemaining() {
	return position < limit;
}

//  元素个数
public final int remaining() {
	return limit - position;
}

(2) 存取值
可以通过put(byte), put(byte[])方法向缓冲区添加数据,也可以通过putShort/putInt/putLong/putFloat/putDouble分别添加对应类型的数据;取值时使用getXXX进行。
其中,short占据2个字节,int和float占据4个字节,long和double在缓存区中占8个字节。如下所示:

public static void test() {
	ByteBuffer buffer = ByteBuffer.allocate(1024);
	buffer.put((byte)1);
	System.out.println("[1]position is " + buffer.position());
	
	buffer.putChar('a');
	System.out.println("[2]position is " + buffer.position());
	
	buffer.putShort((short)1);
	System.out.println("[3]position is " + buffer.position());
	
	buffer.putInt(1);
	System.out.println("[4]position is " + buffer.position());
	
	buffer.putFloat(3.14f);
	System.out.println("[5]position is " + buffer.position());
	
	buffer.putLong(100L);
	System.out.println("[6]position is " + buffer.position());
	
	// IEEE 754
	buffer.putDouble(3.14);
	System.out.println("[7]position is " + buffer.position());
	
	buffer.flip();
	
	System.out.println(buffer.get());
	System.out.println(buffer.getChar());
	System.out.println(buffer.getShort());
	System.out.println(buffer.getInt());
	System.out.println(buffer.getFloat());
	System.out.println(buffer.getLong());
	System.out.println(buffer.getDouble());
}

运行结果如下:

 [1]position is 1
 [2]position is 3
 [3]position is 5
 [4]position is 9
 [5]position is 13
 [6]position is 21
 [7]position is 29
 1
 a
 1
 1
 3.14
 100
 3.14

(3)只读缓存区:
asReadOnlyBuffer()方法返回只读缓存区, 写操作将抛出ReadOnlyBufferException异常
如下所示:

public static void test() {
	ByteBuffer buffer = ByteBuffer.wrap(new byte[] {1}).asReadOnlyBuffer();
	System.out.println("Buffer is readOnly: " + buffer.isReadOnly());
	buffer.put((byte)1);
}

测试用例运行结果如下:

Exception in thread "main" java.nio.ReadOnlyBufferException
at java.nio.HeapByteBufferR.put(HeapByteBufferR.java:172)
at com.seong.IoApplication.test(IoApplication.java:20)
at com.seong.IoApplication.main(IoApplication.java:14)
Buffer is readOnly: true

(4)直接内存:
通过ByteBuffer.wrapByteBuffer.allocate方法构造的缓冲区都是基于数组实现的,在堆内存实现;
也可通过ByteBuffer.allocateDirect在直接内存分配缓冲区:

public static void test() {
	ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
	System.out.println("Buffer is direct: " + buffer.isDirect());
}

直接缓冲区底层是通过Unsafe类对直接物理内存进行操作。

2.通道

通道封装了数据传输过程,提供了基于缓冲区的读写能力。

2.1 channel接口

java.nio包中定义了Channel通道接口:

public interface Channel extends Closeable {
	// 通道是否打开
    public boolean isOpen();
    // 关闭通道
    public void close() throws IOException;
}

Note:
Channel因为继承了Closeable接口,间接继承了AutoCloseable接口。使用try-with-source时,会自动调用AutoCloseable接口的close()方法,而Closeable接口具有幂等性,反复调用close()方法不会有副作用。

类继承图如下所示:
在这里插入图片描述Channel接口的继承体系可以分为种类型:
[1] 选择通道
SelectableChannel类型的通道,可以被注册到选择器中,实现多路复用。

public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel {
    protected SelectableChannel() { }
    public abstract SelectorProvider provider();

    public abstract int validOps();

	// 	根据Selector获取注册到该Selector对象的SelectionKey
    public abstract SelectionKey keyFor(Selector sel);
        
   	// 提供通道的注册能力
    public abstract boolean isRegistered();
    public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
    public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException {return register(sel, ops, null);}

   	// 设置和获取通道的阻塞性
    public abstract SelectableChannel configureBlocking(boolean block) throws IOException;
    public abstract boolean isBlocking();
    public abstract Object blockingLock();

上述代码的核心逻辑是register方法和configureBlocking方法,前者提供了想选择器注册的能力,后者对通道的阻塞性进行配置。

[2] 网络通道
NetworkChannel用于配制本地地址和socket参数:

public interface NetworkChannel extends Channel {
	// 地址相关
    NetworkChannel bind(SocketAddress local) throws IOException;
    SocketAddress getLocalAddress() throws IOException;
    
    // socket参数信息
    <T> NetworkChannel setOption(SocketOption<T> name, T value) throws IOException;
    <T> T getOption(SocketOption<T> name) throws IOException;
    Set<SocketOption<?>> supportedOptions();
}

需要通信的通道,必然要实现NetworkChannel接口。

[3] 读写通道
读写通道用于提供读写能力。
在这里插入图片描述
ReadableByteChannel提供了读ByteBuffer的能力,ScatteringByteChannel提供了读ByteBuffer数组的能力;
GatheringByteChannel提供了写ByteBuffer的能力,WritableByteChannel提供了写ByteBuffer数组的能力;
ByteChannel将ReadableByteChannel和WritableByteChannel做了整合,提供了读写ByteBuffer的能力。
因此,需要进行读写的通道,需要实现或者继承上述接口。

2.2 ServerSocketChannel-SocketChannel-DatagramChannel接口

NIO体系中常用的通道包括:ServerSocketChannel, SocketChannel, DatagramChannel; ServerSocketChannel用于服务端处理连接,SocketChannel用于传输TCP,DatagramChannel用于传输UDP消息。

SocketChannel继承了选择通道、网络通道、读写通道:
在这里插入图片描述
SocketChannel用于服务端处理连接,因此需要继承选择通道和网络通道,而无需读写能力:
在这里插入图片描述
DatagrameChannel用于UDP客户端,继承了选择通道、网络通道、读写通道:
在这里插入图片描述

2.3 SelectableChannel

SelectableChannel接口中定义了register方法,拥有向选择器注册的能力,因此需要单独进行说明。后续介绍的ServerSocketChannel和SocketChannel都是SelectableChannel的实现类,因此也可向选择器注册。注册方法如下所示:

public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;

public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException {
    return register(sel, ops, null);
}

sel是选择器对象,表面当前通道向那个选择器注册;ops指定注册感兴趣的事件(可连接的ACCEPT事件、可读的READ事件等);att是attachment附件,用于传参,可从选择器返回的就绪事件或者注册返回的结果对象中获取attachment。
SelectableChannel除了具备注册能力外,还继承了InterruptibleChannel接口,从而具备了中断特性(中断特性逻辑在AbstractInterruptibleChannel抽象类中实现)。当线程阻塞在SelectableChannel通道上,其他线程调用这个线程的intterupt()方法,通道将被关闭,且阻塞的线程会被唤醒并收到ClosedByIntteruptException异常。
另外,由于SelectableChannel还具备closeable和asynchronously特性,当线程阻塞在SelectableChannel通道上,其他线程调用通道的close接口关闭通道时,阻塞线程会被唤醒且收到AsychronousCloseException异常。

2.4 ServerSocketChannel-SocketChannel接口使用

SocketChannel与DatagrameChannel使用方式类似,本文选择通过介绍ServerSocketChannel和SocketChannel类的使用介绍NIO。

2.4.1 构造channel对象

通过静态方法open进行构建,通过InetSocketAddress对象进行Socket地址的绑定:
客户端:

// 客户端
SocketChannel channel = SocketChannel.open();
channel.connect(new InetSocketAddress("localhost", 8000));

服务端:

ServerSocketChannel channel = ServerSocketChannel.open();
channel.socket().bind(new InetSocketAddress("localhost", listenPort));

2.4.2 配置通道非阻塞

因为SocketChannel和ServerSocketChannel继承了SelectableChannel接口的configureBlockingregsiter方法,因此可以设置通道的阻塞性:

socketChannel.configureBlocking(false);

说明:在将通道注册到选择器之前,需要将通道设置为非阻塞,否则会抛出异常。

2.4.3 注册channel

通过register方法将通道注册到选择器上:

// 关注accept事件
serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);

3.选择器

选择器可以实现一个线程管理多个通道,即IO多路复用。通道提前被注册到选择器,当有已就绪的通道时,进行数据处理。选择器可以使用一个线程管理多个通道,减少了线程的使用数量,且不需要进行线程上下文的切换,有利于提高服务器性能。Linux操作系统中,底层实现依赖于epoll机制

3.1 选择器事件

通道向选择器注册时,可以指定感兴趣的事件,其宏定义在SelectionKey中:

// 读取操作
public static final int OP_READ = 1 << 0;
// 写入操作
public static final int OP_WRITE = 1 << 2;
// 套接字接收操作
public static final int OP_CONNECT = 1 << 3;
// 套接字连接操作
public static final int OP_ACCEPT = 1 << 4;

3.2 通道注册

将通道注册到选择器的底层逻辑实现由选择器提供,选择器通过JVM屏蔽了底层的差异。选择器中的方法如下:

protected abstract SelectionKey register(AbstractSelectableChannel ch, int ops, Object att);

3.3 SelectionKey

当有IO事件到达时,选择器会被唤醒,并返回一个SelectionKey对象。可从SelectionKey对象中获取以下信息:
[1] 所属的通道和选择器

// 关联的通道
SelectableChannel channel = key.channel();

// 关联的选择器
Selector selector = key.selector();

[2] 已就绪的事件集

// 已就绪的事件集
int readyOps = key.readyOps();

// 是否包含Accept连接事件
key.isAcceptable();

// 是否包含可读事件
key.isReadable();

[3] attachment附件

// 设置attach
key.attach(obj);

// 获取attachment
key.attachment();

3.4 选择器使用

创建选择器:
由于不同操作系统对于IO的底层实现不同,因此JVM需要屏蔽底层差异,提供了SelectorProvider自动根据不同系统创建对应的Selector选择器对象(通道对象的创建过程类似):

Selector selector = Selector.open();

public static Selector open() throws IOException {
	// 由SelectorProvider屏蔽底层差异
    return SelectorProvider.provider().openSelector();
}

select阻塞:
将通道注册到选择器后,可通过选择器的选择方法陷入阻塞等待感兴趣的事件就绪。选择器为不同场景重载了以下方法:

// 持续阻塞等待就绪事件
public abstract int select() throws IOException;

// 阻塞等待timeout微秒,如果没有就绪事件也返回
public abstract int select(long timeout) throws IOException;

// 非阻塞,查询当前是否已有就绪事件,返回就绪数量
public abstract int selectNow() throws IOException;

处理就绪事件:
通过selectedKeys()方法返回已就绪的事件,以SelectionKey集合的方式返回:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

常规操作如下:

Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if (key.isAcceptable()) {
        // 处理连接事件...
    } else if (key.isReadable()) {
        // 处理可读事件...
    }
    // 移除已处理事件,防止死循环
    keyIterator.remove();
}

4.案例介绍

以下通过建立一个TCP服务端的案例演示NIO的使用方式,案例包含了缓冲区、通道、选择器的使用方式。

package com.seong;

import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

public class Server {
    private Selector selector;

    public Server() {
        try {
            this.selector = Selector.open();
        } catch (Exception ignore) {
        }
    }
    
    public void start(int listenPort) throws Exception {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress("localhost", listenPort));
        serverChannel.configureBlocking(false);

        serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);

        while (true) {
        	// 调用选择器的select()方法阻塞等待(当有IO事件到达时,从阻塞中唤醒)
            this.selector.select();
            // 获取所有已就绪事件
            Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
            // 遍历事件
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                if (key.isAcceptable()) {
  		            // 处理连接事件
                    SocketChannel clientChannel = serverChannel.accept();
                    clientChannel.configureBlocking(false);
                    clientChannel.register(this.selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // 处理可读事件
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int readBytes = clientChannel.read(buffer);
                    if (readBytes > 0) {
                        System.out.println("receive msg: " + new String(buffer.array()));
                    }
                }
                // 移除已处理事件,防止死循环
                keyIterator.remove();
            }
        }

    }

    public static void main(String[] args) throws Exception {
        new Server().start(8000);
    }
}

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部