用户工具

站点工具


about_jetty_8

Jetty的那些事儿 第八章:Jetty的comet技术

作者:陈科

联系方式:chenke1818@gmail.com

转载请说明出处:http://www.dumpcache.com/wiki/doku.php?id=about_jetty_8

我知道假如不使用comet的长连接技术,那么客户端将会轮询服务器获取数据,这样服务器压力很大,Jetty通过提供一个Continuation组件来实现对servlet/filter请求的反转控制,避免每次请求对服务器的轮询,在客户端在服务器建立连接之后,服务器可以反向推送数据给客户端。

1.使用方法

实现Continuation的编程方法需要几个步骤:

a.获取Continuation实例:

Continuation continuation = ContinuationSupport.getContinuation(request);

b.挂起请求的线程:

void doGet(HttpServletRequest request, HttpServletResponse response)
{
    ...
    // optionally:
    // continuation.setTimeout(long);
    continuation.suspend();
    ...
}

c.恢复挂起的线程

continuation.setAttribute("results",results);
continuation.resume();

d.注册回调事件,我们还可以注册回调,在onTimeout,onComplete等函数被执行后调用。

void doGet(HttpServletRequest request, HttpServletResponse response)
{
    ...
 
    Continuation continuation = ContinuationSupport.getContinuation(request);
    continuation.addContinuationListener(new ContinuationListener()
    {
      public void onTimeout(Continuation continuation) { ... }
      public void onComplete(Continuation continuation) { ... }
    });
 
    continuation.suspend();
    ...
}

2.聊天室实例

接下来我们可以看一个聊天室实例:

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package com.acme;
20  
21  import java.io.IOException;
22  import java.io.PrintWriter;
23  import java.util.HashMap;
24  import java.util.LinkedList;
25  import java.util.Map;
26  import java.util.Queue;
27  
28  import javax.servlet.ServletException;
29  import javax.servlet.http.HttpServlet;
30  import javax.servlet.http.HttpServletRequest;
31  import javax.servlet.http.HttpServletResponse;
32  
33  import org.eclipse.jetty.continuation.Continuation;
34  import org.eclipse.jetty.continuation.ContinuationSupport;
35  
36  
37  // Simple asynchronous Chat room.
38  // This does not handle duplicate usernames or multiple frames/tabs from the same browser
39  // Some code is duplicated for clarity.
40  public class ChatServlet extends HttpServlet
41  {
42      
43      // inner class to hold message queue for each chat room member
44      class Member
45      {
46          String _name;
47          Continuation _continuation;
48          Queue<String> _queue = new LinkedList<String>();
49      }
50  
51      Map<String,Map<String,Member>> _rooms = new HashMap<String,Map<String, Member>>();
52      
53      
54      // Handle Ajax calls from browser
55      @Override
56      protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
57      {   
58          // Ajax calls are form encoded
59          String action = request.getParameter("action");
60          String message = request.getParameter("message");
61          String username = request.getParameter("user");
62  
63          if (action.equals("join"))
64              join(request,response,username);
65          else if (action.equals("poll"))
66              poll(request,response,username);
67          else if (action.equals("chat"))
68              chat(request,response,username,message);
69      }
70  
71      private synchronized void join(HttpServletRequest request,HttpServletResponse response,String username)
72      throws IOException
73      {
74          Member member = new Member();
75          member._name=username;
76          Map<String,Member> room=_rooms.get(request.getPathInfo());
77          if (room==null)
78          {
79              room=new HashMap<String,Member>();
80              _rooms.put(request.getPathInfo(),room);
81          }
82          room.put(username,member); 
83          response.setContentType("text/json;charset=utf-8");
84          PrintWriter out=response.getWriter();
85          out.print("{action:\"join\"}");
86      }
87  
88      private synchronized void poll(HttpServletRequest request,HttpServletResponse response,String username)
89      throws IOException
90      {
91          Map<String,Member> room=_rooms.get(request.getPathInfo());
92          if (room==null)
93          {
94              response.sendError(503);
95              return;
96          }
97          Member member = room.get(username);
98          if (member==null)
99          {
100             response.sendError(503);
101             return;
102         }
103 
104         synchronized(member)
105         {
106             if (member._queue.size()>0)
107             {
108                 // Send one chat message
109                 response.setContentType("text/json;charset=utf-8");
110                 StringBuilder buf=new StringBuilder();
111 
112                 buf.append("{\"action\":\"poll\",");
113                 buf.append("\"from\":\"");
114                 buf.append(member._queue.poll());
115                 buf.append("\",");
116 
117                 String message = member._queue.poll();
118                 int quote=message.indexOf('"');
119                 while (quote>=0)
120                 {
121                     message=message.substring(0,quote)+'\\'+message.substring(quote);
122                     quote=message.indexOf('"',quote+2);
123                 }
124                 buf.append("\"chat\":\"");
125                 buf.append(message);
126                 buf.append("\"}");
127                 byte[] bytes = buf.toString().getBytes("utf-8");
128                 response.setContentLength(bytes.length);
129                 response.getOutputStream().write(bytes);
130             }
131             else 
132             {
133                 Continuation continuation = ContinuationSupport.getContinuation(request);
134                 if (continuation.isInitial()) 
135                 {
136                     // No chat in queue, so suspend and wait for timeout or chat
137                     continuation.setTimeout(20000);
138                     continuation.suspend();
139                     member._continuation=continuation;
140                 }
141                 else
142                 {
143                     // Timeout so send empty response
144                     response.setContentType("text/json;charset=utf-8");
145                     PrintWriter out=response.getWriter();
146                     out.print("{action:\"poll\"}");
147                 }
148             }
149         }
150     }
151 
152     private synchronized void chat(HttpServletRequest request,HttpServletResponse response,String username,String message)
153     throws IOException
154     {
155         Map<String,Member> room=_rooms.get(request.getPathInfo());
156         if (room!=null)
157         {
158             // Post chat to all members
159             for (Member m:room.values())
160             {
161                 synchronized (m)
162                 {
163                     m._queue.add(username); // from
164                     m._queue.add(message);  // chat
165 
166                     // wakeup member if polling
167                     if (m._continuation!=null)
168                     {
169                         m._continuation.resume();
170                         m._continuation=null;
171                     }
172                 }
173             }
174         }
175 
176         response.setContentType("text/json;charset=utf-8");
177         PrintWriter out=response.getWriter();
178         out.print("{action:\"chat\"}");  
179     }
180     
181     // Serve the HTML with embedded CSS and Javascript.
182     // This should be static content and should use real JS libraries.
183     @Override
184     protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
185     {
186         if (request.getParameter("action")!=null)
187             doPost(request,response);
188         else
189             getServletContext().getNamedDispatcher("default").forward(request,response);
190     }
191     
192 }

上面的这个例子通过continuation来实现了一个服务器反向推送客户端的聊天室实例。

3. 源码分析

首先我们来看下Continuation接口:

public interface Continuation
{
    public final static String ATTRIBUTE = "org.eclipse.jetty.continuation";

    //设置挂起的等待时间,如果时间<=0那么将会永远等待
    void setTimeout(long timeoutMs);

   //挂起当前线程
    void suspend();
    
    
    void suspend(ServletResponse response);

   //恢复挂起的线程
    void resume();

    //完成一个挂起的请求,该方法调用会,调用后suspend挂起的线程将会被唤醒
    void complete();

    /* ------------------------------------------------------------ */
    /**
     * @return true after {@link #suspend()} has been called and before the
     *         request has been redispatched due to being resumed, completed or
     *         timed out.
     */
    boolean isSuspended();

    /* ------------------------------------------------------------ */
    /**
     * @return true if the request has been redispatched by a call to
     *         {@link #resume()}. Returns false after any subsequent call to
     *         suspend
     */
    boolean isResumed();

    /* ------------------------------------------------------------ */
    /**
     * @return true after a request has been redispatched as the result of a
     *         timeout. Returns false after any subsequent call to suspend.
     */
    boolean isExpired();

    /* ------------------------------------------------------------ */
    /**
     * @return true while the request is within the initial dispatch to the
     *         filter chain and/or servlet. Will return false once the calling
     *         thread has returned to the container after suspend has been
     *         called and during any subsequent redispatch.
     */
    boolean isInitial();

    /* ------------------------------------------------------------ */
    /**
     * Is the suspended response wrapped.
     * <p>
     * Filters that wrap the response object should check this method to 
     * determine if they should destroy/finish the wrapped response. If 
     * the request was suspended with a call to {@link #suspend(ServletResponse)}
     * that passed the wrapped response, then the filter should register
     * a {@link ContinuationListener} to destroy/finish the wrapped response
     * during a call to {@link ContinuationListener#onComplete(Continuation)}.
     * @return True if {@link #suspend(ServletResponse)} has been passed a
     * {@link ServletResponseWrapper} instance.
     */
    boolean isResponseWrapped();


    /* ------------------------------------------------------------ */
    /**
     * Get the suspended response.
     * @return the {@link ServletResponse} passed to {@link #suspend(ServletResponse)}.
     */
    ServletResponse getServletResponse();
    
    /* ------------------------------------------------------------ */
    /** 
     * Add a ContinuationListener.
     * 
     * @param listener
     */
    void addContinuationListener(ContinuationListener listener);
    
    /* ------------------------------------------------------------ */
    /** Set a request attribute.
     * This method is a convenience method to call the {@link ServletRequest#setAttribute(String, Object)}
     * method on the associated request object.
     * This is a thread safe call and may be called by any thread.
     * @param name the attribute name
     * @param attribute the attribute value
     */
    public void setAttribute(String name, Object attribute);
    
    /* ------------------------------------------------------------ */
    /** Get a request attribute.
     * This method is a convenience method to call the {@link ServletRequest#getAttribute(String)}
     * method on the associated request object.
     * This is a thread safe call and may be called by any thread.
     * @param name the attribute name
     * @return the attribute value
     */
    public Object getAttribute(String name);
    
    /* ------------------------------------------------------------ */
    /** Remove a request attribute.
     * This method is a convenience method to call the {@link ServletRequest#removeAttribute(String)}
     * method on the associated request object.
     * This is a thread safe call and may be called by any thread.
     * @param name the attribute name
     */
    public void removeAttribute(String name);
    
    //取消当前被分发到continuation上的线程,并且抛出异常:ContinuationThrowable
    public void undispatch() throws ContinuationThrowable;
}

接着我们再来看它的实现,在jetty7或者7以上的版本,已经内置了AsyncContinuation组件了。

continuation其实是通过对状态位的控制来达到控制请求线程挂起或者恢复的。

我们来看下状态,以下代码在AsyncContinuation中:

    private static final int __IDLE=0;         // Idle request
    private static final int __DISPATCHED=1;   // Request dispatched to filter/servlet
    private static final int __ASYNCSTARTED=2; // Suspend called, but not yet returned to container
    private static final int __REDISPATCHING=3;// resumed while dispatched
    private static final int __ASYNCWAIT=4;    // Suspended and parked
    private static final int __REDISPATCH=5;   // Has been scheduled
    private static final int __REDISPATCHED=6; // Request redispatched to filter/servlet
    private static final int __COMPLETING=7;   // complete while dispatched
    private static final int __UNCOMPLETED=8;  // Request is completable
    private static final int __COMPLETED=9;    // Request is complete

比如我们调用suspend函数:

public void suspend()
    {
        _responseWrapped=false;
        _continuation=true;
        doSuspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());       
    }
private void doSuspend(final ServletContext context,
            final ServletRequest request,
            final ServletResponse response)
    {
        synchronized (this)
        {
            switch(_state)
            {
                case __DISPATCHED:
                case __REDISPATCHED:
                    _resumed=false;
                    _expired=false;

                    if (_event==null || request!=_event.getSuppliedRequest() || response != _event.getSuppliedResponse() || context != _event.getServletContext())
                        _event=new AsyncEventState(context,request,response);
                    else
                    {
                        _event._dispatchContext=null;
                        _event._pathInContext=null;
                    }
                    _state=__ASYNCSTARTED;
                    List<AsyncListener> recycle=_lastAsyncListeners;
                    _lastAsyncListeners=_asyncListeners;
                    _asyncListeners=recycle;
                    if (_asyncListeners!=null)
                        _asyncListeners.clear();
                    break;

                default:
                    throw new IllegalStateException(this.getStatusString());
            }
        }
 

这个时候,状态被变成了ASYNCSTARTED.

我们回过头来回顾AbstractHttpConnection的handle方法:

...
if (_request._async.isAsync())
                    {
                       if (_request._async.isDispatchable())
                           handleRequest();
                    }
...

这个时候handleRequest执行的逻辑为:

...
_request.setDispatcherType(DispatcherType.ASYNC);
                        server.handleAsync(this);

在请求执行完之后:

...
finally
                {
                    // Complete async requests 
                    if (error && _request.isAsyncStarted())
                        _request.getAsyncContinuation().errorComplete();
                        
                    was_continuation=_request._async.isContinuation();
                    handling = !_request._async.unhandle() && server.isRunning() && _server!=null;
                }
                
                

protected boolean unhandle()
    {
        synchronized (this)
        {
            switch(_state)
            {
                case __REDISPATCHED:
                case __DISPATCHED:
                    _state=__UNCOMPLETED;
                    return true;

                case __IDLE:
                    throw new IllegalStateException(this.getStatusString());

                case __ASYNCSTARTED:
                    _initial=false;
                    _state=__ASYNCWAIT;
                    scheduleTimeout(); // could block and change state.
                    if (_state==__ASYNCWAIT)
                        return true;
                    else if (_state==__COMPLETING)
                    {
                        _state=__UNCOMPLETED;
                        return true;
                    }         
                    _initial=false;
                    _state=__REDISPATCHED;
                    return false; 

                case __REDISPATCHING:
                    _initial=false;
                    _state=__REDISPATCHED;
                    return false; 

                case __COMPLETING:
                    _initial=false;
                    _state=__UNCOMPLETED;
                    return true;

                default:
                    throw new IllegalStateException(this.getStatusString());
            }
        }
    }

因为我们刚才的状态为ASYNCSTARTED,所以会执行scheduleTimeout挂起当前线程。

最后我们来看下类图和状态流转图,有个深刻的印象。

about_jetty_8.txt · 最后更改: 2018/10/14 15:31 (外部编辑)