博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java NIO核心三大组件Channel、Buffer和Selector(二)
阅读量:3929 次
发布时间:2019-05-23

本文共 21192 字,大约阅读时间需要 70 分钟。

上篇:

一、直接缓冲区map创建

内存管理

在深入MappedByteBuffer之前,先看看计算机内存管理的几个术语:

  • MMC:CPU的内存管理单元。
  • 物理内存:即内存条的内存空间。
  • 虚拟内存:计算机系统内存管理的一种技术。它使得应用程序认为它拥有连续的可用的内存(一个连续完整的地址空间),而实际上,它通常是被分隔成多个物理内存碎片,还有部分暂时存储在外部磁盘存储器上,在需要时进行数据交换。
  • 页面文件:操作系统反映构建并使用虚拟内存的硬盘空间大小而创建的文件,在windows下,即pagefile.sys文件,其存在意味着物理内存被占满后,将暂时不用的数据移动到硬盘上。
  • 缺页中断:当程序试图访问已映射在虚拟地址空间中但未被加载至物理内存的一个分页时,由MMC发出的中断。如果操作系统判断此次访问是有效的,则尝试将相关的页从虚拟内存文件中载入物理内存。

1、为什么会有虚拟内存和物理内存的区别?

如果正在运行的一个进程,它所需的内存是有可能大于内存条容量之和的,如内存条是256M,程序却要创建一个2G的数据区,那么所有数据不可能都加载到内存(物理内存),必然有数据要放到其他介质中(比如硬盘),待进程需要访问那部分数据时,再调度进入物理内存。虚拟内存技术就派上用场了。

2、什么是虚拟内存地址和物理内存地址?

虚拟内存地址:虚拟内存地址范围的大小由CPU的位数决定,假设你的计算机是32位,那么它的地址总线是32位的,它的地址范围是0~0xFFFFFFFF (4G),这个范围就是我们的程序能够产生的地址范围,我们把这个地址范围称为虚拟地址空间,该空间中的某一个地址我们称之为虚拟地址。

物理内存地址:与虚拟地址空间和虚拟地址相对应的则是物理地址空间和物理地址,大多数时候我们的系统所具备的物理地址空间只是虚拟地址空间的一个子集。举个例子,对于一台内存(物理内存)为256M的32bit x86主机来说,它的虚拟地址空间范围是0~0xFFFFFFFF(4G),而物理地址空间范围是0x00000000 ~ 0x0FFFFFFF(256M)。

3、内存分页机制

大多数使用虚拟内存的系统都使用一种称为分页(paging)机制。虚拟地址空间划分成称为页(page)的单位,而相应的物理地址空间也被进行划分,单位是页帧(frame)。页和页帧的大小必须相同。

举个例子,我们有一台可以生成32位地址的机器,它的虚拟地址范围从0~0xFFFFFFFF(4G),而这台机器只有256M的物理地址,因此他可以运行4G的程序,但该程序不能一次性调入内存运行。这台机器必须有一个达到可以存放4G程序的外部存储器(例如磁盘),以保证程序片段在需要时可以被调用。在这个例子中,页的大小为4K,页帧大小与页相同——这点是必须保证的,因为内存和外围存储器之间的传输总是以页为单位的。对应4G的虚拟地址和256M的物理存储器,他们分别包含了1M个页和64K个页帧。

4、虚拟内存与物理内存之间的联系是通过分页表来建立

页表是一种特殊的数据结构,放在系统空间的页表区,存放逻辑页与物理页帧的对应关系。 每一个进程都拥有一个自己的页表,PCB(进程管理块)表中有指针指向页表。

虚拟内存页的个数 > 物理内存页帧的个数,岂不是有些虚拟内存页的地址永远没有对应的物理内存地址空间?不是的,操作系统是这样处理的。操作系统有个页面失效(page fault)功能。操作系统找到一个使用的最少的页帧,使之失效,并把它写入磁盘,随后把需要访问的页放到这个页帧中来,并修改页表中的映射,保证了所有的页都会有对应的物流内存地址空间。

虚拟内存地址组成:由页号(与页表中的页号关联)和偏移量(页的小大,即这个页能存了多少数据)组成。

举个例子,有一个虚拟地址它的页号是4,偏移量是20,那么他的寻址过程是这样的:首先到页表中找到页号4对应的页帧号(比如为8),如果这个页号没有与之对应的页帧号,则用失效机制调入页,接着把页帧号和偏移量传给MMU组成一个物理上真正存在的地址,最后就是访问物理内存的数据了。

 

直接缓冲区map创建

虚拟内存技术可以让硬盘上文件的位置(物理地址)与进程逻辑地址空间(虚拟内存地址空间)中一块大小相同的区域之间的一一对应,这种对应关系纯属是逻辑上的概念,物理上是不存在的,原因是进程的逻辑地址空间本身就是不存在的。

在内存映射的过程中,并没有实际的数据拷贝,文件没有被载入内存,只是逻辑上被放入了内存。

在JVM内存外开辟内存,在每次调用操作系统的一个本机I/O之前或者之后,数据传递会少一次复制过程。如果需要循环使用缓冲区,用直接缓冲区可以很大地提高性能。虽然直接缓冲区使JVM可以进行高效的I/O操作,但它使用的内存是操作系统分配的,绕过了JVM堆栈,建立和销毁比堆栈上的缓冲区要更大的开销。所以,针对大数据处理时,直接缓冲区可以提升效率。

1、直接缓冲区的创建

1)ByteBuffer.allocateDirect()方法直接内存DirectMemory的大小,默认为 -Xmx 的JVM堆的最大值,但是并不受其限制,而是由JVM参数 MaxDirectMemorySize单独控制。

2)通道的map方法创建MappedByteBuffer,MappedByteBuffer处理大文件,一次最多只能读2G内容,推荐使用

abstract   ( mode, long position, long size)
          将此通道的文件区域直接映射到内存中。

FileChannel中的几个变量

  • MapMode mode:内存映像文件访问的方式,共三种:
    1. MapMode.READ_ONLY:只读,试图修改得到的缓冲区将导致抛出异常。
    2. MapMode.READ_WRITE:读/写,对得到的缓冲区的更改最终将写入文件;但该更改对映射到同一文件的其他程序不一定是可见的。
    3. MapMode.PRIVATE:私用,可读可写,但是修改的内容不会写入文件,只是buffer自身的改变,这种能力称之为”copy on write”。
  • position:文件映射时的起始位置。映射区域从此位置开始;必须为非负数。
  • size:要映射的区域大小;必须为非负数且不大于Integer.MAX_VALUE(2G)

2、MappedByteBuffer的使用:它不需要翻转

简单的文件复制,注意:这里没有正确的释放MappedByteBuffer,项目中记得正确释放。

public static void main(String[] args) throws Exception {        long start = System.currentTimeMillis();        // 1.创建FileChannel        FileChannel readChannel = new RandomAccessFile("D:/E/NIO/text.txt", "r").getChannel();        FileChannel writeChannel = new RandomAccessFile("D:/E/NIO/text_copy.txt", "rw").getChannel();//        FileChannel readChannel = new RandomAccessFile("D:/E/NIO/textBig.txt", "r").getChannel();//        FileChannel writeChannel = new RandomAccessFile("D:/E/NIO/textBig_copy.txt", "rw").getChannel();        // 2.缓冲区        MappedByteBuffer mappedByteBuffer = readChannel.map(FileChannel.MapMode.READ_ONLY, 0, readChannel.size());        while (mappedByteBuffer.remaining() > 0) {            writeChannel.write(mappedByteBuffer);        }        readChannel.close();        writeChannel.close();        long end = System.currentTimeMillis();        System.out.println("所需时间为:" + (end - start));    }

    直接缓冲区操作时间:35M--834ms,  1.56G--11983ms 

3、MappedByteBuffer释放

1)文件小于2G时

MappedByteBuffer没有unmap方法,使得这个文件会一直被程序的资源句柄占用着无法释放。测试如下

public static void main(String[] args) throws Exception {        File srcfile = new File("D:/E/NIO/text.txt");        FileChannel readChannel = new RandomAccessFile(srcfile, "r").getChannel();        FileChannel writeChannel = new RandomAccessFile("D:/E/NIO/text_copy.txt", "rw").getChannel();        MappedByteBuffer mappedByteBuffer = readChannel.map(FileChannel.MapMode.READ_ONLY, 0, readChannel.size());        while (mappedByteBuffer.remaining() > 0) {            writeChannel.write(mappedByteBuffer);        }        readChannel.close();        writeChannel.close();        // 文件的资源句柄有没有被释放,可以通过修改文件名来测试        // 名字被改了,证明资源句柄释放了,没改表示没有被释放        srcfile.renameTo(new File("D:/E/NIO/text_2.txt"));    }// 程序正常结束,但是文件没有改名

解决方案:使用 Cleaner类

// 断开连接        Cleaner cleaner = ((DirectBuffer)mappedByteBuffer).cleaner();        if (cleaner != null) {            cleaner.clean();        }        // 文件的资源句柄被释放,修改成功        srcfile.renameTo(new File("D:/E/NIO/text_2.txt"));

2)文件大于2G时:分块循环读取 

MappedByteBuffer处理大文件,一次最多只能读2G内容到内存中,为了读取大文件,需要分块循环读取处理:

public static void main(String[] args) throws Exception {        long start = System.currentTimeMillis();        File srcfile = new File("D:/E/NIO/textBig2.txt");        // 1.创建FileChannel        FileChannel readChannel = new RandomAccessFile(srcfile, "r").getChannel();        FileChannel writeChannel = new RandomAccessFile("D:/E/NIO/textBig2_copy.txt", "rw").getChannel();        long fileLength = readChannel.size();        long current = 0;        long mapSize = 1L << 30;  // 每次映射的区域大小设置1G=1024*1024*1024        MappedByteBuffer mappedByteBuffer = null;        for (; current < fileLength; current += mapSize) {            if (fileLength - current < mapSize) {                mappedByteBuffer = readChannel.map(FileChannel.MapMode.READ_ONLY, current, fileLength - current);            } else {                mappedByteBuffer = readChannel.map(FileChannel.MapMode.READ_ONLY, current, mapSize);            }            while (mappedByteBuffer.hasRemaining()) {                writeChannel.write(mappedByteBuffer);            }        }        readChannel.close();        writeChannel.close();        long end = System.currentTimeMillis();        System.out.println("所需时间为:" + (end - start));        // 断开连接        Cleaner cleaner = ((DirectBuffer)mappedByteBuffer).cleaner();        if (cleaner != null) {            cleaner.clean();        }        // 文件的资源句柄没有被释放,文件名没有被修改        srcfile.renameTo(new File("D:/E/NIO/textBig2_2.txt"));    }

使用 Cleaner,对大于2G文件的资源句柄没有被释放。

 解决方案:暂时没有找到,JDK10之后可能会有 unmap方法,还没印证。

 

3、通过分析源码,了解一下map过程的内部实现。

1.获取FileChannel。

    

    使用了synchronized ,只有一个线程能够初始化FileChannel。

2.通过FileChannel.map方法,获取MappedByteBuffer实例,不过其最终返回的是DirectByteBuffer的实例

    

FileChannel.map方法是抽象的,实际调用的是它的子类FileChannelImpl的map方法

    

2.1 map方法通过native函数map0完成文件的映射工作,把文件映射到虚拟内存,并返回逻辑地址address

2.2 Util.newMappedByteBuffer 获取MappedByteBuffer实例,不过其最终返回的是DirectByteBuffer的实例

3.DirectByteBuffer类    

MappedByteBuffer类是抽象的,DirectByteBuffer是它的一个子类,其实现了对内存的直接操作方法。例如:MappedByteBuffer的get方法最终通过DirectByteBuffer.get方法实现的。

    

map0()函数返回一个地址address,这样就无需调用read或write方法对文件进行读写,通过address就能够操作文件。底层采用unsafe.getByte方法,通过(address + 偏移量)获取指定内存的数据。

既然建立内存映射没有进行实际的数据拷贝,那么进程又怎么能通过(address + 偏移量)最终直接通过内存操作访问到硬盘上的文件呢?

(address + 偏移量)所指向的是一个虚拟地址,要操作其中的数据,必须通过MMU将虚拟地址转换成物理地址。建立内存映射并没有实际拷贝数据,所以,第一次访问虚拟地址所指向的内存区域时,MMU在地址映射表中是无法找到与address + 偏移量相对应的物理地址的,也就是MMU失败,将产生一个缺页中断,缺页中断的中断响应函数会在物理内存中寻找相对应的页帧,如果找不到(也就是该文件从来没有被读入内存的情况),则会通过map()建立的映射关系,直接从硬盘上将文件读取到物流内存中。

 

二、组件Selector

Selector选择器 ,也可以翻译为 多路复用器。选择器允许单线程操作多个通道。用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。从而可以实现单线程管理多个channels,也就是可以管理多个网络链接。

Selector 是NIO相对于BIO实现多路复用的基础,Selector 运行单线程处理多个 Channel,如果你的应用打开了多个通道,但每个连接的流量都很低,使用 Selector 就会很方便。比如聊天服务器。

   

Selector的使用方法:更多方法查看API

1、Selector的创建

通过调用Selector.open()方法创建一个Selector对象,

Selector selector = Selector.open();

2、向Selector注册通道

通过Channel.register()方法来实现。

ServerSocketChannel channel = ServerSocketChannel.open();        //设置通道为非阻塞模式        channel.configureBlocking(false);        SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_ACCEPT);

注意:Channel和Selector一起使用时,Channel必须处于非阻塞模式下。所以FileChannel不适用Selector,因为FileChannel不能切换为非阻塞模式,更准确的来说是因为FileChannel没有继承SelectableChannel。Socket channel可以正常使用。

register()方法的第二个参数:是一个“兴趣(interest)集合”,意思是在通过Selector监听Channel时对什么事件感兴趣。

可以监听四种不同类型的事件:

1.Connect  链接就绪,某个channel成功连接到另一个服务器称为“连接就绪”。

2.Accept  接收就绪,一个server socket channel准备好接收新进入的连接称为“接收就绪”。

3.Read  读就绪,一个有数据可读的通道可以说是“读就绪”。

4.Write  写就绪,等待写数据的通道可以说是“写就绪”。

这四种事件用SelectionKey的四个常量来表示:

SelectionKey.OP_CONNECTSelectionKey.OP_ACCEPTSelectionKey.OP_READSelectionKey.OP_WRITE

如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如下:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

 

3、SelectionKey介绍

一个SelectionKey键表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系。

当向Selector注册Channel时,register()方法会返回一个SelectionKey对象。这个对象包含了一些有用的属性:

key.attachment(); //返回SelectionKey的attachment,attachment可以在注册channel的时候指定。key.channel(); // 返回该SelectionKey对应的channel。key.selector(); // 返回该SelectionKey对应的Selector。key.interestOps(); //返回代表需要Selector监控的IO操作的bit maskkey.readyOps(); // 返回一个bit mask,代表在相应channel上可以进行的IO操作。

key.interestOps():

interest集合是Selector对Channel感兴趣的事件集合。可以通过SelectionKey读写interest集合:

int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

key.readyOps()

ready 集合是通道已经准备就绪的操作的集合。在一次选择(Selection)之后,你会首先访问这个readySet,来检测channel中什么事件或操作已经就绪。但是,也可以使用以下四个方法,它们都会返回一个布尔类型:

//创建ready集合的方法int readySet = selectionKey.readyOps();//检查这些操作是否就绪的方法key.isAcceptable();//是否可读,是返回 trueboolean isWritable()://是否可写,是返回 trueboolean isConnectable()://是否可连接,是返回 trueboolean isAcceptable()://是否可接收,是返回 true

Channel + Selector:从SelectionKey访问Channel和Selector很简单。如下:

Channel channel = key.channel();Selector selector = key.selector();

key.attachment():附加的对象

可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。

例如,可以附加与通道一起使用的Buffer,或是包含聚集数据的某个对象。使用方法如下:

key.attach(theObject);Object attachedObj = key.attachment();

还可以在用register()方法向Selector注册Channel的时候附加对象。如:

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

 

4、从Selector中选择channel

一旦向Selector注册了一个或多个通道,就可以调用几个重载的select()方法返回channel。选择器维护注册过的通道的集合,并且这种注册关系都被封装在SelectionKey当中。

Selector维护的三种类型SelectionKey集合:

  • 已注册的键的集合(Registered key set)

    所有与选择器关联的通道所生成的键的集合称为已经注册的键的集合。并不是所有注册过的键都仍然有效。这个集合通过 keys() 方法返回,并且可能是空的。这个已注册的键的集合不是可以直接修改的;试图这么做的话将引发java.lang.UnsupportedOperationException。

  • 已选择的键的集合(Selected key set)

    所有与选择器关联的通道所生成的键的集合称为已经注册的键的集合。并不是所有注册过的键都仍然有效。这个集合通过 keys() 方法返回,并且可能是空的。这个已注册的键的集合不是可以直接修改的;试图这么做的话将引发java.lang.UnsupportedOperationException。

  • 已取消的键的集合(Cancelled key set)

    已注册的键的集合的子集,这个集合包含了 cancel() 方法被调用过的键(这个键已经被无效化),但它们还没有被注销。这个集合是选择器对象的私有成员,因而无法直接访问。

    注意:

    当键被取消( 可以通过isValid( ) 方法来判断)时,它将被放在相关的选择器的已取消的键的集合里。注册不会立即被取消,但键会立即失效。当再次调用 select( ) 方法时(或者一个正在进行的select()调用结束时),已取消的键的集合中的被取消的键将被清理掉,并且相应的注销也将完成。通道会被注销,而新的SelectionKey将被返回。当通道关闭时,所有相关的键会自动取消(记住,一个通道可以被注册到多个选择器上)。当选择器关闭时,所有被注册到该选择器的通道都将被注销,并且相关的键将立即被无效化(取消)。一旦键被无效化,调用它的与选择相关的方法就将抛出CancelledKeyException。

select()方法介绍:

在刚初始化的Selector对象中,这三个集合都是空的。 通过Selector的select()方法可以选择已经准备就绪的通道 (这些通道包含你感兴趣的的事件)。比如你对读就绪的通道感兴趣,那么select()方法就会返回读事件已经就绪的那些通道。下面是Selector几个重载的select()方法:

int num = selector.select();
  • int select():阻塞到至少有一个通道在你注册的事件上就绪了。
  • int select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。
  • int selectNow():非阻塞,只要有通道就绪就立刻返回。

select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。

一旦调用select()方法,并且返回值不为0时,则 可以通过调用Selector的selectedKeys()方法来访问已选择键集合 。如下:

Set selectedKeys=selector.selectedKeys();

进而可以放到和某SelectionKey关联的Selector和Channel。如下所示:

Set selectedKeys = selector.selectedKeys();Iterator keyIterator = selectedKeys.iterator();while(keyIterator.hasNext()) {    SelectionKey key = keyIterator.next();    if(key.isAcceptable()) {        // a connection was accepted by a ServerSocketChannel.    } else if (key.isConnectable()) {        // a connection was established with a remote server.    } else if (key.isReadable()) {        // a channel is ready for reading    } else if (key.isWritable()) {        // a channel is ready for writing    }    keyIterator.remove();}

 

5、停止选择的方法

选择器执行选择的过程,系统底层会依次询问每个通道是否已经就绪,这个过程可能会造成调用线程进入阻塞状态,那么我们有以下三种方式可以唤醒在select()方法中阻塞的线程。

  • wakeup()方法 :通过调用Selector对象的wakeup()方法让处在阻塞状态的select()方法立刻返回
    该方法使得选择器上的第一个还没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次对select()方法的一次调用将立即返回。
  • close()方法 :通过close()方法关闭Selector,
    该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似wakeup()),同时使得注册到该Selector的所有Channel被注销,所有的键将被取消,但是Channel本身并不会关闭。

 

一个服务端的模板代码

有了服务端的模板代码我们在编写程序时,大多数时间都是在模板代码中添加相应的业务代码。

ServerSocketChannel ssc = ServerSocketChannel.open();ssc.socket().bind(new InetSocketAddress("localhost", 8080));ssc.configureBlocking(false);Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);while(true) {    int readyNum = selector.select();    if (readyNum == 0) {        continue;    }    Set
selectedKeys = selector.selectedKeys(); Iterator
it = selectedKeys.iterator(); while(it.hasNext()) { SelectionKey key = it.next(); if(key.isAcceptable()) { // 接受连接 } else if (key.isReadable()) { // 通道可读 } else if (key.isWritable()) { // 通道可写 } it.remove(); }}

客户端与服务端简单交互实例

1)  服务器必须先建立ServerSocket或者ServerSocketChannel 来等待客户端的连接

2)客户端必须建立相对应的Socket或者SocketChannel来与服务器建立连接

3)服务器接受到客户端的连接,再生成一个Socket或者SocketChannel与此客户端通信

import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.util.Iterator;import java.util.Set;/** * 服务端 */public class WebServer {    public static void main(String[] args) {        try {            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();            serverSocketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 8000));            //设置通道为非阻塞模式            serverSocketChannel.configureBlocking(false);            Selector selector = Selector.open();            // 向Selector注册通道,并且指定感兴趣的事件是 Accept            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);            // 缓冲区, 写点数据            ByteBuffer readBuffer = ByteBuffer.allocate(1024);            ByteBuffer writeBuffer = ByteBuffer.allocate(100);            writeBuffer.put("this is WebServer".getBytes());            writeBuffer.flip();                        while (true){                int nReady = selector.select(1000); // 阻塞的时间上限1s                Set
keys = selector.selectedKeys(); Iterator
it = keys.iterator(); while(it.hasNext()){ SelectionKey key = it.next(); it.remove();// 手动移除 if(key.isAcceptable()){ // 接收就绪,创建新的连接,并且把连接注册到Selector上,并且指定感兴趣的事件是 Accept SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); // 这里我们指定 WebServer先读,WebClient先写 socketChannel.register(selector, SelectionKey.OP_READ); }else if (key.isReadable()){ SocketChannel socketChannel = (SocketChannel) key.channel(); readBuffer.clear(); socketChannel.read(readBuffer); readBuffer.flip(); System.out.println("服务器received:" + new String(readBuffer.array())); key.interestOps(SelectionKey.OP_WRITE); }else if(key.isWritable()){ SocketChannel socketChannel = (SocketChannel) key.channel(); writeBuffer.rewind(); socketChannel.write(writeBuffer); key.interestOps(SelectionKey.OP_READ); } } } } catch (IOException e) { e.printStackTrace(); } }}
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;/** * 客户端 */public class WebClient {    public static void main(String[] args) {        try {            SocketChannel socketChannel = SocketChannel.open();            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));            // 缓冲区, 写点数据            ByteBuffer readBuffer = ByteBuffer.allocate(1024);            ByteBuffer writeBuffer = ByteBuffer.allocate(100);            writeBuffer.put("this is WebClient".getBytes());            writeBuffer.flip();            while(true){                // 这里我们指定 WebClient先写后读                writeBuffer.rewind();                socketChannel.write(writeBuffer);                readBuffer.clear();                socketChannel.read(readBuffer);                readBuffer.flip();                System.out.println("客户端received:" + new String(readBuffer.array()));            }        } catch (IOException e) {            e.printStackTrace();        }    }}

    

 

三、Scatter/Gather(了解)

Java NIO开始支持scatter/gather,scatter/gather用于描述从Channel中读取或者写入到Channel的操作。从Channel中

分散(scatter)读取,是指在读操作时将读取的数据写入多个buffer中。因此,从Channel中读取的数据将“分散(scatter)”到多个Buffer中。

聚集(gather)写入一个Channel,是指在写操作时将多个buffer的数据写入同一个Channel,因此,多个Buffer中的数据将“聚集(gather)”后写入到一个Channel。

scatter/gather经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样你可以方便的处理消息头和消息体。

1、Scattering Reads

Scattering Reads是指数据从一个channel读取到多个buffer中。如下图描述

ByteBuffer header = ByteBuffer.allocate(128);ByteBuffer body   = ByteBuffer.allocate(1024);ByteBuffer[] bufferArray = { header, body };channel.read(bufferArray);

read()方法按照buffer在数组中的顺序将从channel中读取的数据写入到buffer,当一个buffer被写满后,channel紧接着向另一个buffer中写。

Scattering Reads在移动下一个buffer前,必须填满当前的buffer,这也意味着它不适用于大小不固定消息。换句话说,如果存在消息头和消息体,消息头必须完成填充(这里是128byte),Scattering Reads才能正常工作。

2、Gathering Writes

Gathering Writes是指数据从多个buffer写入到同一个channel。如下图描述:

ByteBuffer header = ByteBuffer.allocate(128);ByteBuffer body   = ByteBuffer.allocate(1024);//此处写数据到buffer中ByteBuffer[] bufferArray = { header, body };channel.write(bufferArray);

buffers数组是write()方法的输入参数,write()方法会按照buffer在数组中的顺序,将数据写入到channel,

注意:只有position和limit之间的数据才会被写入。因此,如果一个buffer的容量为128byte,但是仅仅包含58byte的数据,那么这58byte的数据将被写入到channel中。因此与Scattering Reads相反,Gathering Writes能较好的处理动态消息。

 

四、通道之间的数据传输

在Java NIO中,如果两个通道中有一个是FileChannel,那么可以直接将数据从一个channel传输到另外一个channel。

1、transferFrom()

FileChannel的transferFrom()方法可以将数据从源通道传输到FileChannel中,下面是一个简单文件复制的例子:

public static void main(String[] args) throws Exception {        RandomAccessFile fromFile = new RandomAccessFile("D:/E/NIO/fromFile.txt", "rw");        FileChannel fromChannel = fromFile.getChannel();        RandomAccessFile toFile = new RandomAccessFile("D:/E/NIO/toFile.txt", "rw");        FileChannel  toChannel = toFile.getChannel();        long position = 0;        long count = fromChannel.size();        // 参数position表示从position处开始向目标文件写入数据,count表示最多传输的字节数。        toChannel.transferFrom(fromChannel, position, count);    }

 2、transferTo()

transferTo()方法将数据从FileChannel传输到其他的channel中。下面是一个简单文件复制的例子:

public static void main(String[] args) throws Exception {        RandomAccessFile fromFile = new RandomAccessFile("D:/E/NIO/fromFile.txt", "rw");        FileChannel fromChannel = fromFile.getChannel();        RandomAccessFile toFile = new RandomAccessFile("D:/E/NIO/toFile.txt", "rw");        FileChannel  toChannel = toFile.getChannel();        long position = 0;        long count = fromChannel.size();//        toChannel.transferFrom(fromChannel, position, count);        fromChannel.transferTo(position, count, toChannel);    }

五、Pipe管道

Java NIO Pipe是2个线程之间的单向数据连接。

Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。

    

Pipo管道创建,,向管道写数据,需要访问sink通道; 从读取管道的数据,需要访问source通道。一个简单例子:

public class PipoDemo {    public static void main(String[] args) {        try {            Pipe pipe = Pipe.open();            new Thread(new ThreadA(pipe)).start();            new Thread(new ThreadB(pipe)).start();        } catch (IOException e) {            e.printStackTrace();        }    }}// A线程放数据class ThreadA implements Runnable{    private Pipe pipe;    public ThreadA(Pipe pipe) {        this.pipe = pipe;    }    @Override    public void run() {        try {            ByteBuffer buffer = ByteBuffer.allocate(1024);            buffer.put("ThreadA send data".getBytes());            buffer.flip();            Pipe.SinkChannel sinkChannel = pipe.sink();            while (buffer.remaining() > 0){                 sinkChannel.write(buffer);            }        } catch (IOException e) {            e.printStackTrace();        }    }}// B线程取数据class ThreadB implements Runnable{    private Pipe pipe;    public ThreadB(Pipe pipe) {        this.pipe = pipe;    }    @Override    public void run() {        try {            ByteBuffer buffer = ByteBuffer.allocate(1024);            Pipe.SourceChannel sourceChannel = pipe.source();            sourceChannel.read(buffer);            buffer.flip();            System.out.println("ThreadB received:" + new String(buffer.array()));        } catch (IOException e) {            e.printStackTrace();        }    }}

 

参考文章:

 

—— Stay Hungry. Stay Foolish. 求知若饥,虚心若愚。

转载地址:http://ggdgn.baihongyu.com/

你可能感兴趣的文章
你的钱为什么被转走,这篇文章告诉你答案(CSRF详解)
查看>>
JVM中的一个小知识点:深堆和浅堆的概念
查看>>
HashMap的负载因子初始值为什么是0.75?这篇文章以最通俗的方式告诉你答案
查看>>
详解java中一个面试常问的知识点-阻塞队列
查看>>
除了Thread和Runnable,你还知道第三种创建线程的方式Callable吗
查看>>
java线程面试题集锦(第一版本)
查看>>
记一次java中三元表达式的坑(避免踩坑)
查看>>
设计模式之桥接模式
查看>>
设计模式之组合模式
查看>>
java网络编程(1)基础知识点总结
查看>>
java网络编程(2)Socket编程案例(TCP和UDP两种)
查看>>
设计模式之享元模式
查看>>
深入分析java中的多态原理(jvm角度分析)
查看>>
SpringBoot系列(1)基础入门和案例
查看>>
设计模式之命令模式
查看>>
springBoot系列(2)整合MongoDB实现增删改查(完整版)
查看>>
java关键字(6)void
查看>>
面试必问:java中String对象为什么要设计成不可变的呢?
查看>>
深入分析java中的反射机制
查看>>
java集合类(7)Stack
查看>>