用户工具

站点工具


about_jetty_7

Jetty的那些事儿 第七章:Jetty核心数据结构分析

作者:陈科

联系方式:chenke1818@gmail.com

转载请说明出处:http://www.dumpcache.com/wiki/doku.php?id=about_jetty_7

1.BlockingArrayQueue

在前面的章节中我们知道,Jetty在接受请求之后,把相应的事件提交到了工作队列中,默认的队列实现就是:BlockingArrayQueue Jetty本来想在这里做一下false_sharing的处理,不过9.X的版本又去除了,也许作者觉得优化的过细没有什么意义吧,我会另外开辟博文说明这个问题。 这个Queue是个循环队列,包含head和tail两个指针,所以在使用上效率会优与jdk的ArrayBlockingQueue.不过使用这个队列要比较小心,因为假如队列满的时候,它会进行自动增长扩容,如果请求过于密集也许队列会被撑爆。我们来看下源码:

//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.util;

import java.util.AbstractList;
import java.util.Collection;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


/* ------------------------------------------------------------ */
/** Queue backed by a circular array.
 * 
 * This queue is uses  a variant of the two lock queue algorithm to
 * provide an efficient queue or list backed by a growable circular
 * array.  This queue also has a partial implementation of 
 * {@link java.util.concurrent.BlockingQueue}, specifically the {@link #take()} and 
 * {@link #poll(long, TimeUnit)} methods.  
 * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is
 * able to grow and provides a blocking put call.
 * <p>
 * The queue has both a capacity (the size of the array currently allocated)
 * and a limit (the maximum size that may be allocated), which defaults to 
 * {@link Integer#MAX_VALUE}.
 * 
 * @param <E> The element type
 */
public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
{
    public final int DEFAULT_CAPACITY=128;
    public final int DEFAULT_GROWTH=64;
    private final int _limit;
    private final AtomicInteger _size=new AtomicInteger();
    private final int _growCapacity;
    
    private volatile int _capacity;
    private Object[] _elements;
    
    private final ReentrantLock _headLock = new ReentrantLock();
    private final Condition _notEmpty = _headLock.newCondition();
    private int _head;

    // spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing
    // TODO verify this has benefits
    private long _space0;
    private long _space1;
    private long _space2;
    private long _space3;
    private long _space4;
    private long _space5;
    private long _space6;
    private long _space7;
    
    private final ReentrantLock _tailLock = new ReentrantLock();
    private int _tail;
    

    /* ------------------------------------------------------------ */
    /** Create a growing partially blocking Queue
     * 
     */
    public BlockingArrayQueue()
    {
        _elements=new Object[DEFAULT_CAPACITY];
        _growCapacity=DEFAULT_GROWTH;
        _capacity=_elements.length;
        _limit=Integer.MAX_VALUE;
    }

    /* ------------------------------------------------------------ */
    /** Create a fixed size partially blocking Queue
     * @param limit The initial capacity and the limit.
     */
    public BlockingArrayQueue(int limit)
    {
        _elements=new Object[limit];
        _capacity=_elements.length;
        _growCapacity=-1;
        _limit=limit;
    }

    /* ------------------------------------------------------------ */
    /** Create a growing partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity.
     */
    public BlockingArrayQueue(int capacity,int growBy)
    {
        _elements=new Object[capacity];
        _capacity=_elements.length;
        _growCapacity=growBy;
        _limit=Integer.MAX_VALUE;
    }

    /* ------------------------------------------------------------ */
    /** Create a growing limited partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity.
     * @param limit maximum capacity.
     */
    public BlockingArrayQueue(int capacity,int growBy,int limit)
    {
        if (capacity>limit)
            throw new IllegalArgumentException();
        
        _elements=new Object[capacity];
        _capacity=_elements.length;
        _growCapacity=growBy;
        _limit=limit;
    }

    /* ------------------------------------------------------------ */
    public int getCapacity()
    {
        return _capacity;
    }

    /* ------------------------------------------------------------ */
    public int getLimit()
    {
        return _limit;
    }
    
    /* ------------------------------------------------------------ */
    @Override
    public boolean add(E e)
    {
        return offer(e);
    }
    
    /* ------------------------------------------------------------ */
    public E element()
    {
        E e = peek();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }
    
    /* ------------------------------------------------------------ */
    @SuppressWarnings("unchecked")
    public E peek()
    {
        if (_size.get() == 0)
            return null;
        
        E e = null;
        _headLock.lock(); // Size cannot shrink
        try 
        {
            if (_size.get() > 0) 
                e = (E)_elements[_head];
        } 
        finally 
        {
            _headLock.unlock();
        }
        
        return e;
    }

    /* ------------------------------------------------------------ */
    public boolean offer(E e)
    {
        if (e == null) 
            throw new NullPointerException();
        
        boolean not_empty=false;
        _tailLock.lock();  // size cannot grow... only shrink
        try 
        {
            if (_size.get() >= _limit) 
                return false;
            
            // should we expand array?
            if (_size.get()==_capacity)
            {
                _headLock.lock();   // Need to grow array
                try
                {
                    if (!grow())
                        return false;
                }
                finally
                {
                    _headLock.unlock();
                }
            }

            // add the element
            _elements[_tail]=e;
            _tail=(_tail+1)%_capacity;

            not_empty=0==_size.getAndIncrement();
            
        } 
        finally 
        {
            _tailLock.unlock();
        }
        
        if (not_empty)
        {
            _headLock.lock();
            try
            {
                _notEmpty.signal();
            }
            finally
            {
                _headLock.unlock();
            }
        }  

        return true;
    }


    /* ------------------------------------------------------------ */
    @SuppressWarnings("unchecked")
    public E poll()
    {
        if (_size.get() == 0)
            return null;
        
        E e = null;
        _headLock.lock(); // Size cannot shrink
        try 
        {
            if (_size.get() > 0) 
            {
                final int head=_head;
                e = (E)_elements[head];
                _elements[head]=null;
                _head=(head+1)%_capacity;
                
                if (_size.decrementAndGet()>0)
                    _notEmpty.signal();
            }
        } 
        finally 
        {
            _headLock.unlock();
        }
        
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if no elements are present on this queue.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException
    {
        E e = null;
        _headLock.lockInterruptibly();  // Size cannot shrink
        try 
        {
            try 
            {
                while (_size.get() == 0)
                {
                    _notEmpty.await();
                }
            } 
            catch (InterruptedException ie) 
            {
                _notEmpty.signal();
                throw ie;
            }

            final int head=_head;
            e = (E)_elements[head];
            _elements[head]=null;
            _head=(head+1)%_capacity;

            if (_size.decrementAndGet()>0)
                _notEmpty.signal();
        } 
        finally 
        {
            _headLock.unlock();
        }
        
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time if no elements are
     * present on this queue.
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    @SuppressWarnings("unchecked")
    public E poll(long time, TimeUnit unit) throws InterruptedException
    {
        
        E e = null;

        long nanos = unit.toNanos(time);
        
        _headLock.lockInterruptibly(); // Size cannot shrink
        try 
        {    
            try 
            {
                while (_size.get() == 0)
                {
                    if (nanos<=0)
                        return null;
                    nanos = _notEmpty.awaitNanos(nanos);
                }
            } 
            catch (InterruptedException ie) 
            {
                _notEmpty.signal();
                throw ie;
            }

            e = (E)_elements[_head];
            _elements[_head]=null;
            _head=(_head+1)%_capacity;

            if (_size.decrementAndGet()>0)
                _notEmpty.signal();
        } 
        finally 
        {
            _headLock.unlock();
        }
        
        return e;
    }

    /* ------------------------------------------------------------ */
    public E remove()
    {
        E e=poll();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }

    /* ------------------------------------------------------------ */
    @Override
    public void clear()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                _head=0;
                _tail=0;
                _size.set(0);
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    @Override
    public boolean isEmpty()
    {
        return _size.get()==0;
    }

    /* ------------------------------------------------------------ */
    @Override
    public int size()
    {
        return _size.get();
    }

    /* ------------------------------------------------------------ */
    @SuppressWarnings("unchecked")
    @Override
    public E get(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;
                return (E)_elements[i];
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }
    
    /* ------------------------------------------------------------ */
    @Override
    public E remove(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {

                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;
                @SuppressWarnings("unchecked")
                E old=(E)_elements[i];

                if (i<_tail)
                {
                    System.arraycopy(_elements,i+1,_elements,i,_tail-i);
                    _tail--;
                    _size.decrementAndGet();
                }
                else
                {
                    System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1);
                    if (_tail>0)
                    {
                        _elements[_capacity]=_elements[0];
                        System.arraycopy(_elements,1,_elements,0,_tail-1);
                        _tail--;
                    }
                    else
                        _tail=_capacity-1;

                    _size.decrementAndGet();
                }

                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    @Override
    public E set(int index, E e)
    {
        if (e == null) 
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {

                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;
                @SuppressWarnings("unchecked")
                E old=(E)_elements[i];
                _elements[i]=e;
                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }
    
    /* ------------------------------------------------------------ */
    @Override
    public void add(int index, E e)
    {
        if (e == null) 
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {

                if (index<0 || index>_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                if (index==_size.get())
                {
                    add(e);
                }
                else
                {
                    if (_tail==_head)
                        if (!grow())
                            throw new IllegalStateException("full");

                    int i = _head+index;
                    if (i>=_capacity)
                        i-=_capacity;

                    _size.incrementAndGet();
                    _tail=(_tail+1)%_capacity;


                    if (i<_tail)
                    {
                        System.arraycopy(_elements,i,_elements,i+1,_tail-i);
                        _elements[i]=e;
                    }
                    else
                    {
                        if (_tail>0)
                        {
                            System.arraycopy(_elements,0,_elements,1,_tail);
                            _elements[0]=_elements[_capacity-1];
                        }

                        System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1);
                        _elements[i]=e;
                    }
                }
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    private boolean grow()
    {
        if (_growCapacity<=0)
            return false;

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                final int head=_head;
                final int tail=_tail;
                final int new_tail;

                Object[] elements=new Object[_capacity+_growCapacity];

                if (head<tail)
                {
                    new_tail=tail-head;
                    System.arraycopy(_elements,head,elements,0,new_tail);
                }
                else if (head>tail || _size.get()>0)
                {
                    new_tail=_capacity+tail-head;
                    int cut=_capacity-head;
                    System.arraycopy(_elements,head,elements,0,cut);
                    System.arraycopy(_elements,0,elements,cut,tail);
                }
                else
                {
                    new_tail=0;
                }

                _elements=elements;
                _capacity=_elements.length;
                _head=0;
                _tail=new_tail; 
                return true;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }

    }

    /* ------------------------------------------------------------ */
    public int drainTo(Collection<? super E> c)
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    public int drainTo(Collection<? super E> c, int maxElements)
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    public void put(E o) throws InterruptedException
    {
        if (!add(o))
            throw new IllegalStateException("full");
    }

    /* ------------------------------------------------------------ */
    public int remainingCapacity()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                return getCapacity()-size();
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }
    

    /* ------------------------------------------------------------ */
    long sumOfSpace()
    {
        // this method exists to stop clever optimisers removing the spacers
        return _space0++ +_space1++ +_space2++ +_space3++ +_space4++ +_space5++ +_space6++ +_space7++; 
    }
}

2. LazyList

这真是个无聊的结构,看下源码,有意思不?

 public static Object add(Object list, Object item)
    {
        if (list==null)
        {
            if (item instanceof List || item==null)
            {
                List<Object> l = new ArrayList<Object>();
                l.add(item);
                return l;
            }

            return item;
        }

        if (list instanceof List)
        {
            ((List<Object>)list).add(item);
            return list;
        }

        List<Object> l=new ArrayList<Object>();
        l.add(list);
        l.add(item);
        return l;    
    }

3. Buffer

Buffer是jetty数据结构的重头,对IO的读写都离不开Buffer.我们来仔细研究下:

首先我们来看下Buffer的类图:

然后我们针对buffer的接口做相应注释:

public interface Buffer extends Cloneable {
    // 基于Buffer主要用于向当前连接读写数据,因而它定义了两个核心的方法:readFrom和writeTo。
    //其中readFrom方法从InputStream中读取字节数据,writeTo方法将响应字节写入OutputStream中,
    //即向Connection中读取和写入数据,写完后清理当前Buffer。
    int readFrom(InputStream in, int max) throws IOException;
    void writeTo(OutputStream out) throws IOException;


    // 下面的方法类似NIO中Buffer的设计:每次get操作都从getIndex开始,
    //并且getIndex会向前移动读取的字节数的长度;每次的peek操作也getIndex开始,但是peek操作不会使getIndex向前移动;
    //每次put操作都从putIndex开始,并且putIndex会向前移动写入的字节数的长度;每次poke操作也会从putIndex开始,
    //但是poke操作不会使putIndex向前移动;mark操作会以getIndex作为基准设置markIndex的值,
    //从而在reset时会将getIndex重置到之前mark过的位置;capacity表示该Buffer最多可存储的字节数,
    //而length表示从getIndex到putIndex还存在的字节数。并且Buffer永远保证以下关系总是成立:
    //markIndex<=getIndex<=putIndex<=capacity

    byte get();
    int get(byte[] b, int offset, int length);
    Buffer get(int length);
    int getIndex();

    void mark();
    void mark(int offset);
    int markIndex();

    byte peek();
    byte peek(int index);
    Buffer peek(int index, int length);
    int peek(int index, byte[] b, int offset, int length);

    int poke(int index, Buffer src);
    void poke(int index, byte b);
    int poke(int index, byte b[], int offset, int length);

    int put(Buffer src);
    void put(byte b);
    int put(byte[] b,int offset, int length);
    int put(byte[] b);
    int putIndex();

    int length();
    void clear();
    void reset();
    void setGetIndex(int newStart);
    void setMarkIndex(int newMark);
    void setPutIndex(int newLimit);

    /** 一个Buffer还有独立的两种状态:access级别和volatile。
access级别有:IMMUTABLE,表示当前Buffer所有的Index和内容都不能被改变;READONLY,表示当前Buffer是只读的,
即getIndex和markIndex可以被改变,而putIndex以及Buffer内容不可以;READWRITE,表示所有的Index以及Buffer的内容可以被改变。
volatile状态表示当前Buffer是否会通过其他路径被修改,默认情况下,ByteArrayBuffer、DirectNIOBuffer等是NON_VOLATILE状态,
而View是VOLATILE状态(除非View内部的Buffer是IMMUTABLE)。
**/
    // 返回包含当前Buffer从getIndex到putIndex内容的Buffer,并且返回的Buffer不可以被其他路径修改。
    //如果当前Buffer是NON_VOLATILE,则直接返回当前Buffer(这个实现是不严谨的,因为在这种情况下,
    //其实有两个Buffer实例可以修改同一份数据),
    //否则,克隆一个新的Buffer,这样对新的Buffer的修改不会影响原Buffer的内容。
    Buffer asNonVolatileBuffer();
    // 返回一个只读的Buffer(View),该只读的Buffer的读取不会影响原来Buffer的Index。
    Buffer asReadOnlyBuffer();
    // 拷贝一个不可修改的ByteBuffer。
    Buffer asImmutableBuffer();
    // 返回一个可修改的Buffer,并且对返回的Buffer的内容修改会影响原有的Buffer。
    Buffer asMutableBuffer();

    // 当前Buffer是否不可被修改,即Buffer内容和所有Index都不能被修改。
    boolean isImmutable();
    // 当前Buffer是否是只读的。
    boolean isReadOnly();
    // 是否当前Buffer内容可以通过其他路径被修改,
    //比如View一般情况下是VOLATILE状态(除非View内部的Buffer是IMMUTABLE)。
   boolean isVolatile();
    
    // 除了以上的操作,Buffer还提供了一些其他用于操作Buffer内部字节的方法:

    //如果内部使用字节数组表示,返回该字节数组,否则,返回null。
    byte[] array();
    // 获取从getIndex到putIndex的字节数组,其长度为length。
    byte[] asArray();
    // 如果当前Buffer是对另一个Buffer的包装,则返回内部被包装的Buffer实例,否则返回当前Buffer本身。
    Buffer buffer();

    int capacity();
    // 返回当前Buffer剩余的空间,即capacity-putIndex。
    int space();
    // 清除Buffer内容,即设置getIndex和putIndex为0,以及markIndex为-1。
    // 整理Buffer内容,即将markIndex >= 0 ? min(getIndex, markIndex) : 
    //getIndex到putIndex的内容移动到Buffer的起始位置,同时修改相应的getIndex、markIndex、putIndex。
    void compact();
    // 当前Buffer是否有可用字节,即是否putIndex>getIndex。
    boolean hasContent();

    // 跳过n个字节,即getIndex增加min(remaining(), n)的值。
    int skip(int n);

    // 切割出当前Buffer从getIndex到putIndex的View,一般来说它是volatile的(除非它是immutable类型的Buffer)。
    Buffer slice();
    // 切割出当前Buffer从markIndex到putIndex的View,一般来说它是volatile的(除非它是immutable类型的Buffer)。
    Buffer sliceFromMark();
    Buffer sliceFromMark(int length);

    // 返回包含当前Buffer状态和内容的字符串。
    String toDetailString();
    boolean equalsIgnoreCase(Buffer buffer);
 }

4.ThreadPool

ThreadPool在Jetty中扮演了acceptor线程,selector线程和handler线程的运行处理工作,dispatch将请求分发到线程池:

线程池初始化也非常简单,无非就是初始化了几个线程:

private boolean startThread(int threads)
    {
        final int next=threads+1;
        if (!_threadsStarted.compareAndSet(threads,next))
            return false;

        boolean started=false;
        try
        {
            Thread thread=newThread(_runnable);
            thread.setDaemon(_daemon);
            thread.setPriority(_priority);
            thread.setName(_name+"-"+thread.getId());
            _threads.add(thread);

            thread.start();
            started=true;
        }
        finally
        {
            if (!started)
                _threadsStarted.decrementAndGet();
        }
        return started;
    }
  

然后runnable逻辑无非就是从队列中取逻辑并执行:

 private Runnable _runnable = new Runnable()
    {
        public void run()
        {
            boolean shrink=false;
            try
            {
                Runnable job=_jobs.poll();
                while (isRunning())
                {
                    // Job loop
                    while (job!=null && isRunning())
                    {
                        runJob(job);
                        job=_jobs.poll();
                    }

                    // Idle loop
                    try
                    {
                        _threadsIdle.incrementAndGet();

                        while (isRunning() && job==null)
                        {
                            if (_maxIdleTimeMs<=0)
                                job=_jobs.take();
                            else
                            {
                                // maybe we should shrink?
                                final int size=_threadsStarted.get();
                                if (size>_minThreads)
                                {
                                    long last=_lastShrink.get();
                                    long now=System.currentTimeMillis();
                                    if (last==0 || (now-last)>_maxIdleTimeMs)
                                    {
                                        shrink=_lastShrink.compareAndSet(last,now) &&
                                        _threadsStarted.compareAndSet(size,size-1);
                                        if (shrink)
                                            return;
                                    }
                                }
                                job=idleJobPoll();
                            }
                        }
                    }
                    finally
                    {
                        _threadsIdle.decrementAndGet();
                    }
                }
            }
            catch(InterruptedException e)
            {
                LOG.ignore(e);
            }
            catch(Exception e)
            {
                LOG.warn(e);
            }
            finally
            {
                if (!shrink)
                    _threadsStarted.decrementAndGet();
                _threads.remove(Thread.currentThread());
            }
        }
    };
about_jetty_7.txt · 最后更改: 2018/10/14 15:31 (外部编辑)