Tomcat是如何实现异步Servlet

在Spring boot中实现一个servlet,需要再启动类加上@ServletComponentScan注解来扫描Servlet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@WebServlet(urlPatterns = "/async",asyncSupported = true)
@Slf4j
public class AsyncServlet extends HttpServlet {

ExecutorService executorService =Executors.newSingleThreadExecutor();

@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
//开启异步,获取异步上下文
final AsyncContext ctx = req.startAsync();
// 提交线程池异步执行
executorService.execute(new Runnable() {

@Override
public void run() {
try {
log.info("async Service 准备执行了");
//模拟耗时任务
Thread.sleep(10000L);
ctx.getResponse().getWriter().print("async servlet");
log.info("async Service 执行了");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//最后执行完成后完成回调。
ctx.complete();
}
});
}

异步Servlet的内部原理

上面主要是:final AsyncContext ctx = req.startAsync()和ctx.complete(),查看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public AsyncContext startAsync(ServletRequest request,
ServletResponse response) {
if (!isAsyncSupported()) {
IllegalStateException ise =
new IllegalStateException(sm.getString("request.asyncNotSupported"));
log.warn(sm.getString("coyoteRequest.noAsync",
StringUtils.join(getNonAsyncClassNames())), ise);
throw ise;
}

if (asyncContext == null) {
asyncContext = new AsyncContextImpl(this);
}

asyncContext.setStarted(getContext(), request, response,
request==getRequest() && response==getResponse().getResponse());
asyncContext.setTimeout(getConnector().getAsyncTimeout());

return asyncContext;
}

req.startAsync()只是保存了一个异步上下文,同时设置一些基础信息,比如Timeout,这里设置的默认超时时间是30S,如果你的异步处理逻辑超过30S,此时执行ctx.complete()就会抛出IllegalStateException 异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
  public void complete() {
if (log.isDebugEnabled()) {
logDebug("complete ");
}
check();
request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);
}
//类:AbstractProcessor
public final void action(ActionCode actionCode, Object param) {
case ASYNC_COMPLETE: {
clearDispatches();
if (asyncStateMachine.asyncComplete()) {
processSocketEvent(SocketEvent.OPEN_READ, true);
}
break;
}
}
//类:AbstractProcessor
protected void processSocketEvent(SocketEvent event, boolean dispatch) {
SocketWrapperBase<?> socketWrapper = getSocketWrapper();
if (socketWrapper != null) {
socketWrapper.processSocket(event, dispatch);
}
}
//类:AbstractEndpoint
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
//省略部分代码
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}

return true;
}

所以,这里最终会调用AbstractEndpoint的processSocket方法,EndPoint是用来接受和处理请求的,接下来就会交给Processor去进行协议处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
///类:AbstractProcessorLight
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
throws IOException {
//省略部分diam
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
state = dispatch(nextDispatch.getSocketStatus());
} else if (status == SocketEvent.DISCONNECT) {

} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
if (state == SocketState.OPEN) {
state = service(socketWrapper);
}
} else if (status == SocketEvent.OPEN_WRITE) {
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ){
state = service(socketWrapper);
} else {
state = SocketState.CLOSED;
}

} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);

return state;
}

AbstractProcessorLight会根据SocketEvent的状态来判断是不是要去调用service(socketWrapper),该方法最终会去调用到容器,从而完成业务逻辑的调用,我们这个请求是执行完成后调用的,肯定不能进容器了,不然就是死循环了,这里通过isAsync()判断,就会进入dispatch(status),最终会调用CoyoteAdapter的asyncDispatch方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public boolean asyncDispatch(org.apache.coyote.Request req, org.apache.coyote.Response res,
SocketEvent status) throws Exception {
//省略部分代码
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
boolean success = true;
AsyncContextImpl asyncConImpl = request.getAsyncContextInternal();
try {
if (!request.isAsync()) {
response.setSuspended(false);
}

if (status==SocketEvent.TIMEOUT) {
if (!asyncConImpl.timeout()) {
asyncConImpl.setErrorState(null, false);
}
} else if (status==SocketEvent.ERROR) {

}

if (!request.isAsyncDispatching() && request.isAsync()) {
WriteListener writeListener = res.getWriteListener();
ReadListener readListener = req.getReadListener();
if (writeListener != null && status == SocketEvent.OPEN_WRITE) {
ClassLoader oldCL = null;
try {
oldCL = request.getContext().bind(false, null);
res.onWritePossible();//这里执行浏览器响应,写入数据
if (request.isFinished() && req.sendAllDataReadEvent() &&
readListener != null) {
readListener.onAllDataRead();
}
} catch (Throwable t) {

} finally {
request.getContext().unbind(false, oldCL);
}
}
}
}
//这里判断异步正在进行,说明这不是一个完成方法的回调,是一个正常异步请求,继续调用容器。
if (request.isAsyncDispatching()) {
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
Throwable t = (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
if (t != null) {
asyncConImpl.setErrorState(t, true);
}
}
//注意,这里,如果超时或者出错,request.isAsync()会返回false,这里是为了尽快的输出错误给客户端。
if (!request.isAsync()) {
//这里也是输出逻辑
request.finishRequest();
response.finishResponse();
}
//销毁request和response
if (!success || !request.isAsync()) {
updateWrapperErrorCount(request, response);
request.recycle();
response.recycle();
}
}
return success;
}

ctx.complete()执行最终的方法了,完成了数据的输出,最终输出到浏览器。

第一次doGet请求执行完成后,Tomcat是怎么知道不用返回到客户端的呢?关键代码在CoyoteAdapter中的service方法,部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
postParseSuccess = postParseRequest(req, request, res, response);
//省略部分代码
if (postParseSuccess) {
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
if (request.isAsync()) {
async = true;
} else {
//输出数据到客户端
request.finishRequest();
response.finishResponse();
if (!async) {
updateWrapperErrorCount(request, response);
//销毁request和response
request.recycle();
response.recycle();
}

调用完Servlet后,会通过request.isAsync()来判断是否是异步请求,如果是异步请求,就设置async = true。如果是非异步请求就执行输出数据到客户端逻辑,同时销毁request和response。这里就完成了请求结束后不响应客户端的操作。

Spring Boot的@EnableAsync注解不是异步Servlet

从业务层面来说,确实是异步编程,但是有一个问题,抛开业务的并行处理来说,针对整个请求来说,并不是异步的,也就是说不能立即释放Tomcat的线程,从而不能达到异步Servlet的效果。为什么它不是异步的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@RestController
@Slf4j
public class TestController {
@Autowired
private TestService service;

@GetMapping("/hello")
public String test() {
try {
log.info("testAsynch Start");
CompletableFuture<String> test1 = service.test1();
CompletableFuture<String> test2 = service.test2();
CompletableFuture<String> test3 = service.test3();
CompletableFuture.allOf(test1, test2, test3);
log.info("test1=====" + test1.get());
log.info("test2=====" + test2.get());
log.info("test3=====" + test3.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "hello";
}
@Service
public class TestService {
@Async("asyncExecutor")
public CompletableFuture<String> test1() throws InterruptedException {
Thread.sleep(3000L);
return CompletableFuture.completedFuture("test1");
}

@Async("asyncExecutor")
public CompletableFuture<String> test2() throws InterruptedException {
Thread.sleep(3000L);
return CompletableFuture.completedFuture("test2");
}

@Async("asyncExecutor")
public CompletableFuture<String> test3() throws InterruptedException {
Thread.sleep(3000L);
return CompletableFuture.completedFuture("test3");
}
}
@SpringBootApplication
@EnableAsync
public class TomcatdebugApplication {

public static void main(String[] args) {
SpringApplication.run(TomcatdebugApplication.class, args);
}

@Bean(name = "asyncExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(3);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("AsynchThread-");
executor.initialize();
return executor;
}

这里我请求之后,在调用容器执行业务逻辑之前打了一个断点,然后在返回之后的同样打了一个断点,在Controller执行完之后,请求才回到了CoyoteAdapter中,并且判断request.isAsync(),根据图中看到,是为false,那么接下来就会执行request.finishRequest()和response.finishResponse()
来执行响应的结束,并销毁请求和响应体。很有趣的事情是,我实验的时候发现,在执行request.isAsync()之前,浏览器的页面上已经出现了响应体,这是SpringBoot框架已经通过StringHttpMessageConverter类中的writeInternal方法已经进行输出了。
以上分析的核心逻辑就是,Tomcat的线程执行CoyoteAdapter调用容器后,必须要等到请求返回,然后再判断是否是异步请求,再处理请求,然后执行完毕后,线程才能进行回收。而我一最开始的异步Servlet例子,执行完doGet方法后,就会立即返回,也就是会直接到request.isAsync()的逻辑,然后整个线程的逻辑执行完毕,线程被回收。