作者:陈科
联系方式:chenke1818@gmail.com
转载请说明出处:http://www.dumpcache.com/wiki/doku.php?id=about_jetty_8
我们知道,假如不使用comet的长连接技术,客户端就只能通过不断轮询服务器来获取数据,这样服务器压力很大。Jetty提供了Continuation组件来实现对servlet/filter请求的反转控制,避免客户端对服务器的反复轮询:在客户端与服务器建立连接之后,服务器可以反向推送数据给客户端。
实现Continuation的编程方法需要几个步骤:
a.获取Continuation实例:
Continuation continuation = ContinuationSupport.getContinuation(request);
b.挂起请求(suspend()本身不会阻塞线程,servlet方法返回后线程交还给容器,请求保持挂起):
void doGet(HttpServletRequest request, HttpServletResponse response) { ... // optionally: // continuation.setTimeout(long); continuation.suspend(); ... }
c.恢复被挂起的请求(通常由其他线程调用,请求会被重新分发给servlet):
continuation.setAttribute("results",results); continuation.resume();
d.注册回调事件:我们还可以注册ContinuationListener,在请求超时(onTimeout)或完成(onComplete)时得到回调。
void doGet(HttpServletRequest request, HttpServletResponse response) { ... Continuation continuation = ContinuationSupport.getContinuation(request); continuation.addContinuationListener(new ContinuationListener() { public void onTimeout(Continuation continuation) { ... } public void onComplete(Continuation continuation) { ... } }); continuation.suspend(); ... }
接下来我们可以看一个聊天室实例:
1 // 2 // ======================================================================== 3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. 4 // ------------------------------------------------------------------------ 5 // All rights reserved. This program and the accompanying materials 6 // are made available under the terms of the Eclipse Public License v1.0 7 // and Apache License v2.0 which accompanies this distribution. 8 // 9 // The Eclipse Public License is available at 10 // http://www.eclipse.org/legal/epl-v10.html 11 // 12 // The Apache License v2.0 is available at 13 // http://www.opensource.org/licenses/apache2.0.php 14 // 15 // You may elect to redistribute this code under either of these licenses. 16 // ======================================================================== 17 // 18 19 package com.acme; 20 21 import java.io.IOException; 22 import java.io.PrintWriter; 23 import java.util.HashMap; 24 import java.util.LinkedList; 25 import java.util.Map; 26 import java.util.Queue; 27 28 import javax.servlet.ServletException; 29 import javax.servlet.http.HttpServlet; 30 import javax.servlet.http.HttpServletRequest; 31 import javax.servlet.http.HttpServletResponse; 32 33 import org.eclipse.jetty.continuation.Continuation; 34 import org.eclipse.jetty.continuation.ContinuationSupport; 35 36 37 // Simple asynchronous Chat room. 38 // This does not handle duplicate usernames or multiple frames/tabs from the same browser 39 // Some code is duplicated for clarity. 
40 public class ChatServlet extends HttpServlet 41 { 42 43 // inner class to hold message queue for each chat room member 44 class Member 45 { 46 String _name; 47 Continuation _continuation; 48 Queue<String> _queue = new LinkedList<String>(); 49 } 50 51 Map<String,Map<String,Member>> _rooms = new HashMap<String,Map<String, Member>>(); 52 53 54 // Handle Ajax calls from browser 55 @Override 56 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException 57 { 58 // Ajax calls are form encoded 59 String action = request.getParameter("action"); 60 String message = request.getParameter("message"); 61 String username = request.getParameter("user"); 62 63 if (action.equals("join")) 64 join(request,response,username); 65 else if (action.equals("poll")) 66 poll(request,response,username); 67 else if (action.equals("chat")) 68 chat(request,response,username,message); 69 } 70 71 private synchronized void join(HttpServletRequest request,HttpServletResponse response,String username) 72 throws IOException 73 { 74 Member member = new Member(); 75 member._name=username; 76 Map<String,Member> room=_rooms.get(request.getPathInfo()); 77 if (room==null) 78 { 79 room=new HashMap<String,Member>(); 80 _rooms.put(request.getPathInfo(),room); 81 } 82 room.put(username,member); 83 response.setContentType("text/json;charset=utf-8"); 84 PrintWriter out=response.getWriter(); 85 out.print("{action:\"join\"}"); 86 } 87 88 private synchronized void poll(HttpServletRequest request,HttpServletResponse response,String username) 89 throws IOException 90 { 91 Map<String,Member> room=_rooms.get(request.getPathInfo()); 92 if (room==null) 93 { 94 response.sendError(503); 95 return; 96 } 97 Member member = room.get(username); 98 if (member==null) 99 { 100 response.sendError(503); 101 return; 102 } 103 104 synchronized(member) 105 { 106 if (member._queue.size()>0) 107 { 108 // Send one chat message 109 
response.setContentType("text/json;charset=utf-8"); 110 StringBuilder buf=new StringBuilder(); 111 112 buf.append("{\"action\":\"poll\","); 113 buf.append("\"from\":\""); 114 buf.append(member._queue.poll()); 115 buf.append("\","); 116 117 String message = member._queue.poll(); 118 int quote=message.indexOf('"'); 119 while (quote>=0) 120 { 121 message=message.substring(0,quote)+'\\'+message.substring(quote); 122 quote=message.indexOf('"',quote+2); 123 } 124 buf.append("\"chat\":\""); 125 buf.append(message); 126 buf.append("\"}"); 127 byte[] bytes = buf.toString().getBytes("utf-8"); 128 response.setContentLength(bytes.length); 129 response.getOutputStream().write(bytes); 130 } 131 else 132 { 133 Continuation continuation = ContinuationSupport.getContinuation(request); 134 if (continuation.isInitial()) 135 { 136 // No chat in queue, so suspend and wait for timeout or chat 137 continuation.setTimeout(20000); 138 continuation.suspend(); 139 member._continuation=continuation; 140 } 141 else 142 { 143 // Timeout so send empty response 144 response.setContentType("text/json;charset=utf-8"); 145 PrintWriter out=response.getWriter(); 146 out.print("{action:\"poll\"}"); 147 } 148 } 149 } 150 } 151 152 private synchronized void chat(HttpServletRequest request,HttpServletResponse response,String username,String message) 153 throws IOException 154 { 155 Map<String,Member> room=_rooms.get(request.getPathInfo()); 156 if (room!=null) 157 { 158 // Post chat to all members 159 for (Member m:room.values()) 160 { 161 synchronized (m) 162 { 163 m._queue.add(username); // from 164 m._queue.add(message); // chat 165 166 // wakeup member if polling 167 if (m._continuation!=null) 168 { 169 m._continuation.resume(); 170 m._continuation=null; 171 } 172 } 173 } 174 } 175 176 response.setContentType("text/json;charset=utf-8"); 177 PrintWriter out=response.getWriter(); 178 out.print("{action:\"chat\"}"); 179 } 180 181 // Serve the HTML with embedded CSS and Javascript. 
182 // This should be static content and should use real JS libraries. 183 @Override 184 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException 185 { 186 if (request.getParameter("action")!=null) 187 doPost(request,response); 188 else 189 getServletContext().getNamedDispatcher("default").forward(request,response); 190 } 191 192 }
上面的这个例子通过continuation实现了一个由服务器向客户端反向推送消息的聊天室。
首先我们来看下Continuation接口:
/**
 * Continuation API: lets a servlet/filter suspend the handling of a request and
 * have it resumed, completed or timed out later.
 */
public interface Continuation
{
    public final static String ATTRIBUTE = "org.eclipse.jetty.continuation";

    // Set the suspend timeout in milliseconds; if the value is <= 0 the
    // continuation waits forever.
    void setTimeout(long timeoutMs);

    // Suspend the request handling.
    // NOTE(review): the original note said "suspend the current thread"; per the
    // isInitial() Javadoc below, the calling thread returns to the container
    // after suspend() has been called.
    void suspend();

    void suspend(ServletResponse response);

    // Resume a suspended request.
    void resume();

    // Complete a suspended request. The original note here was garbled; it
    // stated that whatever was suspended by suspend() is woken up after this call.
    void complete();

    /* ------------------------------------------------------------ */
    /**
     * @return true after {@link #suspend()} has been called and before the
     * request has been redispatched due to being resumed, completed or
     * timed out.
     */
    boolean isSuspended();

    /* ------------------------------------------------------------ */
    /**
     * @return true if the request has been redispatched by a call to
     * {@link #resume()}. Returns false after any subsequent call to
     * suspend
     */
    boolean isResumed();

    /* ------------------------------------------------------------ */
    /**
     * @return true after a request has been redispatched as the result of a
     * timeout. Returns false after any subsequent call to suspend.
     */
    boolean isExpired();

    /* ------------------------------------------------------------ */
    /**
     * @return true while the request is within the initial dispatch to the
     * filter chain and/or servlet. Will return false once the calling
     * thread has returned to the container after suspend has been
     * called and during any subsequent redispatch.
     */
    boolean isInitial();

    /* ------------------------------------------------------------ */
    /**
     * Is the suspended response wrapped.
     * <p>
     * Filters that wrap the response object should check this method to
     * determine if they should destroy/finish the wrapped response. If
     * the request was suspended with a call to {@link #suspend(ServletResponse)}
     * that passed the wrapped response, then the filter should register
     * a {@link ContinuationListener} to destroy/finish the wrapped response
     * during a call to {@link ContinuationListener#onComplete(Continuation)}.
     * @return True if {@link #suspend(ServletResponse)} has been passed a
     * {@link ServletResponseWrapper} instance.
     */
    boolean isResponseWrapped();

    /* ------------------------------------------------------------ */
    /**
     * Get the suspended response.
     * @return the {@link ServletResponse} passed to {@link #suspend(ServletResponse)}.
     */
    ServletResponse getServletResponse();

    /* ------------------------------------------------------------ */
    /**
     * Add a ContinuationListener.
     *
     * @param listener
     */
    void addContinuationListener(ContinuationListener listener);

    /* ------------------------------------------------------------ */
    /** Set a request attribute.
     * This method is a convenience method to call the {@link ServletRequest#setAttribute(String, Object)}
     * method on the associated request object.
     * This is a thread safe call and may be called by any thread.
     * @param name the attribute name
     * @param attribute the attribute value
     */
    public void setAttribute(String name, Object attribute);

    /* ------------------------------------------------------------ */
    /** Get a request attribute.
     * This method is a convenience method to call the {@link ServletRequest#getAttribute(String)}
     * method on the associated request object.
     * This is a thread safe call and may be called by any thread.
     * @param name the attribute name
     * @return the attribute value
     */
    public Object getAttribute(String name);

    /* ------------------------------------------------------------ */
    /** Remove a request attribute.
     * This method is a convenience method to call the {@link ServletRequest#removeAttribute(String)}
     * method on the associated request object.
     * This is a thread safe call and may be called by any thread.
     * @param name the attribute name
     */
    public void removeAttribute(String name);

    // Cancel the current dispatch to the continuation by throwing a
    // ContinuationThrowable (translated from the original note).
    public void undispatch() throws ContinuationThrowable;
}
接着我们再来看它的实现:jetty7及以上的版本已经内置了AsyncContinuation组件。
continuation其实是通过对状态位的控制来达到控制请求线程挂起或者恢复的。
我们来看下状态,以下代码在AsyncContinuation中:
// State values; the doSuspend()/unhandle() excerpts in this article switch on
// a _state field holding one of these.
private static final int __IDLE=0;         // Idle request
private static final int __DISPATCHED=1;   // Request dispatched to filter/servlet
private static final int __ASYNCSTARTED=2; // Suspend called, but not yet returned to container
private static final int __REDISPATCHING=3;// resumed while dispatched
private static final int __ASYNCWAIT=4;    // Suspended and parked
private static final int __REDISPATCH=5;   // Has been scheduled
private static final int __REDISPATCHED=6; // Request redispatched to filter/servlet
private static final int __COMPLETING=7;   // complete while dispatched
private static final int __UNCOMPLETED=8;  // Request is completable
private static final int __COMPLETED=9;    // Request is complete
比如我们调用suspend函数:
/**
 * Suspends using the connection's own request and response objects.
 * Sets _responseWrapped to false and _continuation to true, then delegates to
 * doSuspend(...) with the servlet context, request and response taken from
 * _connection.
 */
public void suspend()
{
    _responseWrapped=false;
    _continuation=true;
    doSuspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
}
private void doSuspend(final ServletContext context, final ServletRequest request, final ServletResponse response) { synchronized (this) { switch(_state) { case __DISPATCHED: case __REDISPATCHED: _resumed=false; _expired=false; if (_event==null || request!=_event.getSuppliedRequest() || response != _event.getSuppliedResponse() || context != _event.getServletContext()) _event=new AsyncEventState(context,request,response); else { _event._dispatchContext=null; _event._pathInContext=null; } _state=__ASYNCSTARTED; List<AsyncListener> recycle=_lastAsyncListeners; _lastAsyncListeners=_asyncListeners; _asyncListeners=recycle; if (_asyncListeners!=null) _asyncListeners.clear(); break; default: throw new IllegalStateException(this.getStatusString()); } }
这个时候,状态变成了__ASYNCSTARTED。
我们回过头来回顾AbstractHttpConnection的handle方法:
... if (_request._async.isAsync()) { if (_request._async.isDispatchable()) handleRequest(); } ...
这个时候handleRequest执行的逻辑为:
... _request.setDispatcherType(DispatcherType.ASYNC); server.handleAsync(this);
在请求执行完之后:
...
finally
{
    // Complete async requests
    if (error && _request.isAsyncStarted())
        _request.getAsyncContinuation().errorComplete();

    was_continuation=_request._async.isContinuation();
    // handling stays true only when unhandle() returns false (and the server is
    // still running).
    handling = !_request._async.unhandle() && server.isRunning() && _server!=null;
}

/**
 * Called after the request has been handled (see the finally block above) to
 * move the state machine on.
 *
 * @return true when the state ends up as __UNCOMPLETED or __ASYNCWAIT; false
 *         when the state has been set to __REDISPATCHED
 * @throws IllegalStateException for __IDLE and for any state not listed below
 */
protected boolean unhandle()
{
    synchronized (this)
    {
        switch(_state)
        {
            // Plain dispatch finished without suspend(): the request is completable.
            case __REDISPATCHED:
            case __DISPATCHED:
                _state=__UNCOMPLETED;
                return true;

            case __IDLE:
                throw new IllegalStateException(this.getStatusString());

            // suspend() was called during this dispatch: park the request.
            case __ASYNCSTARTED:
                _initial=false;
                _state=__ASYNCWAIT;
                scheduleTimeout(); // could block and change state.
                // Still parked: nothing more to do for now.
                if (_state==__ASYNCWAIT)
                    return true;
                // The state became __COMPLETING during scheduleTimeout().
                else if (_state==__COMPLETING)
                {
                    _state=__UNCOMPLETED;
                    return true;
                }
                // Any other state change: mark as redispatched and return false.
                _initial=false;
                _state=__REDISPATCHED;
                return false;

            // Resumed while still dispatched (see the __REDISPATCHING constant).
            case __REDISPATCHING:
                _initial=false;
                _state=__REDISPATCHED;
                return false;

            // Completed while still dispatched (see the __COMPLETING constant).
            case __COMPLETING:
                _initial=false;
                _state=__UNCOMPLETED;
                return true;

            default:
                throw new IllegalStateException(this.getStatusString());
        }
    }
}
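因为我们刚才的状态为__ASYNCSTARTED,所以unhandle()会先把状态置为__ASYNCWAIT,再调用scheduleTimeout();如果此后状态仍为__ASYNCWAIT,unhandle()返回true,handling变为false,请求就此挂起,等待resume()、complete()或超时。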
最后我们来看下类图和状态流转图,加深一下印象。