用户工具

站点工具


about_jetty_5

Jetty的那些事儿 第五章:服务器IO模型

作者:陈科

联系方式:chenke1818@gmail.com

转载请说明出处:http://www.dumpcache.com/wiki/doku.php?id=about_jetty_5

前几章我们分别讲述了Jetty的启动过程以及Webapp的装载,在这些事情做完之后,Jetty就已经完全启动了,那么就可以接受http请求进行处理了吗?还没呢,服务器的网络IO模块都没初始化,怎么处理请求呢?那么我们继续往下看Jetty的IO模型是什么样子的。

我们先来看下Jetty服务器模型的架构图,然后再来分析源码,这之前,大家先学习下java nio的基础知识,这里不再做详细解释。【这里是参考资料】

io程序的启动逻辑在server的doStart()方法中:

if (_connectors!=null && mex.size()==0)
        {
            for (int i=0;i<_connectors.length;i++)
            {
                try{_connectors[i].start();}
                catch(Throwable e)
                {
                    mex.add(e);
                }
            }
        }

1.启动SelectChannelConnector

这里的connectors就是我们在jetty.xml中配置的:

<Call name="addConnector">
 37       <Arg>
 38           <New class="org.eclipse.jetty.server.nio.SelectChannelConnector">
 39             <Set name="host"><Property name="jetty.host" /></Set>
 40             <Set name="port"><Property name="jetty.port" default="8080"/></Set>
 41             <Set name="maxIdleTime">300000</Set>
 42             <Set name="Acceptors">2</Set>
 43             <Set name="statsOn">false</Set>
 44             <Set name="confidentialPort">8443</Set>
 45         <Set name="lowResourcesConnections">20000</Set>
 46         <Set name="lowResourcesMaxIdleTime">5000</Set>
 47           </New>
 48       </Arg>
 49     </Call>

2.初始化selectorManager

然后我们来看SelectChannelConnnector的初始化:

protected void doStart() throws Exception
    {
        _manager.setSelectSets(getAcceptors());
        _manager.setMaxIdleTime(getMaxIdleTime());
        _manager.setLowResourcesConnections(getLowResourcesConnections());
        _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());

        super.doStart();
    }

上面代码先初始化了selectorManager的参数,然后其父类进行了核心的初始化工作:

protected void doStart() throws Exception
    {
        if (_server == null)
            throw new IllegalStateException("No server");

        // open listener port
        open();

        if (_threadPool == null)
        {
            _threadPool = _server.getThreadPool();
            addBean(_threadPool,false);
        }

        super.doStart();

        // Start selector thread
        synchronized (this)
        {
            _acceptorThreads = new Thread[getAcceptors()];

            for (int i = 0; i < _acceptorThreads.length; i++)
                if (!_threadPool.dispatch(new Acceptor(i)))
                    throw new IllegalStateException("!accepting");
            if (_threadPool.isLowOnThreads())
                LOG.warn("insufficient threads configured for {}",this);
        }

        LOG.info("Started {}",this);
    }
<code>

====3.初始化acceptChannel====

首先open方法初始化了服务器的acceptChannel:

<code>
public void open() throws IOException
    {
        synchronized(this)
        {
            if (_acceptChannel == null)
            {
                // Create a new server socket
                _acceptChannel = ServerSocketChannel.open();
                // Set to blocking mode
                _acceptChannel.configureBlocking(true);

                // Bind the server socket to the local host and port
                _acceptChannel.socket().setReuseAddress(getReuseAddress());
                InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
                _acceptChannel.socket().bind(addr,getAcceptQueueSize());

                _localPort=_acceptChannel.socket().getLocalPort();
                if (_localPort<=0)
                    throw new IOException("Server channel not bound");

                addBean(_acceptChannel);
            }
        }
    }

4.初始化buffer

接着又初始化了request和response的缓冲区:

protected void doStart()
        throws Exception
    {
        _requestBuffers=BuffersFactory.newBuffers(_requestHeaderType,_requestHeaderSize,_requestBufferType,_requestBufferSize,_requestBufferType,getMaxBuffers());
        _responseBuffers=BuffersFactory.newBuffers(_responseHeaderType,_responseHeaderSize,_responseBufferType,_responseBufferSize,_responseBufferType,getMaxBuffers());
        super.doStart();
    }

5.初始化selector数组

然后selectorManager又初始化了selector数组:

SelectSet(int acceptorID) throws Exception
        {
            _setID=acceptorID;

            _idleTick = System.currentTimeMillis();
            _timeout = new Timeout(this);
            _timeout.setDuration(0L);

            // create a selector;
            _selector = Selector.open();
            _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD;
        }

并且将selector线程提交到线程池:

protected void doStart() throws Exception
    {
        _selectSet = new SelectSet[_selectSets];
        for (int i=0;i<_selectSet.length;i++)
            _selectSet[i]= new SelectSet(i);

        super.doStart();

        // start a thread to Select
        for (int i=0;i<getSelectSets();i++)
        {
            final int id=i;
            boolean selecting=dispatch(new Runnable()
            {
                public void run()
                {
                    String name=Thread.currentThread().getName();
                    int priority=Thread.currentThread().getPriority();
                    try
                    {
                        SelectSet[] sets=_selectSet;
                        if (sets==null)
                            return;
                        SelectSet set=sets[id];

                        Thread.currentThread().setName(name+" Selector"+id);
                        if (getSelectorPriorityDelta()!=0)
                            Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
                        LOG.debug("Starting {} on {}",Thread.currentThread(),this);
                        while (isRunning())
                        {
                            try
                            {
                                set.doSelect();
                            }
                            catch(IOException e)
                            {
                                LOG.ignore(e);
                            }
                            catch(Exception e)
                            {
                                LOG.warn(e);
                            }
                        }
                    }
                    finally
                    {
                        LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
                        Thread.currentThread().setName(name);
                        if (getSelectorPriorityDelta()!=0)
                            Thread.currentThread().setPriority(priority);
                    }
                }

            });

            if (!selecting)
                throw new IllegalStateException("!Selecting");
        }
    }

6.初始化accept线程组

最后启动了几个accept线程来接受请求,注意:acceptor的数量和selector的数量一样。

// Start selector thread
        synchronized (this)
        {
            _acceptorThreads = new Thread[getAcceptors()];

            for (int i = 0; i < _acceptorThreads.length; i++)
                if (!_threadPool.dispatch(new Acceptor(i)))
                    throw new IllegalStateException("!accepting");
            if (_threadPool.isLowOnThreads())
                LOG.warn("insufficient threads configured for {}",this);
        }

        LOG.info("Started {}",this);
    }



public void run()
        {
            Thread current = Thread.currentThread();
            String name;
            synchronized (AbstractConnector.this)
            {
                if (_acceptorThreads == null)
                    return;

                _acceptorThreads[_acceptor] = current;
                name = _acceptorThreads[_acceptor].getName();
                current.setName(name + " Acceptor" + _acceptor + " " + AbstractConnector.this);
            }
            int old_priority = current.getPriority();

            try
            {
                current.setPriority(old_priority - _acceptorPriorityOffset);
                while (isRunning() && getConnection() != null)
                {
                    try
                    {
                        accept(_acceptor);
                    }
                    catch (EofException e)
                    {
                        LOG.ignore(e);
                    }
                    catch (IOException e)
                    {
                        LOG.ignore(e);
                    }
                    catch (InterruptedException x)
                    {
                        // Connector has been stopped
                        LOG.ignore(x);
                    }
                    catch (Throwable e)
                    {
                        LOG.warn(e);
                    }
                }
            }
            finally
            {
                current.setPriority(old_priority);
                current.setName(name);

                synchronized (AbstractConnector.this)
                {
                    if (_acceptorThreads != null)
                        _acceptorThreads[_acceptor] = null;
                }
            }
        }
    }



public void accept(int acceptorID) throws IOException
    {
        ServerSocketChannel server;
        synchronized(this)
        {
            server = _acceptChannel;
        }

        if (server!=null && server.isOpen() && _manager.isStarted())
        {
            SocketChannel channel = server.accept();
            channel.configureBlocking(false);
            Socket socket = channel.socket();
            configure(socket);
            _manager.register(channel);
        }
    }

我们再来看_manager.register方法:

public void register(SocketChannel channel)
    {
        // The ++ increment here is not atomic, but it does not matter.
        // so long as the value changes sometimes, then connections will
        // be distributed over the available sets.

        int s=_set++;
        if (s<0)
            s=-s;
        s=s%_selectSets;
        SelectSet[] sets=_selectSet;
        if (sets!=null)
        {
            SelectSet set=sets[s];
            set.addChange(channel);
            set.wakeup();
        }
    }

可以看到假如有新的请求进来,将会被提交到SelectSet的change队列中,最后我们再看下SelectSet的doSelect方法的片段:

else if (change instanceof SocketChannel)
                        {
                            // Newly registered channel
                            final SocketChannel channel=(SocketChannel)change;
                            ch=channel;
                            key = channel.register(selector,SelectionKey.OP_READ,null);
                            SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                            key.attach(endpoint);
                            endpoint.schedule();
                        }
                        

后面selector线程将会对注册的read事件进行处理,对该channel提交handler线程到线程池进行读取的操作了。至于doSelector的具体流程,请看后面的请求处理相关文章。

about_jetty_5.txt · 最后更改: 2018/10/14 15:31 (外部编辑)