
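All About Jetty, Chapter 6: The Request/Response Processing Flow

Author: Chen Ke (陈科)

Contact: chenke1818@gmail.com

Please credit the source when reposting: http://www.dumpcache.com/wiki/doku.php?id=about_jetty_6

With the I/O model initialization covered in the previous chapter, the Jetty server has finished starting up.

Let's trace a single request to see how it is processed: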

curl http://localhost:8080/xxx/hello

1. Accepting the request

First, the acceptor described in the previous chapter accepts the incoming connection and submits it to a queue:

public void accept(int acceptorID) throws IOException
    {
        ServerSocketChannel server;
        synchronized(this)
        {
            server = _acceptChannel;
        }

        if (server!=null && server.isOpen() && _manager.isStarted())
        {
            SocketChannel channel = server.accept();
            channel.configureBlocking(false);
            Socket socket = channel.socket();
            configure(socket);
            _manager.register(channel);
        }
    }

The accepted channel is handed to one of the selector queues in round-robin fashion:

public void register(SocketChannel channel)
    {
        // The ++ increment here is not atomic, but it does not matter.
        // so long as the value changes sometimes, then connections will
        // be distributed over the available sets.

        int s=_set++;
        if (s<0)
            s=-s;
        s=s%_selectSets;
        SelectSet[] sets=_selectSet;
        if (sets!=null)
        {
            SelectSet set=sets[s];
            set.addChange(channel);
            set.wakeup();
        }
    }
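The round-robin selection above can be sketched in isolation. This is a minimal illustration, not Jetty code: the class RoundRobinSketch and its methods are hypothetical names, and it swaps the plain int counter for an AtomicInteger with Math.floorMod. The helper excerptStyleIndex reproduces the negate-then-remainder arithmetic of the excerpt so that the one edge case, a counter that has wrapped to Integer.MIN_VALUE, can be observed.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch
{
    private final AtomicInteger counter = new AtomicInteger();
    private final int sets;

    RoundRobinSketch(int sets)
    {
        if (sets <= 0)
            throw new IllegalArgumentException("sets must be positive");
        this.sets = sets;
    }

    /** Returns the next set index, always in the range [0, sets). */
    int next()
    {
        // floorMod keeps the result non-negative even after the counter wraps around.
        return Math.floorMod(counter.getAndIncrement(), sets);
    }

    /** Same arithmetic as the excerpt: negate a negative value, then take the remainder. */
    static int excerptStyleIndex(int s, int sets)
    {
        if (s < 0)
            s = -s; // -Integer.MIN_VALUE overflows and stays negative
        return s % sets;
    }
}
```

With three sets, successive calls to next() cycle through 0, 1, 2, 0. The negated form only misbehaves after about two billion registrations, so in practice this is a robustness detail rather than an everyday bug.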

2. Processing the request queue

Next, the doSelect method of SelectSet takes the change events submitted by the acceptor off the queue and processes them:

public void doSelect() throws IOException
        {
            try
            {
                _selecting=Thread.currentThread();
                final Selector selector=_selector;
                // Stopped concurrently ?
                if (selector == null)
                    return;

                // Make any key changes required
                Object change;
                int changes=_changes.size();
                while (changes-->0 && (change=_changes.poll())!=null)
                {
       
                    ...
                    ...

                    else if (change instanceof ChannelAndAttachment)
                    {
                        // finish accepting/connecting this connection
                        final ChannelAndAttachment asc = (ChannelAndAttachment)change;
                        final SelectableChannel channel=asc._channel;
                        ch=channel;
                        final Object att = asc._attachment;

                        if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
                        {
                            key = channel.register(selector,SelectionKey.OP_READ,att);
                            SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
                            key.attach(endpoint);
                            endpoint.schedule();
                        }
                        ...

The code above creates the EndPoint and calls its scheduling method, endpoint.schedule().
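The register-then-attach pattern in that excerpt can be tried with plain java.nio. The sketch below is an illustration under stated assumptions, not Jetty code: it uses a java.nio.channels.Pipe as a stand-in for the accepted socket so that it runs without any network, and SelectorAttachmentSketch, DemoEndPoint and readThroughSelector are hypothetical names. It registers the readable end for OP_READ, attaches an endpoint object to the key, and later recovers that object from the key when the selector reports the channel as readable.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class SelectorAttachmentSketch
{
    /** Hypothetical stand-in for an endpoint: owns the buffer that input is read into. */
    static final class DemoEndPoint
    {
        final ByteBuffer in;

        DemoEndPoint(int capacity)
        {
            in = ByteBuffer.allocate(capacity);
        }
    }

    /** Writes a short message into a pipe and reads it back via selector and attachment. */
    static String readThroughSelector(String message)
    {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try
        {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink())
            {
                // Selectable channels must be non-blocking before they are registered.
                source.configureBlocking(false);
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);
                DemoEndPoint endPoint = new DemoEndPoint(payload.length);
                key.attach(endPoint);

                // Keep the message small: reader and writer share this one thread.
                ByteBuffer out = ByteBuffer.wrap(payload);
                while (out.hasRemaining())
                    sink.write(out);

                int attempts = 0;
                while (endPoint.in.hasRemaining() && attempts++ < 10)
                {
                    selector.select(1000);
                    Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                    while (ready.hasNext())
                    {
                        SelectionKey k = ready.next();
                        ready.remove();
                        if (k.isReadable())
                        {
                            // The attachment gives back the endpoint without any lookup table.
                            DemoEndPoint ep = (DemoEndPoint)k.attachment();
                            ((ReadableByteChannel)k.channel()).read(ep.in);
                        }
                    }
                }
                return new String(endPoint.in.array(), 0, endPoint.in.position(), StandardCharsets.UTF_8);
            }
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }
}
```

Attaching the endpoint to the key is what lets a selector thread go from a ready key straight to the object that owns the connection state.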

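Here is the class diagram of the core classes involved:

[Figure: class diagram of the core classes]

3. EndPoint initialization

Let's look at the initialization logic first: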

protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
    {
        SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
        endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
        return endp;
    }
    

3.1 SelectChannelEndPoint initialization

ChannelEndPoint only stores the socket's address information, the socket handle, and the socket's read/write timeout:

protected ChannelEndPoint(ByteChannel channel, int maxIdleTime) throws IOException
    {
        this._channel = channel;
        _maxIdleTime=maxIdleTime;
        _socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null;
        if (_socket!=null)
        {
            _local=(InetSocketAddress)_socket.getLocalSocketAddress();
            _remote=(InetSocketAddress)_socket.getRemoteSocketAddress();
            _socket.setSoTimeout(_maxIdleTime);
        }
        else
        {
            _local=_remote=null;
        }
    }

SelectChannelEndPoint initializes the handler that runs the business logic:

public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
{
    public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");

    private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
    private final SelectorManager.SelectSet _selectSet;
    private final SelectorManager _manager;
    private  SelectionKey _key;
    private final Runnable _handler = new Runnable()
        {
            public void run() { handle(); }
        };

3.2 Connection initialization

public AbstractHttpConnection(Connector connector, EndPoint endpoint, Server server)
    {
        super(endpoint);
        _uri = StringUtil.__UTF8.equals(URIUtil.__CHARSET)?new HttpURI():new EncodedHttpURI(URIUtil.__CHARSET);
        _connector = connector;
        HttpBuffers ab = (HttpBuffers)_connector;
        _parser = newHttpParser(ab.getRequestBuffers(), endpoint, new RequestHandler());
        _requestFields = new HttpFields();
        _responseFields = new HttpFields();
        _request = new Request(this);
        _response = new Response(this);
        _generator = newHttpGenerator(ab.getResponseBuffers(), endpoint);
        _generator.setSendServerVersion(server.getSendServerVersion());
        _server = server;
    }

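As the constructor shows, the key data structures for request handling are initialized here:

uri: used to parse the URI and its parameters

HttpParser: used to parse the HTTP request

Request: the request object passed to the servlet

Response: the response object passed to the servlet

HttpGenerator: used to generate the HTTP response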

3.3 endpoint.schedule()

This step is crucial: it submits the request-handling logic to the thread pool.

public void schedule()
    {
        synchronized (this)
        {
            // If there is no key, then do nothing
            if (_key == null || !_key.isValid())
            {
                _readBlocked=false;
                _writeBlocked=false;
                this.notifyAll();
                return;
            }

            // If there are threads dispatched reading and writing
            if (_readBlocked || _writeBlocked)
            {
                // assert _dispatched;
                if (_readBlocked && _key.isReadable())
                    _readBlocked=false;
                if (_writeBlocked && _key.isWritable())
                    _writeBlocked=false;

                // wake them up is as good as a dispatched.
                this.notifyAll();

                // we are not interested in further selecting
                _key.interestOps(0);
                if (_state<STATE_DISPATCHED)
                    updateKey();
                return;
            }

            // Remove writeable op
            if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
            {
                // Remove writeable op
                _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
                _key.interestOps(_interestOps);
                _writable = true; // Once writable is in ops, only removed with dispatch.
            }

            // If dispatched, then deregister interest
            if (_state>=STATE_DISPATCHED)
                _key.interestOps(0);
            else
            {
                // other wise do the dispatch
                dispatch();
                if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0())
                {
                    _key.interestOps(0);
                }
            }
        }
    }
 

The key statement in the code above is the call to dispatch():

public void dispatch()
    {
        synchronized(this)
        {
            if (_state<=STATE_UNDISPATCHED)
            {
                if (_onIdle)
                    _state = STATE_NEEDS_DISPATCH;
                else
                {
                    _state = STATE_DISPATCHED;
                    boolean dispatched = _manager.dispatch(_handler);
                    if(!dispatched)
                    {
                        _state = STATE_NEEDS_DISPATCH;
                        LOG.warn("Dispatched Failed! "+this+" to "+_manager);
                        updateKey();
                    }
                }
            }
        }
    }

_manager.dispatch() in turn hands the task to the thread pool:

public boolean dispatch(Runnable task)
        {
            ThreadPool pool=getThreadPool();
            if (pool==null)
                pool=getServer().getThreadPool();
            return pool.dispatch(task);
        }

In the end the handler is submitted to the thread pool, which processes the request.
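The state guard around the dispatch can be sketched with java.util.concurrent. This is a simplified illustration, not the Jetty implementation: DispatchSketch is a hypothetical class, the numeric state values are assumptions chosen only to preserve the ordering used in the excerpt, and a standard Executor (which signals a full pool by throwing RejectedExecutionException) stands in for Jetty's ThreadPool, whose dispatch method returns a boolean instead.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

final class DispatchSketch
{
    // Assumed values; only their relative order matters for the guard below.
    static final int STATE_NEEDS_DISPATCH = -1;
    static final int STATE_UNDISPATCHED = 0;
    static final int STATE_DISPATCHED = 1;

    private final Executor pool;
    private int state = STATE_UNDISPATCHED;
    final AtomicInteger handled = new AtomicInteger();

    private final Runnable handler = new Runnable()
    {
        public void run() { handle(); }
    };

    DispatchSketch(Executor pool)
    {
        this.pool = pool;
    }

    /** Submits the handler unless it is already dispatched; returns true if it was submitted. */
    synchronized boolean dispatch()
    {
        if (state > STATE_UNDISPATCHED)
            return false; // a handler run is already queued or in progress
        state = STATE_DISPATCHED;
        try
        {
            pool.execute(handler);
            return true;
        }
        catch (RejectedExecutionException e)
        {
            state = STATE_NEEDS_DISPATCH; // remember that a later retry is required
            return false;
        }
    }

    private void handle()
    {
        try
        {
            handled.incrementAndGet(); // placeholder for the real request handling
        }
        finally
        {
            synchronized (this)
            {
                state = STATE_UNDISPATCHED;
            }
        }
    }

    synchronized int state()
    {
        return state;
    }
}
```

The point of the guard is that at most one pool thread works on an endpoint at a time, however often the selector reports the key as ready.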

4. The handler processes the HTTP request

protected void handle()
    {
        boolean dispatched=true;
        try
        {
            while(dispatched)
            {
                try
                {
                    while(true)
                    {
                        final AsyncConnection next = (AsyncConnection)_connection.handle();
                        if (next!=_connection)
                        {
                            LOG.debug("{} replaced {}",next,_connection);
                            Connection old=_connection;
                            _connection=next;
                            _manager.endPointUpgraded(this,old);
                            continue;
                        }
                        break;
                    }
                }
                catch (ClosedChannelException e)
                {
                    LOG.ignore(e);
                }
                catch (EofException e)
                {
                    LOG.debug("EOF", e);
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                catch (IOException e)
                {
                    LOG.warn(e.toString());
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                catch (Throwable e)
                {
                    LOG.warn("handle failed", e);
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                finally
                {
                    if (!_ishut && isInputShutdown() && isOpen())
                    {
                        _ishut=true;
                        try
                        {
                            _connection.onInputShutdown();
                        }
                        catch(Throwable x)
                        {
                            LOG.warn("onInputShutdown failed", x);
                            try{close();}
                            catch(IOException e2){LOG.ignore(e2);}
                        }
                        finally
                        {
                            updateKey();
                        }
                    }
                    dispatched=!undispatch();
                }
            }
        }
        finally
        {
            if (dispatched)
            {
                dispatched=!undispatch();
                while (dispatched)
                {
                    LOG.warn("SCEP.run() finally DISPATCHED");
                    dispatched=!undispatch();
                }
            }
        }
    }

The core of the code above is this call:

final AsyncConnection next = (AsyncConnection)_connection.handle();

which runs the connection's own handle() method:

public Connection handle() throws IOException
    {
        Connection connection = this;
        boolean some_progress=false;
        boolean progress=true;

        try
        {
            setCurrentConnection(this);

            // don't check for idle while dispatched (unless blocking IO is done).
            _asyncEndp.setCheckForIdle(false);


            // While progress and the connection has not changed
            while (progress && connection==this)
            {
                progress=false;
                try
                {
                    // Handle resumed request
                    if (_request._async.isAsync())
                    {
                       if (_request._async.isDispatchable())
                           handleRequest();
                    }
                    // else Parse more input
                    else if (!_parser.isComplete() && _parser.parseAvailable())
                        progress=true;

                    // Generate more output
                    if (_generator.isCommitted() && !_generator.isComplete() && !_endp.isOutputShutdown() && !_request.getAsyncContinuation().isAsyncStarted())
                        if (_generator.flushBuffer()>0)
                            progress=true;

                    // Flush output
                    _endp.flush();

                    // Has any IO been done by the endpoint itself since last loop
                    if (_asyncEndp.hasProgressed())
                        progress=true;
                }
                catch (HttpException e)
                {
                    if (LOG.isDebugEnabled())
                    {
                        LOG.debug("uri="+_uri);
                        LOG.debug("fields="+_requestFields);
                        LOG.debug(e);
                    }
                    progress=true;
                    _generator.sendError(e.getStatus(), e.getReason(), null, true);
                }
                finally
                {
                    some_progress|=progress;
                    //  Is this request/response round complete and are fully flushed?
                    boolean parserComplete = _parser.isComplete();
                    boolean generatorComplete = _generator.isComplete();
                    boolean complete = parserComplete && generatorComplete;
                    if (parserComplete)
                    {
                        if (generatorComplete)
                        {
                            // Reset the parser/generator
                            progress=true;

                            // look for a switched connection instance?
                            if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
                            {
                                Connection switched=(Connection)_request.getAttribute("org.eclipse.jetty.io.Connection");
                                if (switched!=null)
                                    connection=switched;
                            }

                            reset();

                            // TODO Is this still required?
                            if (!_generator.isPersistent() && !_endp.isOutputShutdown())
                            {
                                LOG.warn("Safety net oshut!!!  IF YOU SEE THIS, PLEASE RAISE BUGZILLA");
                                _endp.shutdownOutput();
                            }
                        }
                        else
                        {
                            // We have finished parsing, but not generating so
                            // we must not be interested in reading until we
                            // have finished generating and we reset the generator
                            _readInterested = false;
                            LOG.debug("Disabled read interest while writing response {}", _endp);
                        }
                    }

                    if (!complete && _request.getAsyncContinuation().isAsyncStarted())
                    {
                        // The request is suspended, so even though progress has been made,
                        // exit the while loop by setting progress to false
                        LOG.debug("suspended {}",this);
                        progress=false;
                    }
                }
            }
        }
        finally
        {
            setCurrentConnection(null);

            // If we are not suspended
            if (!_request.getAsyncContinuation().isAsyncStarted())
            {
                // return buffers
                _parser.returnBuffers();
                _generator.returnBuffers();

                // reenable idle checking unless request is suspended
                _asyncEndp.setCheckForIdle(true);
            }

            // Safety net to catch spinning
            if (some_progress)
                _total_no_progress=0;
            else
            {
                _total_no_progress++;
                if (NO_PROGRESS_INFO>0 && _total_no_progress%NO_PROGRESS_INFO==0 && (NO_PROGRESS_CLOSE<=0 || _total_no_progress< NO_PROGRESS_CLOSE))
                    LOG.info("EndPoint making no progress: "+_total_no_progress+" "+_endp+" "+this);
                if (NO_PROGRESS_CLOSE>0 && _total_no_progress==NO_PROGRESS_CLOSE)
                {
                    LOG.warn("Closing EndPoint making no progress: "+_total_no_progress+" "+_endp+" "+this);
                    if (_endp instanceof SelectChannelEndPoint)
                        ((SelectChannelEndPoint)_endp).getChannel().close();
                }
            }
        }
        return connection;
    }
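The connection returned by this method feeds the loop in the endpoint's handle() shown earlier: as long as handle() returns a different connection (for example after a 101 Switching Protocols response), the endpoint swaps it in and calls handle() again. A minimal sketch of that idea follows. The names UpgradeLoopSketch, HttpLikeConnection, UpgradedConnection and runUntilStable are hypothetical and the Connection interface here is a one-method simplification, not Jetty's.

```java
final class UpgradeLoopSketch
{
    interface Connection
    {
        /** Handles pending work; returns this connection, or a replacement to switch to. */
        Connection handle();
    }

    /** Stand-in for the connection that takes over after a protocol switch. */
    static final class UpgradedConnection implements Connection
    {
        int handled;

        public Connection handle()
        {
            handled++;
            return this; // no further switch requested
        }
    }

    /** Stand-in for the HTTP connection that asks to be replaced. */
    static final class HttpLikeConnection implements Connection
    {
        final UpgradedConnection replacement = new UpgradedConnection();

        public Connection handle()
        {
            return replacement;
        }
    }

    /** Keeps calling handle() until the connection stops changing, then returns it. */
    static Connection runUntilStable(Connection connection)
    {
        while (true)
        {
            Connection next = connection.handle();
            if (next == connection)
                return connection;
            connection = next; // the replacement gets its own chance to handle input
        }
    }
}
```

Returning the next connection from handle() keeps the protocol switch inside a single dispatch, so the replacement can consume bytes that have already arrived.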

4.1 HTTP request parsing

The call to _parser.parseAvailable() starts parsing the HTTP request:

public boolean parseAvailable() throws IOException
    {
        boolean progress=parseNext()>0;

        // continue parsing
        while (!isComplete() && _buffer!=null && _buffer.length()>0 && !_contentView.hasContent())
        {
            progress |= parseNext()>0;
        }
        return progress;
    }

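The parsing logic in parseNext is very long, so the code is not reproduced here. Its key steps are:

1. Read the HTTP request data from the channel into the buffer

filled=fill();

2. Parse the request data in the buffer

3. Start parsing the request line, including the URI

_handler.startRequest(HttpMethods.CACHE.lookup(_tok0), _tok1, version=HttpVersions.CACHE.lookup(_buffer.sliceFromMark()));

4. Finish processing the HTTP headers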

_handler.headerComplete();

...

public void headerComplete() throws IOException
        {
            AbstractHttpConnection.this.headerComplete();
        }
        
...
handleRequest();

...
_connector.customize(_endp, _request);

...
server.handle(this);
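The callback style of these steps can be illustrated with a deliberately small parser. This is a sketch under stated assumptions, not Jetty's HttpParser: it works on a complete String rather than on buffers, and RequestHeadParserSketch, its EventHandler interface and the Recorder class are hypothetical names that only borrow the callback names startRequest and headerComplete from the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

final class RequestHeadParserSketch
{
    /** Hypothetical callback interface, loosely modelled on the calls in the excerpt. */
    interface EventHandler
    {
        void startRequest(String method, String uri, String version);
        void parsedHeader(String name, String value);
        void headerComplete();
    }

    /** Records the callbacks in order so that the sequence can be inspected. */
    static final class Recorder implements EventHandler
    {
        final List<String> events = new ArrayList<String>();

        public void startRequest(String method, String uri, String version)
        {
            events.add("startRequest " + method + " " + uri + " " + version);
        }

        public void parsedHeader(String name, String value)
        {
            events.add("header " + name + "=" + value);
        }

        public void headerComplete()
        {
            events.add("headerComplete");
        }
    }

    /** Parses the request line and headers; returns false if the head is not complete yet. */
    static boolean parse(String raw, EventHandler handler)
    {
        int end = raw.indexOf("\r\n\r\n");
        if (end < 0)
            return false; // a real parser would wait for more bytes here

        String[] lines = raw.substring(0, end).split("\r\n");
        String[] requestLine = lines[0].split(" ", 3);
        if (requestLine.length != 3)
            throw new IllegalArgumentException("bad request line: " + lines[0]);
        handler.startRequest(requestLine[0], requestLine[1], requestLine[2]);

        for (int i = 1; i < lines.length; i++)
        {
            int colon = lines[i].indexOf(':');
            if (colon <= 0)
                throw new IllegalArgumentException("bad header: " + lines[i]);
            handler.parsedHeader(lines[i].substring(0, colon).trim(), lines[i].substring(colon + 1).trim());
        }

        handler.headerComplete(); // in the chain above, this is where request handling begins
        return true;
    }
}
```

Feeding it the head of the curl request from the start of this chapter produces one startRequest event, one event per header, and a final headerComplete event.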

5. Invoking filters/servlets to run the business logic

The server starts its handler processing:

public void handle(AbstractHttpConnection connection) throws IOException, ServletException
    {
        final String target=connection.getRequest().getPathInfo();
        final Request request=connection.getRequest();
        final Response response=connection.getResponse();

        if (LOG.isDebugEnabled())
        {
            LOG.debug("REQUEST "+target+" on "+connection);
            handle(target, request, request, response);
            LOG.debug("RESPONSE "+target+"  "+connection.getResponse().getStatus()+" handled="+request.isHandled());
        }
        else
            handle(target, request, request, response);
    }
    

We configured two handlers in jetty.xml, and they are invoked one after another here:

public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) 
        throws IOException, ServletException
    {
        if (_handlers!=null && isStarted())
        {
            MultiException mex=null;
            
            for (int i=0;i<_handlers.length;i++)
            {
                try
                {
                    _handlers[i].handle(target,baseRequest, request, response);
                }
                catch(IOException e)
                {
                    throw e;
                }
                catch(RuntimeException e)
                {
                    throw e;
                }
                catch(Exception e)
                {
                    if (mex==null)
                        mex=new MultiException();
                    mex.add(e);
                }
            }
            if (mex!=null)
            {
                if (mex.size()==1)
                    throw new ServletException(mex.getThrowable(0));
                else
                    throw new ServletException(mex);
            }
            
        }    
    }
    
 

Because the handlers were assembled into a chain of responsibility during initialization, the call now proceeds along that chain:

public final void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
    {
        if (_outerScope==null)  
            doScope(target,baseRequest,request, response);
        else 
            doHandle(target,baseRequest,request, response);
    }

The call order is: SessionHandler → SecurityHandler → ServletHandler

The doScope step is executed in this order first, and then the doHandle step is executed in the same order.
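The two-phase ordering can be made visible with a stripped-down model of such a chain. This sketch is an assumption-laden simplification, not Jetty's ScopedHandler: ScopedChainSketch, Node and chain are hypothetical names, requests are replaced by a trace list, and every node simply forwards to the next one. What it keeps is the rule from the handle() excerpt: the outermost node starts with doScope, and only when the scope phase reaches the end of the chain does the handle phase begin from the outermost node.

```java
import java.util.ArrayList;
import java.util.List;

final class ScopedChainSketch
{
    static final class Node
    {
        final String name;
        Node outer; // outermost node of the chain; null on the outermost node itself
        Node next;  // next node in the chain; null on the last node

        Node(String name)
        {
            this.name = name;
        }

        void handle(List<String> trace)
        {
            if (outer == null)
                doScope(trace);
            else
                doHandle(trace);
        }

        void doScope(List<String> trace)
        {
            trace.add("doScope:" + name);
            if (next != null)
                next.doScope(trace);
            else if (outer != null)
                outer.doHandle(trace); // scope phase done, restart from the outermost node
            else
                doHandle(trace);
        }

        void doHandle(List<String> trace)
        {
            trace.add("doHandle:" + name);
            if (next != null)
                next.doHandle(trace);
        }
    }

    /** Links the named nodes into one chain and returns the outermost node. */
    static Node chain(String... names)
    {
        if (names.length == 0)
            throw new IllegalArgumentException("at least one name is required");
        Node first = null;
        Node previous = null;
        for (String name : names)
        {
            Node node = new Node(name);
            if (first == null)
                first = node;
            else
            {
                node.outer = first;
                previous.next = node;
            }
            previous = node;
        }
        return first;
    }
}
```

Calling handle() on a chain built from the three handler names records all three doScope entries before the first doHandle entry, which matches the order described above.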

Finally, let's look at how the servlet gets executed:

public void doHandle(String target, Request baseRequest,HttpServletRequest request, HttpServletResponse response)
        throws IOException, ServletException
    {
        DispatcherType type = baseRequest.getDispatcherType();
        
        ServletHolder servlet_holder=(ServletHolder) baseRequest.getUserIdentityScope();
        FilterChain chain=null;

        // find the servlet
        if (target.startsWith("/"))
        {
            if (servlet_holder!=null && _filterMappings!=null && _filterMappings.length>0)
                chain=getFilterChain(baseRequest, target, servlet_holder);
        }
        else
        {
            if (servlet_holder!=null)
            {
                if (_filterMappings!=null && _filterMappings.length>0)
                {
                    chain=getFilterChain(baseRequest, null,servlet_holder);
                }
            }
        }

        LOG.debug("chain={}",chain);

        Throwable th=null;
        try
        {
            if (servlet_holder==null)
            {
                if (getHandler()==null)
                    notFound(request, response);
                else
                    nextHandle(target,baseRequest,request,response);
            }
            else
            {
                // unwrap any tunnelling of base Servlet request/responses
                ServletRequest req = request;
                if (req instanceof ServletRequestHttpWrapper)
                    req = ((ServletRequestHttpWrapper)req).getRequest();
                ServletResponse res = response;
                if (res instanceof ServletResponseHttpWrapper)
                    res = ((ServletResponseHttpWrapper)res).getResponse();

                // Do the filter/handling thang
                if (chain!=null)
                    chain.doFilter(req, res);
                else 
                    servlet_holder.handle(baseRequest,req,res);
            }
        }
        catch(EofException e)
        {
            throw e;
        }
        catch(RuntimeIOException e)
        {
            throw e;
        }
        catch(ContinuationThrowable e)
        {   
            throw e;
        }
        catch(Exception e)
        {
            if (!(DispatcherType.REQUEST.equals(type) || DispatcherType.ASYNC.equals(type)))
            {
                if (e instanceof IOException)
                    throw (IOException)e;
                if (e instanceof RuntimeException)
                    throw (RuntimeException)e;
                if (e instanceof ServletException)
                    throw (ServletException)e;
            }

            // unwrap cause
            th=e;
            if (th instanceof UnavailableException)
            {
                LOG.debug(th); 
            }
            else if (th instanceof ServletException)
            {
                LOG.warn(th);
                Throwable cause=((ServletException)th).getRootCause();
                if (cause!=null)
                    th=cause;
            }

            // handle or log exception
            if (th instanceof HttpException)
                throw (HttpException)th;
            else if (th instanceof RuntimeIOException)
                throw (RuntimeIOException)th;
            else if (th instanceof EofException)
                throw (EofException)th;

            else if (LOG.isDebugEnabled())
            {
                LOG.warn(request.getRequestURI(), th); 
                LOG.debug(request.toString()); 
            }
            else if (th instanceof IOException || th instanceof UnavailableException)
            {
                LOG.debug(request.getRequestURI(),th);
            }
            else
            {
                LOG.warn(request.getRequestURI(),th);
            }

            request.setAttribute(RequestDispatcher.ERROR_EXCEPTION_TYPE,th.getClass());
            request.setAttribute(RequestDispatcher.ERROR_EXCEPTION,th);
            if (!response.isCommitted())
            {
                if (th instanceof UnavailableException)
                {
                    UnavailableException ue = (UnavailableException)th;
                    if (ue.isPermanent())
                        response.sendError(HttpServletResponse.SC_NOT_FOUND);
                    else
                        response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
                }
                else
                    response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
            }
            else
                LOG.debug("Response already committed for handling "+th);
            
        }
        catch(Error e)
        {   
            if (!(DispatcherType.REQUEST.equals(type) || DispatcherType.ASYNC.equals(type)))
                throw e;
            th=e;
            LOG.warn("Error for "+request.getRequestURI(),e);
            if(LOG.isDebugEnabled())LOG.debug(request.toString());

            request.setAttribute(RequestDispatcher.ERROR_EXCEPTION_TYPE,e.getClass());
            request.setAttribute(RequestDispatcher.ERROR_EXCEPTION,e);
            if (!response.isCommitted())
                response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
            else
                LOG.debug("Response already committed for handling ",e);
        }
        finally
        {
            if (servlet_holder!=null)
                baseRequest.setHandled(true);

            // Complete async requests 
            if (th!=null && request.isAsyncStarted())
                ((AsyncContinuation)request.getAsyncContext()).errorComplete();
        }
    }
 

The key statement in the code above is chain.doFilter(req, res), which starts the invocation of the whole filter and servlet chain.
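How a single doFilter call unrolls into the whole chain can be sketched without the servlet API. The types below are hypothetical stand-ins (FilterChainSketch, Filter, Servlet, Chain and the named helper are not javax.servlet types), and a trace list replaces the request and response. Each filter runs its code before and after calling the chain, and the servlet runs once every filter has been entered.

```java
import java.util.ArrayList;
import java.util.List;

final class FilterChainSketch
{
    interface Filter
    {
        void doFilter(List<String> trace, Chain chain);
    }

    interface Servlet
    {
        void service(List<String> trace);
    }

    /** Single-use chain: walks the filters in order, then invokes the servlet. */
    static final class Chain
    {
        private final List<Filter> filters;
        private final Servlet servlet;
        private int index;

        Chain(List<Filter> filters, Servlet servlet)
        {
            this.filters = new ArrayList<Filter>(filters);
            this.servlet = servlet;
        }

        void doFilter(List<String> trace)
        {
            if (index < filters.size())
                filters.get(index++).doFilter(trace, this);
            else
                servlet.service(trace);
        }
    }

    /** Builds a filter that records when it is entered and when it is left. */
    static Filter named(final String name)
    {
        return (trace, chain) ->
        {
            trace.add(name + ":before");
            chain.doFilter(trace); // hand over to the next filter or to the servlet
            trace.add(name + ":after");
        };
    }
}
```

With filters A and B in front of a servlet, the recorded order is A:before, B:before, the servlet, B:after, A:after, so a filter that does not call the chain prevents the servlet from running at all.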

6. Completing the response and writing the output

Once the processing that HttpParser started with startRequest has finished, _response.complete() is executed:

public void complete() throws IOException
    {
        if (_state == STATE_END)
            return;

        super.complete();

        if (_state < STATE_FLUSHING)
        {
            _state = STATE_FLUSHING;
            if (_contentLength == HttpTokens.CHUNKED_CONTENT)
                _needEOC = true;
        }

        flushBuffer();
    }

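Ultimately, the complete method of HttpGenerator is called to write the output.

The actual output code is long and intricate; interested readers can study it on their own.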
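One detail of that excerpt can still be illustrated briefly. The _needEOC flag appears to record that a chunked response still needs its end-of-chunks marker. In HTTP/1.1 chunked transfer coding each chunk is sent as its size in hexadecimal, CRLF, the data, CRLF, and the body ends with a zero-size chunk. The sketch below shows that wire format only; ChunkedSketch and encode are hypothetical names, it builds a String instead of writing to buffers, and it is not how HttpGenerator is implemented.

```java
import java.nio.charset.StandardCharsets;

final class ChunkedSketch
{
    /** Encodes the given pieces as an HTTP/1.1 chunked body without trailer fields. */
    static String encode(String... chunks)
    {
        StringBuilder out = new StringBuilder();
        for (String chunk : chunks)
        {
            // The chunk size counts bytes, not characters.
            int length = chunk.getBytes(StandardCharsets.UTF_8).length;
            if (length == 0)
                continue; // a zero-size chunk would be read as the end of the body
            out.append(Integer.toHexString(length)).append("\r\n");
            out.append(chunk).append("\r\n");
        }
        out.append("0\r\n\r\n"); // last chunk followed by the empty trailer section
        return out.toString();
    }
}
```

Without the final zero-size chunk the client cannot tell that the body is finished, which is why completing the response has to remember that the marker is still owed.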

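Here is the sequence diagram of the whole request:

[Figure: sequence diagram of the request processing flow]

7. Cleanup

Finally, some housekeeping takes place: buffers are released, state is reset, and the selector thread deals with connections that have timed out.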

about_jetty_6.txt · Last modified: 2018/10/14 15:31 (external edit)