初試Java 7 NIO2:實(shí)現(xiàn)高性能的HTTP Server
本文來(lái)自DoubleH的BlogJava博客,原文標(biāo)題為《JDK7 NIO2 實(shí)踐: 增加 TransmitFile支持》。對(duì)于Java 7 NIO2特性的更多描述,可參考以前Google的一次技術(shù)演講。
#t#JDK7的NIO2特性或許是我最期待的,我一直想基于它寫(xiě)一個(gè)高性能的Java Http Server.現(xiàn)在這個(gè)想法終于可以實(shí)施了。
本人基于目前最新的JDK7 b76開(kāi)發(fā)了一個(gè)HTTP Server性能確實(shí)不錯(cuò)。
在windows平臺(tái)上NIO2采用AccpetEx來(lái)異步接受連接,并且讀寫(xiě)全都關(guān)聯(lián)到IOCP完成端口。不僅如此,為了方便開(kāi)發(fā)者使用,連IOCP工作線程都封裝好了,你只要提供線程池就OK。
但是要注意,IOCP工作線程的線程池必須是 Fix的,因?yàn)槟惆l(fā)出的讀寫(xiě)請(qǐng)求都關(guān)聯(lián)到相應(yīng)的線程上,如果線程死了,那讀寫(xiě)完成情況是不知道的。
作為一個(gè)Http Server,傳送文件是必不可少的功能,那一般文件的傳送都是要把程序里的buffer拷貝到內(nèi)核的buffer,由內(nèi)核發(fā)送出去的。windows平臺(tái)上為這種情況提供了很好的解決方案,使用TransmitFile接口
- BOOL TransmitFile(
- SOCKET hSocket,
- HANDLE hFile,
- DWORD nNumberOfBytesToWrite,
- DWORD nNumberOfBytesPerSend,
- LPOVERLAPPED lpOverlapped,
- LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers,
- DWORD dwFlags
- );
你只要把文件句柄發(fā)送給內(nèi)核就行了,內(nèi)核幫你搞定其余的,真正做到Zero-Copy.
但是很不幸,NIO2里AsynchronousSocketChannel沒(méi)有提供這樣的支持。而為HTTP Server的性能考量,本人只好自己增加這個(gè)支持。
要無(wú)縫支持,這個(gè)必須得表現(xiàn)的跟 Read /Write一樣,有完成的通知,通知傳送多少數(shù)據(jù),等等。
仔細(xì)讀完sun的IOCP實(shí)現(xiàn)以后發(fā)現(xiàn)這部分工作他們封裝得很好,基本只要往他們的框架里加?xùn)|西就好了。
為了能訪問(wèn)他們的框架代碼,我定義自己的TransmitFile支持類(lèi)在sun.nio.ch包里,以獲得最大的權(quán)限。
- package sun.nio.ch;
- import java.io.IOException;
- import java.lang.reflect.Field;
- import java.nio.channels.AsynchronousCloseException;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.ClosedChannelException;
- import java.nio.channels.CompletionHandler;
- import java.nio.channels.NotYetConnectedException;
- import java.nio.channels.WritePendingException;
- import java.util.concurrent.Future;
- /**
- * @author Yvon
- *
- */
- public class WindowsTransmitFileSupport {
- //Sun's NIO2 channel implementation class
- private WindowsAsynchronousSocketChannelImpl channel;
- //nio2 framework core data structure
- PendingIoCache ioCache;
- //some field retrieve from sun channel implementation class
- private Object writeLock;
- private Field writingF;
- private Field writeShutdownF;
- private Field writeKilledF; // f
- WindowsTransmitFileSupport()
- {
- //dummy one for JNI code
- }
- /**
- *
- */
- public WindowsTransmitFileSupport(
- AsynchronousSocketChannel
- channel) {
- this.channel = (WindowsAsynchronousSocketChannelImpl)channel;
- try {
- // Initialize the fields
- Field f = WindowsAsynchronousSocketChannelImpl.class
- .getDeclaredField("ioCache");
- f.setAccessible(true);
- ioCache = (PendingIoCache) f.get(channel);
- f = AsynchronousSocketChannelImpl.class
- .getDeclaredField("writeLock");
- f.setAccessible(true);
- writeLock = f.get(channel);
- writingF = AsynchronousSocketChannelImpl.class
- .getDeclaredField("writing");
- writingF.setAccessible(true);
- writeShutdownF = AsynchronousSocketChannelImpl.class
- .getDeclaredField("writeShutdown");
- writeShutdownF.setAccessible(true);
- writeKilledF = AsynchronousSocketChannelImpl.class
- .getDeclaredField("writeKilled");
- writeKilledF.setAccessible(true);
- } catch (NoSuchFieldException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (SecurityException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IllegalArgumentException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IllegalAccessException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- /**
- * Implements the task to initiate a write and the handler to consume the
- * result when the send file completes.
- */
- private class SendFileTask
implements Runnable, Iocp.ResultHandler { - private final PendingFuture
result; - private final long file;//file is windows file HANDLE
- SendFileTask(long file, PendingFuture
result) { - this.result = result;
- this.file = file;
- }
- @Override
- // @SuppressWarnings("unchecked")
- public void run() {
- long overlapped = 0L;
- boolean pending = false;
- boolean shutdown = false;
- try {
- channel.begin();
- // get an OVERLAPPED structure (from the cache or allocate)
- overlapped = ioCache.add(result);
- int n = transmitFile0(channel.handle, file, overlapped);
- if (n == IOStatus.UNAVAILABLE) {
- // I/O is pending
- pending = true;
- return;
- }
- if (n == IOStatus.EOF) {
- // special case for shutdown output
- shutdown = true;
- throw new ClosedChannelException();
- }
- // write completed immediately
- throw new InternalError("Write completed immediately");
- } catch (Throwable x) {
- // write failed. Enable writing before releasing waiters.
- channel.enableWriting();
- if (!shutdown && (x instanceof ClosedChannelException))
- x = new AsynchronousCloseException();
- if (!(x instanceof IOException))
- x = new IOException(x);
- result.setFailure(x);
- } finally {
- // release resources if I/O not pending
- if (!pending) {
- if (overlapped != 0L)
- ioCache.remove(overlapped);
- }
- channel.end();
- }
- // invoke completion handler
- Invoker.invoke(result);
- }
- /**
- * Executed when the I/O has completed
- */
- @Override
- @SuppressWarnings("unchecked")
- public void completed(int bytesTransferred, boolean canInvokeDirect) {
- // release waiters if not already released by timeout
- synchronized (result) {
- if (result.isDone())
- return;
- channel.enableWriting();
- result.setResult((V) Integer.valueOf(bytesTransferred));
- }
- if (canInvokeDirect) {
- Invoker.invokeUnchecked(result);
- } else {
- Invoker.invoke(result);
- }
- }
- @Override
- public void failed(int error, IOException x) {
- // return direct buffer to cache if substituted
- // release waiters if not already released by timeout
- if (!channel.isOpen())
- x = new AsynchronousCloseException();
- synchronized (result) {
- if (result.isDone())
- return;
- channel.enableWriting();
- result.setFailure(x);
- }
- Invoker.invoke(result);
- }
- }
- public
extends Number, A> Future sendFile( long file, A att,- CompletionHandler
super A> handler) { - boolean closed = false;
- if (channel.isOpen()) {
- if (channel.remoteAddress == null)
- throw new NotYetConnectedException();
- // check and update state
- synchronized (writeLock) {
- try{
- if (writeKilledF.getBoolean(channel))
- throw new IllegalStateException(
- "Writing not allowed due to timeout or cancellation");
- if (writingF.getBoolean(channel))
- throw new WritePendingException();
- if (writeShutdownF.getBoolean(channel)) {
- closed = true;
- } else {
- writingF.setBoolean(channel, true);
- }
- }catch(Exception e)
- {
- IllegalStateException ise=new IllegalStateException(" catch exception when write");
- ise.initCause(e);
- throw ise;
- }
- }
- } else {
- closed = true;
- }
- // channel is closed or shutdown for write
- if (closed) {
- Throwable e = new ClosedChannelException();
- if (handler == null)
- return CompletedFuture.withFailure(e);
- Invoker.invoke(channel, handler, att, null, e);
- return null;
- }
- return implSendFile(file,att,handler);
- }
extends Number, A> Future implSendFile( long file, A attachment,- CompletionHandler
super A> handler) { - // setup task
- PendingFuture
result = new PendingFuture(channel, handler, - attachment);
- SendFileTask
sendTask= new SendFileTask(file,result); - result.setContext(sendTask);
- // initiate I/O (can only be done from thread in thread pool)
- // initiate I/O
- if (Iocp.supportsThreadAgnosticIo()) {
- sendTask.run();
- } else {
- Invoker.invokeOnThreadInThreadPool(channel, sendTask);
- }
- return result;
- }
- private native int transmitFile0(long handle, long file,
- long overlapped);
- }
這個(gè)操作跟默認(rèn)實(shí)現(xiàn)的里的write操作是很像的,只是最后調(diào)用的本地方法不一樣。。
接下來(lái),我們?cè)趺词褂媚?,這個(gè)類(lèi)是定義在sun的包里的,直接用的話,會(huì)報(bào)IllegalAccessError,因?yàn)槲覀兊念?lèi)加載器跟初始化加載器是不一樣的。
解決辦法一個(gè)是通過(guò)啟動(dòng)參數(shù)-Xbootclasspath,讓我們的包被初始加載器加載。我個(gè)人不喜歡這種辦法,所以就采用JNI來(lái)定義我們的windows TransmitFile支持類(lèi)。
這樣我們的工作算是完成了,注意,發(fā)送文件的時(shí)候傳得是文件句柄,這樣做的好處是你可以更好的控制,一般是在發(fā)送前,打開(kāi)文件句柄,完成后在回調(diào)通知方法里關(guān)閉文件句柄。
有興趣的同學(xué)可以看看我的HTTP server項(xiàng)目:
http://code.google.com/p/jabhttpd/
目前基本功能實(shí)現(xiàn)得差不多,做了些簡(jiǎn)單的測(cè)試,性能比較滿意。這個(gè)服務(wù)器不打算支持servlet api,基本是專(zhuān)門(mén)給做基于長(zhǎng)連接模式通信的定做的。