Tomcat源码分析-web请求处理过程

请求

请求时由Connector来处理。从启动连接器开始:

启动连接器

连接器构造方法中实例化了一个ProtocolHandler协议处理器,用来处理请求。然后在启动方法中启动协议处理器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
protected void startInternal() throws LifecycleException {

// Validate settings before starting
if (getPort() < 0) {
throw new LifecycleException(sm.getString(
"coyoteConnector.invalidPort", Integer.valueOf(getPort())));
}

setState(LifecycleState.STARTING);

try {
protocolHandler.start();
} catch (Exception e) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
}
}

然后在AbstractProtocol中启动endpoint:

启动endpoint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void start() throws Exception {
if (getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
}

endpoint.start();

// Start timeout thread
asyncTimeout = new AsyncTimeout();
Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
int priority = endpoint.getThreadPriority();
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
priority = Thread.NORM_PRIORITY;
}
timeoutThread.setPriority(priority);
timeoutThread.setDaemon(true);
timeoutThread.start();
}

endpoint的实现有几种:
20200406092859

比如:NioEndPoint

NioEndPoint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 启动NIO端点,创建acceptor和poller线程。
*/
@Override
public void startInternal() throws Exception {

if (!running) {
running = true;
paused = false;

processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());

// 创建线程池
if ( getExecutor() == null ) {
createExecutor();
}

initializeConnectionLatch();

// 启动轮询器线程
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
//启动线程s 是父类AbstractEndpoint方法
startAcceptorThreads();
}
}

中定义了内部类,通过线程方式处理。

Acceptor接收请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
protected class Acceptor extends AbstractEndpoint.Acceptor {

@Override
public void run() {

int errorDelay = 0;

// Loop until we receive a shutdown command
while (running) {

// Loop if endpoint is paused 自旋的方式
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}

if (!running) {
break;
}
state = AcceptorState.RUNNING;

try {
//i如果达到最大连接数,请稍候
countUpOrAwaitConnection();

SocketChannel socket = null;
try {
// 接受套接字
socket = serverSock.accept();
} catch (IOException ioe) {
// We didn't get a socket
countDownConnection();
if (running) {
// 引入延迟
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// 成功接受,重置错误延迟
errorDelay = 0;

// 配置套接字
if (running && !paused) {
// 把套接字传递给适当的处理器
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
} else {
closeSocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
private void closeSocket(SocketChannel socket) {
countDownConnection();
try {
socket.socket().close();
} catch (IOException ioe) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.err.close"), ioe);
}
}
try {
socket.close();
} catch (IOException ioe) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.err.close"), ioe);
}
}
}
}

调用setSocketOptions方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//处理指定的连接
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//禁用阻塞
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);

//就是把SocketChannel转换为Nio Channel
NioChannel channel = nioChannels.pop();
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {//HTTPS
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
//注册进去
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}

//Poller方法,向轮询器注册新创建的套接字
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getSocketProperties().getSoTimeout());
ka.setWriteTimeout(getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
//新建PollerEvent 加入到 EventQueue
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
addEvent(r);
}

Poller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@Override
public void run() {
// Loop until destroy() is called
while (true) {

boolean hasEvents = false;

try {
if (!close) {
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
//非阻塞
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
/**
* 遍历所有eventqueue中所有的PollorEvent,然后依次调用PollorEvent的run方法,将socket注册到selector
*/
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
continue;
}
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());

Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
//接受请求后,移除。
iterator.remove();
processKey(sk, attachment);
}
}//while

//超时
timeout(keyCount,hasEvents);
}//while

getStopLatch().countDown();
}

//处理轮询器事件队列中的事件
public boolean events() {
boolean result = false;
//PollerEvent
PollerEvent pe = null;
for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
result = true;
try {
pe.run();
pe.reset();
if (running && !paused) {
eventCache.push(pe);
}
} catch ( Throwable x ) {
log.error("",x);
}
}

return result;
}

protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// 先读后写
if (sk.isReadable()) {
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
}

//通过处理器 处理具有给定状态的给定SocketWrapper。
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
//如果没有拿到socket处理器,就去创建一个。
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
//通过线程池处理
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// OOM或类似的创建线程,或者池及其队列已满
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
1
protected abstract SocketProcessorBase<S> createSocketProcessor(SocketWrapperBase<S> socketWrapper, SocketEvent event);

createSocketProcessor方法是抽象方法,去他的处理器实现:

20200406100928

AbstractHttp11Protocol extends AbstractProtocol

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@Override
protected Processor createProcessor() {
//Http11Processor
Http11Processor processor = new Http11Processor(this, getEndpoint());
processor.setAdapter(getAdapter());
//默认的keepalive的情况下,默认每个socket最大处理请求次数
processor.setMaxKeepAliveRequests(getMaxKeepAliveRequests());
//开启keepalive的timeout
processor.setConnectionUploadTimeout(getConnectionUploadTimeout());
//文件上传的默认超时时间
processor.setDisableUploadTimeout(getDisableUploadTimeout());

processor.setRestrictedUserAgents(getRestrictedUserAgents());
processor.setMaxSavePostSize(getMaxSavePostSize());
return processor;
}
```
通过处理后,设置对应的适配器。processor.setAdapter(getAdapter());通过适配器去匹配Mapper中的映射,如果找不到,就是请求错了。

最中到public class CoyoteAdapter implements Adapter中去处理。通过适配器就过度到相应的容器上去处理。
### 通过Adapter找到对应的容器
适配器执行方法:public class CoyoteAdapter implements Adapter
```java
@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
throws Exception {
//把 coyote对象,转换为 http的对象
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);

if (request == null) {
// 通过连接器创建Request Response
request = connector.createRequest();
request.setCoyoteRequest(req);
response = connector.createResponse();
response.setCoyoteResponse(res);

// 关联
request.setResponse(response);
response.setRequest(request);

// Set as notes
req.setNote(ADAPTER_NOTES, request);
res.setNote(ADAPTER_NOTES, response);

// 设置URICharset 编码
req.getParameters().setQueryStringCharset(connector.getURICharset());
}

if (connector.getXpoweredBy()) {
response.addHeader("X-Powered-By", POWERED_BY);
}

boolean async = false;
boolean postParseSuccess = false;

req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

try {
// 分析并设置Catalina和配置特定的请求参数
// 在map中解析请求
//在分析HTTP头之后执行必要的处理,以便将请求/响应对传递到容器管道的开始处进行处理。
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
//check valves if we support async
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
// 调用容器:
// 通过连接器获得service可以获取容器(Engine只有一个),再获取调用链。在Engine(StandardEngine)启动后 pipeline也会启动,维护的是StandardEngineValve
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
}
if (request.isAsync()) {
async = true;
ReadListener readListener = req.getReadListener();
if (readListener != null && request.isFinished()) {
// 可能在service()方法期间读取了所有数据,因此需要在此处检查
ClassLoader oldCL = null;
try {
oldCL = request.getContext().bind(false, null);
if (req.sendAllDataReadEvent()) {
req.getReadListener().onAllDataRead();
}
} finally {
request.getContext().unbind(false, oldCL);
}
}

Throwable throwable =
(Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

// 如果异步请求已启动,则在该容器线程完成并发生错误后不会结束,则触发异步错误进程
if (!request.isAsyncCompleting() && throwable != null) {
request.getAsyncContextInternal().setErrorState(throwable, true);
}
} else {
//请求响应完成
request.finishRequest();
response.finishResponse();
}

} catch (IOException e) {
// Ignore
} finally {
AtomicBoolean error = new AtomicBoolean(false);
res.action(ActionCode.IS_ERROR, error);

if (request.isAsyncCompleting() && error.get()) {
// 如果有字符,现在就将它们全部刷新到字节缓冲区,因为字节用于计算内容长度(当然,如果所有内容都适合字节缓冲区)。
res.action(ActionCode.ASYNC_POST_PROCESS, null);
async = false;
}

// Access log
if (!async && postParseSuccess) {
// 仅在调用处理时记录。如果postParseRequest()失败,则它已将其记录下来。
Context context = request.getContext();
Host host = request.getHost();
//如果上下文为空,则可能是终结点已关闭,此连接已关闭,请求已在其他线程中回收。
//该线程已经更新了访问日志,因此在这种情况下不更新访问日志是可以的。
//另一种可能是在 无法将处理和请求映射到上下文。
//在这种情况下,通过主机或引擎登录。
long time = System.currentTimeMillis() - req.getStartTime();
if (context != null) {
context.logAccess(request, response, time, false);
} else if (response.isError()) {
if (host != null) {
host.logAccess(request, response, time, false);
} else {
connector.getService().getContainer().logAccess(
request, response, time, false);
}
}
}

req.getRequestProcessor().setWorkerThreadName(null);

// 回收包装器请求和响应
if (!async) {
updateWrapperErrorCount(request, response);
request.recycle();
response.recycle();
}
}
}

connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);

通过连接器获得service可以获取容器(Engine只有一个),再获取调用链。在Engine(StandardEngine)启动后 pipeline也会启动,维护的是StandardEngineValve

invoke方法是StandardEngineValve中的invoke方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final void invoke(Request request, Response response)
throws IOException, ServletException {

// 选择用于此请求的主机
Host host = request.getHost();
if (host == null) {
response.sendError
(HttpServletResponse.SC_BAD_REQUEST, sm.getString("standardEngine.noHost",request.getServerName()));
return;
}
if (request.isAsyncSupported()) {
request.setAsyncSupported(host.getPipeline().isAsyncSupported());
}

// 请此主机处理此请求
host.getPipeline().getFirst().invoke(request, response);

}

StandardEngineValve中的invoke又会调用host中的StandardHostValve的invoke方法,依次StandardContextValve->StandardWrapperValve,最后到StandardWrapperValve中的invoke,这里处理servlet。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
@Override
public final void invoke(Request request, Response response)
throws IOException, ServletException {

// Initialize local variables we may need
boolean unavailable = false;
Throwable throwable = null;
// This should be a Request attribute...
long t1=System.currentTimeMillis();
//增加请求次数
requestCount.incrementAndGet();
//获取StandardWrapper,每次都有StandardWrapper和value
StandardWrapper wrapper = (StandardWrapper) getContainer();
//获取Servlet
Servlet servlet = null;
Context context = (Context) wrapper.getParent();

// Check for the application being marked unavailable
if (!context.getState().isAvailable()) {
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
sm.getString("standardContext.isUnavailable"));
unavailable = true;
}

// 检查是否获取到 包装头信息
if (!unavailable && wrapper.isUnavailable()) {
container.getLogger().info(sm.getString("standardWrapper.isUnavailable",
wrapper.getName()));
long available = wrapper.getAvailable();
if ((available > 0L) && (available < Long.MAX_VALUE)) {
response.setDateHeader("Retry-After", available);
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
sm.getString("standardWrapper.isUnavailable",
wrapper.getName()));
} else if (available == Long.MAX_VALUE) {
response.sendError(HttpServletResponse.SC_NOT_FOUND,
sm.getString("standardWrapper.notFound",
wrapper.getName()));
}
unavailable = true;
}

// 分配一个servlet实例来处理这个请求
try {
//没有获取到,就加载,Tomcat默认是按需加载。(延迟加载)
if (!unavailable) {
//实例化
servlet = wrapper.allocate();
}
} catch (UnavailableException e) {
container.getLogger().error(
sm.getString("standardWrapper.allocateException",
wrapper.getName()), e);
long available = wrapper.getAvailable();
if ((available > 0L) && (available < Long.MAX_VALUE)) {
response.setDateHeader("Retry-After", available);
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
sm.getString("standardWrapper.isUnavailable",
wrapper.getName()));
} else if (available == Long.MAX_VALUE) {
response.sendError(HttpServletResponse.SC_NOT_FOUND,
sm.getString("standardWrapper.notFound",
wrapper.getName()));
}
} catch (ServletException e) {
container.getLogger().error(sm.getString("standardWrapper.allocateException",
wrapper.getName()), StandardWrapper.getRootCause(e));
throwable = e;
exception(request, response, e);
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
container.getLogger().error(sm.getString("standardWrapper.allocateException",
wrapper.getName()), e);
throwable = e;
exception(request, response, e);
servlet = null;
}

MessageBytes requestPathMB = request.getRequestPathMB();
DispatcherType dispatcherType = DispatcherType.REQUEST;
if (request.getDispatcherType()==DispatcherType.ASYNC) dispatcherType = DispatcherType.ASYNC;
request.setAttribute(Globals.DISPATCHER_TYPE_ATTR,dispatcherType);
request.setAttribute(Globals.DISPATCHER_REQUEST_PATH_ATTR,
requestPathMB);
// 过滤链,层层通过过滤器,再层层返回。
ApplicationFilterChain filterChain =
ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);

// Call the filter chain for this request
// NOTE: This also calls the servlet's service() method
try {
if ((servlet != null) && (filterChain != null)) {
// Swallow output if needed
if (context.getSwallowOutput()) {
try {
SystemLogHandler.startCapture();
if (request.isAsyncDispatching()) {
request.getAsyncContextInternal().doInternalDispatch();
} else {
//执行过滤链
filterChain.doFilter(request.getRequest(),
response.getResponse());
}
} finally {
String log = SystemLogHandler.stopCapture();
if (log != null && log.length() > 0) {
context.getLogger().info(log);
}
}
} else {
if (request.isAsyncDispatching()) {
request.getAsyncContextInternal().doInternalDispatch();
} else {
filterChain.doFilter
(request.getRequest(), response.getResponse());
}
}

}
} catch (ClientAbortException | CloseNowException e) {
if (container.getLogger().isDebugEnabled()) {
container.getLogger().debug(sm.getString(
"standardWrapper.serviceException", wrapper.getName(),
context.getName()), e);
}
throwable = e;
exception(request, response, e);
} catch (IOException e) {
container.getLogger().error(sm.getString(
"standardWrapper.serviceException", wrapper.getName(),
context.getName()), e);
throwable = e;
exception(request, response, e);
} catch (UnavailableException e) {
container.getLogger().error(sm.getString(
"standardWrapper.serviceException", wrapper.getName(),
context.getName()), e);
// throwable = e;
// exception(request, response, e);
wrapper.unavailable(e);
long available = wrapper.getAvailable();
if ((available > 0L) && (available < Long.MAX_VALUE)) {
response.setDateHeader("Retry-After", available);
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
sm.getString("standardWrapper.isUnavailable",
wrapper.getName()));
} else if (available == Long.MAX_VALUE) {
response.sendError(HttpServletResponse.SC_NOT_FOUND,
sm.getString("standardWrapper.notFound",
wrapper.getName()));
}
// Do not save exception in 'throwable', because we
// do not want to do exception(request, response, e) processing
} catch (ServletException e) {
Throwable rootCause = StandardWrapper.getRootCause(e);
if (!(rootCause instanceof ClientAbortException)) {
container.getLogger().error(sm.getString(
"standardWrapper.serviceExceptionRoot",
wrapper.getName(), context.getName(), e.getMessage()),
rootCause);
}
throwable = e;
exception(request, response, e);
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
container.getLogger().error(sm.getString(
"standardWrapper.serviceException", wrapper.getName(),
context.getName()), e);
throwable = e;
exception(request, response, e);
} finally {
// 释放此请求的过滤器链
if (filterChain != null) {
filterChain.release();
}

// 回收到servlet池,取消分配分配的servlet实例
try {
if (servlet != null) {
wrapper.deallocate(servlet);
}
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
container.getLogger().error(sm.getString("standardWrapper.deallocateException",
wrapper.getName()), e);
if (throwable == null) {
throwable = e;
exception(request, response, e);
}
}

// If this servlet has been marked permanently unavailable,
// unload it and release this instance
try {
if ((servlet != null) &&
(wrapper.getAvailable() == Long.MAX_VALUE)) {
wrapper.unload();
}
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
container.getLogger().error(sm.getString("standardWrapper.unloadException",
wrapper.getName()), e);
if (throwable == null) {
exception(request, response, e);
}
}
long t2=System.currentTimeMillis();

long time=t2-t1;
processingTime += time;
if( time > maxTime) maxTime=time;
if( time < minTime) minTime=time;
}
}

一直到找到servlet

实例化servlet

allocate()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@Override
public Servlet allocate() throws ServletException {

// If we are currently unloading this servlet, throw an exception
if (unloading) {
throw new ServletException(sm.getString("standardWrapper.unloading", getName()));
}

boolean newInstance = false;

// 如果不是单线程模型,则每次返回相同的实例
if (!singleThreadModel) {
// 必要时加载并初始化我们的实例
if (instance == null || !instanceInitialized) {
synchronized (this) {
if (instance == null) {
try {
if (log.isDebugEnabled()) {
log.debug("Allocating non-STM instance");
}

// 注意:在加载之前,我们不知道Servlet是否实现了SingleThreadModel。
instance = loadServlet();
newInstance = true;
if (!singleThreadModel) {
// For non-STM, increment here to prevent a race condition with unload. Bug 43683, test case
// #3
countAllocated.incrementAndGet();
}
} catch (ServletException e) {
throw e;
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
throw new ServletException(sm.getString("standardWrapper.allocate"), e);
}
}
if (!instanceInitialized) {
initServlet(instance);
}
}
}

//单线程模式
if (singleThreadModel) {
if (newInstance) {
// 必须在上述同步之外执行此操作,以防止可能的死锁
synchronized (instancePool) {
//往实例池里放
instancePool.push(instance);
nInstances++;
}
}
} else {
if (log.isTraceEnabled()) {
log.trace(" Returning non-STM instance");
}
// 对于新实例,在创建时计数将增加
if (!newInstance) {
countAllocated.incrementAndGet();
}
return instance;
}
}

synchronized (instancePool) {
while (countAllocated.get() >= nInstances) {
// 如果可能,分配一个新实例,否则等待
if (nInstances < maxInstances) {
try {
instancePool.push(loadServlet());
nInstances++;
} catch (ServletException e) {
throw e;
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
throw new ServletException(sm.getString("standardWrapper.allocate"), e);
}
} else {
try {
instancePool.wait();
} catch (InterruptedException e) {
// Ignore
}
}
}
if (log.isTraceEnabled()) {
log.trace(" Returning allocated STM instance");
}
countAllocated.incrementAndGet();
return instancePool.pop();
}
}

处理

上面最终通过连接器的适配器执行service:
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
最终可以找到servlet处理。

处理完成,层层返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//请求响应完成
request.finishRequest();
response.finishResponse();

public void finishResponse() throws IOException {
// Writing leftover bytes
outputBuffer.close();
}

public void close() throws IOException {

if (closed) {
return;
}
if (suspended) {
return;
}

// 如果有字符,就将它们全部刷新到字节缓冲区,计算内容长度(当然,如果所有内容都适合字节缓冲区)。
if (cb.remaining() > 0) {
//数据刷出 并进行字符转换
flushCharBuffer();
}

if ((!coyoteResponse.isCommitted()) && (coyoteResponse.getContentLengthLong() == -1)
&& !coyoteResponse.getRequest().method().equals("HEAD")) {
//如果这没有导致响应提交,则可以计算最终内容长度。只有当这不是头
//因为在这种情况下,不应该有任何机构
//在此处设置值为零将导致显式内容
//正在对响应设置零的长度。
if (!coyoteResponse.isCommitted()) {
coyoteResponse.setContentLength(bb.remaining());
}
}
//数据对象,刷出到outputbuffer
if (coyoteResponse.getStatus() == HttpServletResponse.SC_SWITCHING_PROTOCOLS) {
doFlush(true);
} else {
doFlush(false);
}
closed = true;

//请求应该在响应时已完全读取
//已关闭。进一步读取输入a)是没有意义的,b)真的
//混淆AJP(错误50189),关闭输入缓冲区以防止它们。
//转换为http对象
Request req = (Request) coyoteResponse.getRequest().getNote(CoyoteAdapter.ADAPTER_NOTES);
req.inputBuffer.close();

coyoteResponse.action(ActionCode.CLOSE, null);
}

请求时序图:
20200406144305

总结accept流程

Connector 启动以后会启动一组线程用于不同阶段的请求处理过程,Acceptor、Poller、worker 所在的线程组都维护在 NioEndpoint 中。

Acceptor线程组。用于接受新连接,并将新连接封装一下,选择一个 Poller 将新连接添加到 Poller 的事件队列中,Acceptor线程组是多个线程组成的线程组。
Poller 线程组。用于监听 Socket 事件,当 Socket 可读或可写等等时,将 Socket 封装一下添加到 worker 线程池的任务队列中,Poller线程组是多个线程组成的线程组。
worker 线程组。用于对请求进行处理,包括分析请求报文并创建 Request 对象,调用容器的 pipeline 进行处理,worker线程组是Executor创建的线程池。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

public void startInternal() throws Exception {
// 创建worker线程组
if ( getExecutor() == null ) {
createExecutor();
}

// Poller线程组由一堆线程组成
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}

startAcceptorThreads();
}
}
}

public abstract class AbstractEndpoint<S> {
// Acceptor线程组由一堆线程组成
protected final void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new Acceptor[count];

for (int i = 0; i < count; i++) {
acceptors[i] = createAcceptor();
String threadName = getName() + "-Acceptor-" + i;
acceptors[i].setThreadName(threadName);
Thread t = new Thread(acceptors[i], threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}

// worker的线程组由executor创建线程池组成
public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
}

请求处理过程

Acceptor接收连接

Acceptor接受的新连接没有立即注册到selector当中,需要先封装成PollerEvent对象后保存至PollerEvent队列当中,Poller对象会消费PollerEvent队列,类似生产消费模型。

Acceptor 在启动后会阻塞在 ServerSocketChannel.accept(); 方法处,当有新连接到达时,该方法返回一个 SocketChannel。

setSocketOptions()方法将 Socket 封装到 NioChannel 中,并注册到 Poller。

一开始就启动了多个 Poller 线程,注册的时候采用轮询选择 Poller 。NioEndpoint 维护了一个 Poller 数组,当一个连接分配给 pollers[index] 时,下一个连接就会分配给 pollers[(index+1)%pollers.length]。

addEvent() 方法会将 Socket 添加到该 Poller 的 PollerEvent 队列中。到此 Acceptor 的任务就完成了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

private volatile ServerSocketChannel serverSock = null;
protected class Acceptor extends AbstractEndpoint.Acceptor {

public void run() {

while (running) {
state = AcceptorState.RUNNING;
try {
SocketChannel socket = null;
try {
// 监听socket负责接收新连接
socket = serverSock.accept();
} catch (IOException ioe) {
}

if (running && !paused) {
// 处理接受到的socket对象
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
}
} catch (Throwable t) {
}
}
state = AcceptorState.ENDED;
}
}


protected boolean setSocketOptions(SocketChannel socket) {
try {
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);

channel = new NioChannel(socket, bufhandler);
// 注册到Poller当中
getPoller0().register(channel);
} catch (Throwable t) {
}

return true;
}

public Poller getPoller0() {
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}


public class Poller implements Runnable {
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
r = new PollerEvent(socket,ka,OP_REGISTER);

// 添加PollerEvent队列当中
addEvent(r);
}


private void addEvent(PollerEvent event) {
// 投入到PollerEvent队列当中
events.offer(event);
if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}
}

}

Poller处理请求

Poller会消费PollerEvent队列(由Acceptor进行投递),并注册到Selector当中。当注册到Selector的socket数据可读的时候将socket封装成SocketProcessor对象,投递到Executor实现的线程池进行处理。

selector.select(1000)。当 Poller 启动后因为 selector 中并没有已注册的 Channel,所以当执行到该方法时只能阻塞。所有的 Poller 共用一个 Selector,其实现类是 sun.nio.ch.SelectorImpl。

events() 方法通过 addEvent() 方法添加到事件队列中的 Socket 注册到SelectorImpl。这里指的socket是accept过来的请求的socket。

当 Socket 可读时,Poller 才对其进行处理,createSocketProcessor() 方法将 Socket 封装到 SocketProcessor 中,SocketProcessor 实现了 Runnable 接口。worker 线程通过调用其 run() 方法来对 Socket 进行处理。

execute(SocketProcessor) 方法将 SocketProcessor 提交到线程池,放入线程池的 workQueue 中。workQueue 是 BlockingQueue 的实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

public static class PollerEvent implements Runnable {
private NioChannel socket;
private int interestOps;
private NioSocketWrapper socketWrapper;

public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
reset(ch, w, intOps);
}

public void run() {
if (interestOps == OP_REGISTER) {
try {
socket.getIOChannel().register(
socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {

}
}
}
}


public class Poller implements Runnable {

public void run() {
while (true) {
// events()负责处理PollerEvent事件并注册到selector当中
hasEvents = events();
keyCount = selector.select(selectorTimeout);

// 处理新接受的socket的读写事件
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();

processKey(sk, attachment);
}
}
}

// 处理读写事件
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
if (sk.isReadable()) {
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}

if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
}
}
}


public abstract class AbstractEndpoint<S> {
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {

try {
sc = createSocketProcessor(socketWrapper, event);
Executor executor = getExecutor();
// 注册到Worker的线程池ThreadPoolExecutor。
if (dispatch && executor != null) {
executor.execute(sc);
}
} catch (RejectedExecutionException ree) {
}

return true;
}
}

Worker处理具体请求

当新任务添加到 workQueue(ThreadPoolExecutor)后,workQueue.take()方法会返回一个 Runnable,通常是 SocketProcessor,然后 worker 线程调用 SocketProcessor的run() -> doRun()方法对 Socket 进行处理。

createProcessor() 会创建一个Http11Processor, 它用来解析 Socket,将 Socket 中的内容封装到Request中。注意这个Request是临时使用的一个类,它的全类名是org.apache.coyote.Request。

CoyoteAdapter的postParseRequest()方法封装一下 Request,并处理一下映射关系(从 URL 映射到相应的 Host、Context、Wrapper)。

CoyoteAdapter将 Rquest 提交给 Container(StandardEngine) 处理之前,并将 org.apache.coyote.Request封装到 org.apache.catalina.connector.Request,传递给 Container处理的 Request 是 org.apache.catalina.connector.Request。

connector.getService().getMapper().map(),用来在Mapper中查询 URL 的映射关系。映射关系会保留到 org.apache.catalina.connector.Request 中,Container处理阶段 request.getHost()是使用的就是这个阶段查询到的映射主机,以此类推 request.getContext()、request.getWrapper()都是。

connector.getService().getContainer().getPipeline().getFirst().invoke()会将请求传递到 Container(StandardEngine)处理,至此进入了Engine->Host->Context->Wrapper的处理流程,当然了 Container处理也是在 Worker线程中执行的(也就是说Tomcat处理请求是通过ThreadPoolExecutor的线程池实现的),但是这是一个相对独立的模块,所以单独分出来一节。