1.缓冲区
1.1 Buffer类介绍
java在BIO中通常使用字节数组byte[]和字符数组char[]实现数据传输,在NIO中,引入了缓冲区Buffer进行数据传输,相对数组提供了更多的属性和API.Buffer在java.nio包中引入,Buffer对于常见的类型有对应的子接口:ByteBuffer,CharBuffer, FloatBuffer,DoubleBuffer, ShortBuffer,IntBuffer,LongBuffer. 基于使用频率,以下对ByteBuffer进行介绍。
构造方式:
ByteBuffer和Buffer是接口,不能直接通过new方式进行对象实例化;Buffer中提供了allocate和wrapper等静态方法用于构造缓冲区对象。如下所示:
// 方式1:
ByteBuffer buffer = ByteBuffer.allocate(1024);
// // 方式2: 对数组进行包装
byte[] byteArr = new byte[1024];
ByteBuffer buffer2 = ByteBuffer.wrap(byteArr);
属性:
Buffer定义了mark,position,limit,capacity四个属性,为读写提供支持。大小关系为:
0 <= mark <= position <= limit <= capacity
position: 当前位置,可进行读或写;
mark: 标记点,用于记录position位置-以切回原位置, 值为-1表示未设置标记;
limit: 边界,读或者写操作需要在limit范围内;
capacity:容量, 缓冲区的最大存储量,不可改变;
上述4个属性中capacity在缓冲区构造时确定(不可改变),其他三个可以在读写过程中发送变化;对应方法如下:
(1) postionposition()
: 获取当前的位置信息; position(int)
: 设置位置;
public static void test() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(10);
System.out.println("position is: " + buffer.position());
}
得到结果如下:
position is: 10
注意:设置position时需要注意不能超过limit, 否则会报错。
(2) mark:mark()
记录当前的position位置;reset()
将position切回到mark标记点;
public static void test() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(10);
System.out.println("position is: " + buffer.position());
buffer.mark();
buffer.position(100);
System.out.println("position is: " + buffer.position());
buffer.reset();
System.out.println("position is: " + buffer.position());
}
得到结果如下:
position is: 10
position is: 100
position is: 10
注意:reset
方法调用时position被切回到标记位置时,mark标记会被设置为-1;
未标记-调用reset
方法会抛出InvalidMarkException异常;
当position被设置为小于mark值时,mark自动失效.
(3) limitlimit():
获取位置信息; limit(int)
: 设置边界;
public static void test() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(100);
buffer.limit(100);
System.out.println("position is: " + buffer.position() + "; " + "limit is: " + buffer.limit());
buffer.limit(10);
System.out.println("position is: " + buffer.position() + "; " + "limit is: " + buffer.limit());
}
得到结果如下:
position is: 100; limit is: 100
position is: 10; limit is: 10
注意:当limit被设置小于position时,position被压缩为limit值.
(4) capacitycapacity()
: 获取容量;
public static void test() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
System.out.println("capacity is: " + buffer.capacity());
}
得到结果如下:
capacity is: 1024
1.2 clear-flip-rewind方法
(1)clear
方法:
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
将缓冲区重置为初始分配状态,此时limit等于capacity, position指向第一个位置, 清除标记.
(2)flip
方法:
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
将缓冲区进行反转,此时limit设置为position,position设置为0,清除标记:
一般用于读写切换,如下所示:
public static void test() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 写入数据到buffer
buffer.put((byte)1);
buffer.put(new byte[] {2, 3});
// flip方法,准备从buffer读取数据
buffer.flip();
// 读取数据
while (buffer.hasRemaining()) {
byte b = buffer.get();
System.out.println(b);
}
}
(3)rewind
方法:
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}
此时position设置为0,清除标记;一般用于重写或者重写。
1.3 常用API
(1) 剩余元素
根据limit和position的定义,position到limit之前为可用空间,因此有以下两个API:
// 是否还有可用空间,即是否还有数据
public final boolean hasRemaining() {
return position < limit;
}
// 元素个数
public final int remaining() {
return limit - position;
}
(2) 存取值
可以通过put(byte),
put(byte[])
方法向缓冲区添加数据,也可以通过putShort/putInt/putLong/putFloat/putDouble
分别添加对应类型的数据;取值时使用getXXX进行。
其中,short占据2个字节,int和float占据4个字节,long和double在缓存区中占8个字节。如下所示:
public static void test() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put((byte)1);
System.out.println("[1]position is " + buffer.position());
buffer.putChar('a');
System.out.println("[2]position is " + buffer.position());
buffer.putShort((short)1);
System.out.println("[3]position is " + buffer.position());
buffer.putInt(1);
System.out.println("[4]position is " + buffer.position());
buffer.putFloat(3.14f);
System.out.println("[5]position is " + buffer.position());
buffer.putLong(100L);
System.out.println("[6]position is " + buffer.position());
// IEEE 754
buffer.putDouble(3.14);
System.out.println("[7]position is " + buffer.position());
buffer.flip();
System.out.println(buffer.get());
System.out.println(buffer.getChar());
System.out.println(buffer.getShort());
System.out.println(buffer.getInt());
System.out.println(buffer.getFloat());
System.out.println(buffer.getLong());
System.out.println(buffer.getDouble());
}
运行结果如下:
[1]position is 1
[2]position is 3
[3]position is 5
[4]position is 9
[5]position is 13
[6]position is 21
[7]position is 29
1
a
1
1
3.14
100
3.14
(3)只读缓存区:asReadOnlyBuffer()
方法返回只读缓存区, 写操作将抛出ReadOnlyBufferException异常
如下所示:
public static void test() {
ByteBuffer buffer = ByteBuffer.wrap(new byte[] {1}).asReadOnlyBuffer();
System.out.println("Buffer is readOnly: " + buffer.isReadOnly());
buffer.put((byte)1);
}
测试用例运行结果如下:
Exception in thread "main" java.nio.ReadOnlyBufferException
at java.nio.HeapByteBufferR.put(HeapByteBufferR.java:172)
at com.seong.IoApplication.test(IoApplication.java:20)
at com.seong.IoApplication.main(IoApplication.java:14)
Buffer is readOnly: true
(4)直接内存:
通过ByteBuffer.wrap
和ByteBuffer.allocate
方法构造的缓冲区都是基于数组实现的,在堆内存实现;
也可通过ByteBuffer.allocateDirect
在直接内存分配缓冲区:
public static void test() {
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
System.out.println("Buffer is direct: " + buffer.isDirect());
}
直接缓冲区底层是通过Unsafe类对直接物理内存进行操作。
2.通道
通道封装了数据传输过程,提供了基于缓冲区的读写能力。
2.1 channel接口
java.nio包中定义了Channel通道接口:
public interface Channel extends Closeable {
// 通道是否打开
public boolean isOpen();
// 关闭通道
public void close() throws IOException;
}
Note:
Channel因为继承了Closeable接口,间接继承了AutoCloseable接口。使用try-with-source时,会自动调用AutoCloseable接口的close()
方法,而Closeable接口具有幂等性,反复调用close()
方法不会有副作用。
类继承图如下所示:
Channel接口的继承体系可以分为种类型:
[1] 选择通道
SelectableChannel类型的通道,可以被注册到选择器中,实现多路复用。
public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel {
protected SelectableChannel() { }
public abstract SelectorProvider provider();
public abstract int validOps();
// 根据Selector获取注册到该Selector对象的SelectionKey
public abstract SelectionKey keyFor(Selector sel);
// 提供通道的注册能力
public abstract boolean isRegistered();
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException {return register(sel, ops, null);}
// 设置和获取通道的阻塞性
public abstract SelectableChannel configureBlocking(boolean block) throws IOException;
public abstract boolean isBlocking();
public abstract Object blockingLock();
上述代码的核心逻辑是register
方法和configureBlocking
方法,前者提供了想选择器注册的能力,后者对通道的阻塞性进行配置。
[2] 网络通道
NetworkChannel用于配制本地地址和socket参数:
public interface NetworkChannel extends Channel {
// 地址相关
NetworkChannel bind(SocketAddress local) throws IOException;
SocketAddress getLocalAddress() throws IOException;
// socket参数信息
<T> NetworkChannel setOption(SocketOption<T> name, T value) throws IOException;
<T> T getOption(SocketOption<T> name) throws IOException;
Set<SocketOption<?>> supportedOptions();
}
需要通信的通道,必然要实现NetworkChannel接口。
[3] 读写通道
读写通道用于提供读写能力。
ReadableByteChannel提供了读ByteBuffer的能力,ScatteringByteChannel提供了读ByteBuffer数组的能力;
GatheringByteChannel提供了写ByteBuffer的能力,WritableByteChannel提供了写ByteBuffer数组的能力;
ByteChannel将ReadableByteChannel和WritableByteChannel做了整合,提供了读写ByteBuffer的能力。
因此,需要进行读写的通道,需要实现或者继承上述接口。
2.2 ServerSocketChannel-SocketChannel-DatagramChannel接口
NIO体系中常用的通道包括:ServerSocketChannel, SocketChannel, DatagramChannel; ServerSocketChannel用于服务端处理连接,SocketChannel用于传输TCP,DatagramChannel用于传输UDP消息。
SocketChannel继承了选择通道、网络通道、读写通道:
SocketChannel用于服务端处理连接,因此需要继承选择通道和网络通道,而无需读写能力:
DatagrameChannel用于UDP客户端,继承了选择通道、网络通道、读写通道:
2.3 SelectableChannel
SelectableChannel接口中定义了register方法,拥有向选择器注册的能力,因此需要单独进行说明。后续介绍的ServerSocketChannel和SocketChannel都是SelectableChannel的实现类,因此也可向选择器注册。注册方法如下所示:
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException {
return register(sel, ops, null);
}
sel是选择器对象,表面当前通道向那个选择器注册;ops指定注册感兴趣的事件(可连接的ACCEPT事件、可读的READ事件等);att是attachment附件,用于传参,可从选择器返回的就绪事件或者注册返回的结果对象中获取attachment。
SelectableChannel除了具备注册能力外,还继承了InterruptibleChannel接口,从而具备了中断特性(中断特性逻辑在AbstractInterruptibleChannel抽象类中实现)。当线程阻塞在SelectableChannel通道上,其他线程调用这个线程的intterupt()方法,通道将被关闭,且阻塞的线程会被唤醒并收到ClosedByIntteruptException异常。
另外,由于SelectableChannel还具备closeable和asynchronously特性,当线程阻塞在SelectableChannel通道上,其他线程调用通道的close接口关闭通道时,阻塞线程会被唤醒且收到AsychronousCloseException异常。
2.4 ServerSocketChannel-SocketChannel接口使用
SocketChannel与DatagrameChannel使用方式类似,本文选择通过介绍ServerSocketChannel和SocketChannel类的使用介绍NIO。
2.4.1 构造channel对象
通过静态方法open进行构建,通过InetSocketAddress对象进行Socket地址的绑定:
客户端:
// 客户端
SocketChannel channel = SocketChannel.open();
channel.connect(new InetSocketAddress("localhost", 8000));
服务端:
ServerSocketChannel channel = ServerSocketChannel.open();
channel.socket().bind(new InetSocketAddress("localhost", listenPort));
2.4.2 配置通道非阻塞
因为SocketChannel和ServerSocketChannel继承了SelectableChannel接口的configureBlocking
和regsiter
方法,因此可以设置通道的阻塞性:
socketChannel.configureBlocking(false);
说明:在将通道注册到选择器之前,需要将通道设置为非阻塞,否则会抛出异常。
2.4.3 注册channel
通过register方法将通道注册到选择器上:
// 关注accept事件
serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
3.选择器
选择器可以实现一个线程管理多个通道,即IO多路复用。通道提前被注册到选择器,当有已就绪的通道时,进行数据处理。选择器可以使用一个线程管理多个通道,减少了线程的使用数量,且不需要进行线程上下文的切换,有利于提高服务器性能。Linux操作系统中,底层实现依赖于epoll机制。
3.1 选择器事件
通道向选择器注册时,可以指定感兴趣的事件,其宏定义在SelectionKey中:
// 读取操作
public static final int OP_READ = 1 << 0;
// 写入操作
public static final int OP_WRITE = 1 << 2;
// 套接字接收操作
public static final int OP_CONNECT = 1 << 3;
// 套接字连接操作
public static final int OP_ACCEPT = 1 << 4;
3.2 通道注册
将通道注册到选择器的底层逻辑实现由选择器提供,选择器通过JVM屏蔽了底层的差异。选择器中的方法如下:
protected abstract SelectionKey register(AbstractSelectableChannel ch, int ops, Object att);
3.3 SelectionKey
当有IO事件到达时,选择器会被唤醒,并返回一个SelectionKey对象。可从SelectionKey对象中获取以下信息:
[1] 所属的通道和选择器
// 关联的通道
SelectableChannel channel = key.channel();
// 关联的选择器
Selector selector = key.selector();
[2] 已就绪的事件集
// 已就绪的事件集
int readyOps = key.readyOps();
// 是否包含Accept连接事件
key.isAcceptable();
// 是否包含可读事件
key.isReadable();
[3] attachment附件
// 设置attach
key.attach(obj);
// 获取attachment
key.attachment();
3.4 选择器使用
创建选择器:
由于不同操作系统对于IO的底层实现不同,因此JVM需要屏蔽底层差异,提供了SelectorProvider自动根据不同系统创建对应的Selector选择器对象(通道对象的创建过程类似):
Selector selector = Selector.open();
public static Selector open() throws IOException {
// 由SelectorProvider屏蔽底层差异
return SelectorProvider.provider().openSelector();
}
select阻塞:
将通道注册到选择器后,可通过选择器的选择方法陷入阻塞等待感兴趣的事件就绪。选择器为不同场景重载了以下方法:
// 持续阻塞等待就绪事件
public abstract int select() throws IOException;
// 阻塞等待timeout微秒,如果没有就绪事件也返回
public abstract int select(long timeout) throws IOException;
// 非阻塞,查询当前是否已有就绪事件,返回就绪数量
public abstract int selectNow() throws IOException;
处理就绪事件:
通过selectedKeys()方法返回已就绪的事件,以SelectionKey集合的方式返回:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
常规操作如下:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
// 处理连接事件...
} else if (key.isReadable()) {
// 处理可读事件...
}
// 移除已处理事件,防止死循环
keyIterator.remove();
}
4.案例介绍
以下通过建立一个TCP服务端的案例演示NIO的使用方式,案例包含了缓冲区、通道、选择器的使用方式。
package com.seong;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;
public class Server {
private Selector selector;
public Server() {
try {
this.selector = Selector.open();
} catch (Exception ignore) {
}
}
public void start(int listenPort) throws Exception {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress("localhost", listenPort));
serverChannel.configureBlocking(false);
serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
while (true) {
// 调用选择器的select()方法阻塞等待(当有IO事件到达时,从阻塞中唤醒)
this.selector.select();
// 获取所有已就绪事件
Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
// 遍历事件
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
// 处理连接事件
SocketChannel clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(this.selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 处理可读事件
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int readBytes = clientChannel.read(buffer);
if (readBytes > 0) {
System.out.println("receive msg: " + new String(buffer.array()));
}
}
// 移除已处理事件,防止死循环
keyIterator.remove();
}
}
}
public static void main(String[] args) throws Exception {
new Server().start(8000);
}
}
本站资源均来自互联网,仅供研究学习,禁止违法使用和商用,产生法律纠纷本站概不负责!如果侵犯了您的权益请与我们联系!
转载请注明出处: 免费源码网-免费的源码资源网站 » IO系列-3 NIO基本概念:Buffer和Channel和Selector
发表评论 取消回复