4、tomcat请求接收处理

Sep 15, 2016


前言

上一节我们已经知道tomcat是如何启动的了,那么tomcat启动完成之后,如何接收请求呢?从本节开始,我们会详细分析tomcat接收请求到最后处理的过程。

请求接收

上一节我们提到两个重要的线程,辨识线程和接收线程,实际上tomcat接收请求就是通过接收线程来实现的,tomcat的接收线程对象是org.apache.tomcat.util.net.AbstractEndpoint.Acceptor,这是一个抽象行数,它是AbstractEndpoint抽象类的内部类,它的实现类也在AbstractEndpoint的各个实现类的内部类中,http协议入口默认的实现类是org.apache.tomcat.util.net.NioEndpoint.Acceptor,我们来看一下这个实现类是如何接收请求的,实际上它也是一个Runnable的实现类,因此线程启动之后,会进入run()方法:

@Override
public void run() {

    int errorDelay = 0;

    while (running) {

        // ... 略去不影响逻辑理解的代码

        try {
            //if we have reached max connections, wait
            countUpOrAwaitConnection();

            SocketChannel socket = null;
            try {
                // 【1】等待请求连接
                socket = serverSock.accept();
            } catch (IOException ioe) {
                // ... 略去异常处理
            }
            errorDelay = 0;

            if (running && !paused) {
                // 【2】将请求连接放入队列等待处理
                if (!setSocketOptions(socket)) {
                    countDownConnection();
                    closeSocket(socket);
                }
            } else {
                countDownConnection();
                closeSocket(socket);
            }
        } catch (SocketTimeoutException sx) {
            // ... 略去所有异常处理
        }
    }
    state = AcceptorState.ENDED;
}

这个接收线程很简单,就是通过socketServer等待连接,当然当有一个连接来的时候,会通过setSocketOptions(socket)将连接放入队列,等待辨识线程处理。

我们来看一下setSocketOptions(socket)这个方法:

protected boolean setSocketOptions(SocketChannel socket) {
try {
    socket.configureBlocking(false);
    Socket sock = socket.socket();
    socketProperties.setProperties(sock);
    // 【1】构造通道对象
    NioChannel channel = nioChannels.pop();
    if ( channel == null ) {
        if (sslContext != null) {
            SSLEngine engine = createSSLEngine();
            int appbufsize = engine.getSession().getApplicationBufferSize();
            NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()),
                                                               Math.max(appbufsize,socketProperties.getAppWriteBufSize()),
                                                               socketProperties.getDirectBuffer());
            channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
        } else {
            NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),
                                                               socketProperties.getAppWriteBufSize(),
                                                               socketProperties.getDirectBuffer());

            channel = new NioChannel(socket, bufhandler);
        }
    } else {
        channel.setIOChannel(socket);
        if ( channel instanceof SecureNioChannel ) {
            SSLEngine engine = createSSLEngine();
            ((SecureNioChannel)channel).reset(engine);
        } else {
            channel.reset();
        }
    }
    // 【2】把通道对象放入辨识线程的队列中
    getPoller0().register(channel);
} catch (Throwable t) {
    ExceptionUtils.handleThrowable(t);
    try {
        log.error("",t);
    } catch (Throwable tt) {
        ExceptionUtils.handleThrowable(tt);
    }
    return false;
}
return true;
}

我们可以看到,首先构造了一个通道对象(NioChannel),然后把这个通道对象放入了辨识线程的队列中,这里需要注意,在tomcat中辨识线程可以是一个或多个,getPoller0()会根据特定的策略平衡每个辨识线程的任务数,最后真正将通道交给辨识线程的队列等待处理是通过register(channel)方法的,我们进一步看看这个方法:

public void register(final NioChannel socket) {
    socket.setPoller(this);
    // 【1】构造辨识事件对象
    KeyAttachment ka = new KeyAttachment(socket);
    ka.setPoller(this);
    ka.setTimeout(getSocketProperties().getSoTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setSecure(isSSLEnabled());
    PollerEvent r = eventCache.pop();
    ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
    else r.reset(socket,ka,OP_REGISTER);
    // 【2】加入辨识事件队列
    addEvent(r);
}

这个方法真正的作用其实是通过通道对象构造一个辨识事件对象PollerEvent,并把这个事件对象放入辨识线程的事件队列中,放入辨识事件队列是通过addEvent(r)方法实现的:

private void addEvent(PollerEvent event) {
    // 【1】辨识事件加入队列
    events.offer(event);
    // 【2】通知辨识线程触发新的事件
    if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}

这个方法很简单,只是把事件加入辨识队列,并检查是否需要通知辨识线程。

注:本节两个重点,通道对象构造和通知辨识线程触发新的事件可能有的同学看不懂,因为这里涉及一个非常重要的概念:NIO编程,这个概念非常重要,在这里由于跟tomcat本身并没有太多关系,所以不做专门的讨论,看不懂的同学请自行百度,jdk nio和jdk selector学习一下相关的知识。

请求处理

现在我们已经知道tomcat是如何接收请求的,并且接受请求之后并没有做什么处理,只是将请求的连接构造成一个辨识事件对象并放入到辨识线程的事件队列中,接下来我们来看一下辨识线程如何从队列中获取请求并处理的,辨识线程对象org.apache.tomcat.util.net.NioEndpoint.Poller实际上也是一个Runnable的实现类,因此它启动后也会进入run()方法:

@Override
public void run() {
    while (true) {
        try {
            
            // ... 略去不影响逻辑理解的代码
            // 【1】判断是否有事件在队列中等待处理
            if ( keyCount == 0 ) hasEvents = (hasEvents | events());
            // 【2】取出队列中所有事件对象
            Iterator<SelectionKey> iterator =
                keyCount > 0 ? selector.selectedKeys().iterator() : null;
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                KeyAttachment attachment = (KeyAttachment)sk.attachment();
                if (attachment == null) {
                    iterator.remove();
                } else {
                    attachment.access();
                    iterator.remove();
                    // 【3】处理事件对象
                    processKey(sk, attachment);
                }
            }

            timeout(keyCount,hasEvents);
            if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
        } catch (OutOfMemoryError oom) {
            // ... 略去异常处理
        }
    }

    stopLatch.countDown();
}

这个线程启动之后直接进入一个死循环中,因此线程永远不会结束,而是不停的从事件队列中取出事件来处理,主要按照下面的逻辑处理:

  • 判断队列中是否有事件
  • 取出队列中所有事件对象
  • 处理每一个事件对象

注意,这里虽然不是直接处理PollerEvent的对象,但是实际上selector.selectedKeys()的结果是由通道事件触发次数决定的,有多个环节会触发事件,因此这里处理的其实不仅仅是接收到的新请求的事件,所以这里可以认为是实际处理通道中的各个事件的。

注:这里还是看不懂的话,强烈建议先百度学习NIO编程中的Selector对象的作用。

我们来看一下真正处理通道事件的方法:

protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
    boolean result = true;
   
   // ... 省略不影响理解的代码
   
    if ( isWorkerAvailable() ) {
        //【1】注销队列中当前事件
        unreg(sk, attachment, sk.readyOps());
        boolean closeSocket = false;
        if (sk.isReadable()) {
            //【2】处理从通道中读数据的事件
            if (!processSocket(attachment, SocketStatus.OPEN_READ, true)) {
                closeSocket = true;
            }
        }
        if (!closeSocket && sk.isWritable()) {
            // 【3】处理从通道中写数据的事件
            if (!processSocket(attachment, SocketStatus.OPEN_WRITE, true)) {
                closeSocket = true;
            }
        }
        if (closeSocket) {
            cancelledKey(sk,SocketStatus.DISCONNECT);
        }
    } else {
        result = false;
    }
            
    // ... 省略不影响理解的代码
            
    return result;
}

这里我们可以看到,取得事件之后,处理之前先将事件从事件队列中注销了,这是为了防止重复处理,通道每次有新的事件触发都会重新注册事件,因此每次处理完事件都必须从通道中注销以免重复处理。

接下来判断当前事件是读数据事件还是写数据事件。

如果我们在servlet中处理完成之后往通道写了内容,那么这个时候就是写事件,并把我们写的内容写入到通道给浏览器,这里我们不详细看写事件的处理了,我们只分析读事件。我们来看一下processSocket的源码:

protected boolean processSocket(KeyAttachment attachment, SocketStatus status, boolean dispatch) {
    try {
        if (attachment == null) {
            return false;
        }
        //【1】构造处理器
        SocketProcessor sc = processorCache.pop();
        if ( sc == null ) sc = new SocketProcessor(attachment, status);
        else sc.reset(attachment, status);
        //【2】把处理器的任务交给线程池或者直接处理
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        log.warn(sm.getString("endpoint.executor.fail", attachment.getSocket()), ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}

这里tomcat创建了一个处理器org.apache.tomcat.util.net.NioEndpoint.SocketProcessor对象,这个处理器实际上是一个Runnable对象,然后将这个对象作为任务交给了线程池,或者直接运行这个处理器进行处理。

接下来,我们看一下处理器如何处理这个连接:

@Override
public void run() {
    NioChannel socket = ka.getSocket();

    if (ka.isUpgraded() && SocketStatus.OPEN_WRITE == status) {
        synchronized (ka.getWriteThreadLock()) {
            doRun();
        }
    } else {
        synchronized (socket) {
            doRun();
        }
    }
}

run方法其实没做什么,直接调用了doRun:

private void doRun() {
    NioChannel socket = ka.getSocket();
    SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

    try {
        
        // ... 省略不影响理解的代码
        
        if (handshake == 0) {
            SocketState state = SocketState.OPEN;
            if (status == null) {
                // 【1】通过handler处理这个事件
                state = handler.process(ka, SocketStatus.OPEN_READ);
            } else {
                state = handler.process(ka, status);
            }
            if (state == SocketState.CLOSED) {
                close(socket, key, SocketStatus.ERROR);
            }
        } else if (handshake == -1 ) {
            close(socket, key, SocketStatus.DISCONNECT);
        } else {
            ka.getPoller().add(socket,handshake);
        }
    } catch (CancelledKeyException cx) {
        
        // ... 省略异常处理
        
    } finally {
        ka = null;
        status = null;
        if (running && !paused) {
            processorCache.push(this);
        }
    }
}

这里其实是直接调用了handler来处理这个事件,handler只有两个实现类,org.apache.coyote.ajp.AjpNioProtocol.AjpConnectionHandlerorg.apache.coyote.http11.Http11NioProtocol.Http11ConnectionHandler,我们这里是http请求,因此真正的处理类是Http11ConnectionHandler,当然,这里调用的是抽象类org.apache.coyote.AbstractProtocol.AbstractConnectionHandler的方法,这个方法做了大量的判断和状态处理,但是都比较零碎和细节,我们不去太多分析,最终调用的是Http11ConnectionHandlerprocess方法:

@Override
public SocketState process(SocketWrapper<S> socketWrapper)
    throws IOException {
        
    // ... 省略不影响理解的代码
    
    if (!getErrorState().isError()) {
        try {
            rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
            //【1】获取适配器,并通过适配器处理请求
            getAdapter().service(request, response);
            if(keepAlive && !getErrorState().isError() && !isAsync() &&
                    statusDropsConnection(response.getStatus())) {
                setErrorState(ErrorState.CLOSE_CLEAN, null);
            }
            setCometTimeouts(socketWrapper);
        } catch (InterruptedIOException e) {
            // ... 省略不影响理解的异常处理
        }
    }

    // ... 省略不影响理解的代码
}

这个方法非常长,里边包含了大量的判断和异常处理,这里不全部讨论,我们只讨论正常情况下的流程。

上一步中我们知道,辨识线程将辨识事件构造成了一个事件处理器,实际上这个事件处理器中将事件和通道内容构造成了一个请求对象和响应对象,org.apache.coyote.Requestorg.apache.coyote.Response,注意,这里还不是servlet中的请求和响应。

这个时候处理器根据协议,将请求和响应交给适配器处理了,这个适配器的主要功能就是将tomcat的request和response适配转换成servlet中的request对象和response对象,我们来看一下这个适配器的处理过程:

@SuppressWarnings("deprecation")
@Override
public void service(org.apache.coyote.Request req,
                    org.apache.coyote.Response res)
    throws Exception {
    
    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);

    if (request == null) {

        // 【1】构造servlet的request对象和response对象    
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);

        // ... 省略部分设置属性代码
        
    }

    // ... 省略不影响理解的代码

    try {
            // ... 省略不影响理解的代码
            
            // 【2】把请求交给真正的容器处理
            connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);

            // ... 省略不影响理解的代码
            
        }

        // ... 省略不影响理解的代码
        
    } catch (IOException e) {
        // Ignore
    } finally {
        // ... 省略不影响理解的代码
    }

}

从这里开始tomcat构造了真正符合j2ee标准的request对象和response对象,然后通过container容器的管道链进行处理,在第2节的时候,我们已经提到管道和阀门的概念了,将请求交给管道处理之后,请求会经过一个一个阀门,最终通过org.apache.catalina.core.StandardWrapperValve这个阀门进入真正的容器处理链,我们来看一下这个代码:

@Override
public final void invoke(Request request, Response response)
    throws IOException, ServletException {

    // ... 省略不影响理解的代码

    try {
        if ((servlet != null) && (filterChain != null)) {
            // Swallow output if needed
            if (context.getSwallowOutput()) {
                try {
                    SystemLogHandler.startCapture();
                    if (request.isAsyncDispatching()) {
                        request.getAsyncContextInternal().doInternalDispatch();
                    } else if (comet) {
                        filterChain.doFilterEvent(request.getEvent());
                    } else {
                        //【1】进入应用的拦截器处理链
                        filterChain.doFilter(request.getRequest(),
                                response.getResponse());
                    }
                } finally {
                    String log = SystemLogHandler.stopCapture();
                    if (log != null && log.length() > 0) {
                        context.getLogger().info(log);
                    }
                }
            } else {
                if (request.isAsyncDispatching()) {
                    request.getAsyncContextInternal().doInternalDispatch();
                } else if (comet) {
                    filterChain.doFilterEvent(request.getEvent());
                } else {
                    //【1】进入应用的拦截器处理链
                    filterChain.doFilter
                        (request.getRequest(), response.getResponse());
                }
            }

        }
    } catch (ClientAbortException e) {
        // ... 省略不影响理解的异常处理
    }

    // ... 省略不影响理解的代码

}

从上面的代码我们可以看出,最终的判断会真正进入应用的拦截器处理链,也就是J2EE中定义的规范,先通过拦截器,再通过servlet处理这个请求。

这个拦截器处理链的处理实现类是org.apache.catalina.core.ApplicationFilterChain,这里我们代码不再往下读了,最终会调用这个方法:

private void internalDoFilter(ServletRequest request, ServletResponse response)

并在这个方法中调用servlet.service(ServletRequest request, ServletResponse response)进入我们熟悉的servlet环节。

总结

在本节我们完整的讨论了tomcat在接收一个请求之后完整的处理过程,限于篇幅原因,很多源代码都做了删减,也没办法一行一行解释这些代码,因此看本节的时候,强烈建议自己搭建一个ide环境,跟着本节的思路去读源码,比较难理解的地方可以通过debug调试帮助自己理解这个过程,另外,我省略了很多异常情况的判断和处理,因为这里边需要解释的细节实在太多了,和这个系列文章最初的目标定位不一致,因此我全部省略掉了,感兴趣的读者可以自己花时间去阅读。