Exploring the HttpClient 4.5.13 source code

Creating the client

CloseableHttpClient httpClient = HttpClientBuilder.create()
        .setDefaultSocketConfig(socketConfig)
        .setDefaultRequestConfig(requestConfig)
        .setMaxConnTotal(totalNum)
        .setMaxConnPerRoute(routeNum)
        .build();

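Connection pool

Connection pool builder

If no custom connection manager is set when the HttpClient is built, the builder creates a pooling connection manager by default.
The Registry supplies the socket factory for each scheme (http, https), and dnsResolver handles host name resolution.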

HttpClientBuilder::build()

// line: 984
final PoolingHttpClientConnectionManager poolingmgr = new PoolingHttpClientConnectionManager(
        RegistryBuilder.<ConnectionSocketFactory>create()
            .register("http", PlainConnectionSocketFactory.getSocketFactory())
            .register("https", sslSocketFactoryCopy)
            .build(),
        null,
        null,
        dnsResolver,
        connTimeToLive,
        connTimeToLiveTimeUnit != null ? connTimeToLiveTimeUnit : TimeUnit.MILLISECONDS);

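Pooling connection manager

The constructor builds the CPool from an internal connection factory and the connection-related parameters.
InternalConnectionFactory is passed in so that the pool can call the factory to produce connections.
The defaults are 20 connections in total and 2 connections per route.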

PoolingHttpClientConnectionManager::()

// line: 175
/**
 * @since 4.4
 */
public PoolingHttpClientConnectionManager(
        final HttpClientConnectionOperator httpClientConnectionOperator,
        final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
        final long timeToLive, final TimeUnit timeUnit) {
    super();
    this.configData = new ConfigData();
    this.pool = new CPool(new InternalConnectionFactory(
            this.configData, connFactory), 2, 20, timeToLive, timeUnit);
    this.pool.setValidateAfterInactivity(2000);
    this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
    this.isShutDown = new AtomicBoolean(false);
}
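
The two defaults passed to CPool here (2 per route, 20 in total) act as independent caps: a new connection needs room under both. The sketch below is only an illustration of that double check. RouteLimits and its methods are hypothetical names, routes are reduced to plain strings, and the waiting and idle-eviction behaviour of the real pool is left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical counter applying a per-route cap and a total cap; not HttpClient code. */
class RouteLimits {
    private final int maxPerRoute;
    private final int maxTotal;
    private final Map<String, Integer> allocatedPerRoute = new HashMap<>();
    private int totalAllocated = 0;

    RouteLimits(int maxPerRoute, int maxTotal) {
        this.maxPerRoute = maxPerRoute;
        this.maxTotal = maxTotal;
    }

    /** Records one more connection for the route if both caps allow it. */
    boolean tryAllocate(String route) {
        int onRoute = allocatedPerRoute.getOrDefault(route, 0);
        if (onRoute >= maxPerRoute || totalAllocated >= maxTotal) {
            return false; // in this sketch the caller is simply refused
        }
        allocatedPerRoute.put(route, onRoute + 1);
        totalAllocated++;
        return true;
    }

    public static void main(String[] args) {
        RouteLimits limits = new RouteLimits(2, 20); // same numbers as the constructor above
        for (int i = 1; i <= 3; i++) {
            System.out.println("attempt " + i + ": " + limits.tryAllocate("http://a.example:80"));
        }
    }
}
```

With these numbers, a third concurrent connection to the same route is refused even though the pool as a whole is almost empty, which is why setMaxConnPerRoute usually needs raising together with setMaxConnTotal.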

If no custom connection factory is supplied, the default instance is used.

// line: 610
static class InternalConnectionFactory implements ConnFactory<HttpRoute, ManagedHttpClientConnection> {
    ...
    InternalConnectionFactory(
            final ConfigData configData,
            final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
        ...
        this.connFactory = connFactory != null ? connFactory :
            ManagedHttpClientConnectionFactory.INSTANCE;
    }

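Managed connection factory

The default factory instance is set up with the pieces that handle buffered I/O: the message writer and parser factories and the content-length strategies.
Once the factory is assembled, it is handed to the connection pool to produce DefaultBHttpClientConnection connections.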

ManagedHttpClientConnectionFactory::()

// line: 74
/**
 * @since 4.4
 */
public ManagedHttpClientConnectionFactory(
        final HttpMessageWriterFactory<HttpRequest> requestWriterFactory,
        final HttpMessageParserFactory<HttpResponse> responseParserFactory,
        final ContentLengthStrategy incomingContentStrategy,
        final ContentLengthStrategy outgoingContentStrategy) {
    super();
    this.requestWriterFactory = requestWriterFactory != null ? requestWriterFactory :
        DefaultHttpRequestWriterFactory.INSTANCE;
    this.responseParserFactory = responseParserFactory != null ? responseParserFactory :
        DefaultHttpResponseParserFactory.INSTANCE;
    this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
        LaxContentLengthStrategy.INSTANCE;
    this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
        StrictContentLengthStrategy.INSTANCE;
}

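Abstract connection pool

Most of the pool's functionality is implemented in the abstract class AbstractConnPool.
It implements lease and release from the ConnPool interface.
It implements the pool size limits and pool statistics from the ConnPoolControl interface.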

class CPool extends AbstractConnPool<HttpRoute, ManagedHttpClientConnection, CPoolEntry>
public abstract class AbstractConnPool <T, C, E extends PoolEntry<T, C>>
        implements ConnPool<T, E>, ConnPoolControl<T>

T route type: HttpRoute
C connection type: ManagedHttpClientConnection
E pool entry type: CPoolEntry

AbstractConnPool

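Pool structure

  • PoolEntry<HttpRoute, ManagedHttpClientConnection>
    An entry in the pool: timestamps plus the connection (a socket is bound to it when the connection is established)
  • ConnFactory<HttpRoute, ManagedHttpClientConnection> connFactory
    Connection factory: creates connections
    HttpClientBuilder::build() ->
    PoolingHttpClientConnectionManager::() ->
    CPool::(InternalConnectionFactory) ->
    ManagedHttpClientConnectionFactory::INSTANCE
  • HashSet<PoolEntry> leased
    Holds the leased connections:
    a connection is taken straight from the pool: leased+1
    nothing is available and the limit is not yet reached, so a connection is created: leased+1
    a connection is returned: leased-1
  • LinkedList<PoolEntry> available
    Holds the idle connections:
    on lease, if a connection is no longer usable or one must be evicted to make room: available-1
    on release, if the connection is reusable: available+1
  • LinkedList<PoolEntryFuture> pending
    Holds the Futures of threads waiting for a connection:
    on lease, a thread that fails to get a connection joins the waiting queue and waits until it is woken up or times out
    on release, a waiting Future is taken from the pool; if it is successfully removed from the waiting queue, all waiting threads are woken up to retry the lease
  • Map<HttpRoute, RouteSpecificPool<T, C, E>> routeToPool
    One pool per route: each route-specific pool has its own leased, available and pending, which keeps connection counts isolated between routes
  • Lock lock
    The lock that guards the pool
    Every read or write of pool state must first acquire it for exclusive access
  • Condition condition
    Condition used to coordinate threads:
    a thread releasing a connection, or a Future being cancelled, wakes up all waiters
    a thread that fails to get a connection waits on it until it is woken up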
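
The interplay of leased, available, lock and condition can be sketched with a much smaller pool. The code below is a hedged illustration, not the HttpCore implementation: TinyPool is a hypothetical class, connections are plain strings, there is a single route, and the explicit pending queue is replaced by the wait set of the Condition.

```java
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical single-route pool; connections are plain strings. */
class TinyPool {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final Set<String> leased = new HashSet<>();
    private final LinkedList<String> available = new LinkedList<>();
    private final int maxTotal;
    private int created = 0;

    TinyPool(int maxTotal) {
        this.maxTotal = maxTotal;
    }

    /** Leases a connection; a timeout of 0 or less waits indefinitely. */
    String lease(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long remainingNanos = unit.toNanos(timeout);
        lock.lock();
        try {
            for (;;) {
                if (!available.isEmpty()) {          // reuse an idle connection
                    String conn = available.removeFirst();
                    leased.add(conn);
                    return conn;
                }
                if (leased.size() < maxTotal) {      // below the cap: create a new one
                    String conn = "conn-" + (++created);
                    leased.add(conn);
                    return conn;
                }
                if (timeout <= 0) {                  // no deadline: wait for a release
                    condition.await();
                } else if (remainingNanos <= 0) {
                    throw new TimeoutException("Timeout waiting for connection");
                } else {
                    remainingNanos = condition.awaitNanos(remainingNanos);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /** Returns a connection and wakes up every waiting thread. */
    void release(String conn, boolean reusable) {
        lock.lock();
        try {
            if (leased.remove(conn)) {
                if (reusable) {
                    available.addFirst(conn);
                }
                condition.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    int leasedCount() {
        lock.lock();
        try { return leased.size(); } finally { lock.unlock(); }
    }

    int availableCount() {
        lock.lock();
        try { return available.size(); } finally { lock.unlock(); }
    }

    public static void main(String[] args) throws Exception {
        TinyPool pool = new TinyPool(1);
        String conn = pool.lease(100, TimeUnit.MILLISECONDS);
        pool.release(conn, true);
        System.out.println("leased=" + pool.leasedCount() + " available=" + pool.availableCount());
    }
}
```

Waking every waiter with signalAll and letting each one re-check the pool state inside the loop mirrors the "wake up all, then retry the lease" behaviour described in the list.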

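Getting a pool entry

Pool entries are instances of E extends PoolEntry<T, C>.
Fields: ID, route, managed connection, creation time, update time, validity deadline (derived from the time to live), expiry time, and connection state.
The validity deadline and the expiry time differ in that the former is fixed at creation, whereas the latter is recomputed from the update time and the new expiry duration each time the entry is updated.
The managed connection is built by the managed connection factory when AbstractConnPool leases a connection.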
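
The difference between the two deadlines is easier to see in code. The following is a minimal sketch under stated assumptions: EntryDeadlines is a hypothetical class, timestamps are passed in explicitly instead of being read from the system clock, and capping the expiry at the validity deadline reflects my reading of PoolEntry rather than anything quoted in this post.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical entry with a fixed validity deadline and a movable expiry. */
class EntryDeadlines {
    private final long validityDeadline; // fixed at creation from the time to live
    private long expiry;                 // recomputed on every update

    EntryDeadlines(long createdMillis, long timeToLive, TimeUnit unit) {
        this.validityDeadline = timeToLive > 0
                ? createdMillis + unit.toMillis(timeToLive)
                : Long.MAX_VALUE;
        this.expiry = this.validityDeadline;
    }

    /** Moves the expiry forward from "now", but never past the validity deadline. */
    void updateExpiry(long nowMillis, long keepAlive, TimeUnit unit) {
        long newExpiry = keepAlive > 0 ? nowMillis + unit.toMillis(keepAlive) : Long.MAX_VALUE;
        this.expiry = Math.min(newExpiry, validityDeadline);
    }

    long getExpiry() {
        return expiry;
    }

    boolean isExpired(long nowMillis) {
        return nowMillis >= expiry;
    }

    public static void main(String[] args) {
        EntryDeadlines entry = new EntryDeadlines(0L, 10, TimeUnit.SECONDS);
        entry.updateExpiry(1_000L, 5, TimeUnit.SECONDS);
        System.out.println("expiry after first update: " + entry.getExpiry());
        entry.updateExpiry(8_000L, 5, TimeUnit.SECONDS);
        System.out.println("expiry after second update: " + entry.getExpiry());
    }
}
```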

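Leasing a connection

Future<CPoolEntry> comes from the abstract connection pool; each client thread gets its own Future and uses it to obtain a pool entry.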

AbstractConnPool::lease

// line: 193
@Override
public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
    ...
    return new Future<E>() {
        ...
        @Override
        public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            for (;;) {
                ...
                final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                ...

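The internal connection factory, which wraps the managed connection factory, takes the route as its argument and builds the connection instance.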

// line: 310
private E getPoolEntryBlocking(
        final T route, final Object state,
        final long timeout, final TimeUnit timeUnit,
        final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
    ...
    final C conn = this.connFactory.create(route);
    entry = pool.add(conn);
    this.leased.add(entry);
    return entry;
    ...

Requesting a connection

The pooling connection manager handles the connection request.

PoolingHttpClientConnectionManager::requestConnection

// line: 261
@Override
public ConnectionRequest requestConnection(
        final HttpRoute route,
        final Object state) {
    ...
    final Future<CPoolEntry> future = this.pool.lease(route, state, null);
    return new ConnectionRequest() {
        ...
        @Override
        public HttpClientConnection get(
                final long timeout,
                final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
            final HttpClientConnection conn = leaseConnection(future, timeout, timeUnit);
            ...

The leaseConnection method calls the Future to obtain the leased entry.

// line: 300
protected HttpClientConnection leaseConnection(
        final Future<CPoolEntry> future,
        final long timeout,
        final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
    final CPoolEntry entry;
    try {
        entry = future.get(timeout, timeUnit);
        ...
        return CPoolProxy.newProxy(entry);
        ...

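Obtaining the connection

The connection is taken from the pooling connection manager.
What comes back is CPoolProxy, a proxy for CPoolEntry that implements the HttpClientConnection interface.
requestConnection returns an anonymous ConnectionRequest wrapping the pool's Future; its get method implements the actual retrieval.
connRequest.get fetches a connection from the pool; if the pool is exhausted, the calling thread blocks at this step.
timeout is the connectionRequestTimeout setting; with its default of -1 the call waits without a timeout.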

MainClientExec::execute

// line: 176
final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
...
final HttpClientConnection managedConn;
...
// line: 190
managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
...
// line: 233
if (!managedConn.isOpen()) {
    this.log.debug("Opening connection " + route);
    try {
        establishRoute(proxyAuthState, managedConn, route, request, context);
        ...
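
The expression timeout > 0 ? timeout : 0 at line 190 is what turns the default connectionRequestTimeout of -1 into "no limit". A small sketch of that normalisation follows. LeaseTimeouts and both method names are hypothetical, and treating 0 as "wait without a deadline" is an assumption about how the pool interprets the value, not something quoted above.

```java
/** Hypothetical helpers showing how a configured timeout becomes a wait deadline. */
class LeaseTimeouts {
    /** Sentinel used by this sketch only: there is no deadline. */
    static final long NO_DEADLINE = -1L;

    /** Mirrors the expression at line 190: non-positive settings collapse to 0. */
    static long effectiveTimeout(int configuredMillis) {
        return configuredMillis > 0 ? configuredMillis : 0;
    }

    /** A positive timeout yields an absolute deadline; 0 means the caller waits indefinitely. */
    static long deadline(long nowMillis, long effectiveTimeoutMillis) {
        return effectiveTimeoutMillis > 0 ? nowMillis + effectiveTimeoutMillis : NO_DEADLINE;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("default setting: " + deadline(now, effectiveTimeout(-1)));
        System.out.println("2 s setting:     " + deadline(now, effectiveTimeout(2000)));
    }
}
```

In practice this means that a client relying on the default can block forever on an exhausted pool, so setting connectionRequestTimeout explicitly is usually worthwhile.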

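Establishing the connection

Establishing the route

The route is established from the connection object, the route object, the timeout, and the context.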

MainClientExec::establishRoute

// line: 173
/**
 * Establishes the target route.
 */
void establishRoute(
        final AuthState proxyAuthState,
        final HttpClientConnection managedConn,
        final HttpRoute route,
        final HttpRequest request,
        final HttpClientContext context) throws HttpException, IOException {
    ...
    case HttpRouteDirector.CONNECT_TARGET:
        this.connManager.connect(
                managedConn,
                route,
                timeout > 0 ? timeout : 0,
                context);
    ...

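Opening the connection

The connection produced earlier by the abstract pool's internal connection factory is taken back out of the pool entry.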

PoolingHttpClientConnectionManager::connect

// line: 357
@Override
public void connect(
        final HttpClientConnection managedConn,
        final HttpRoute route,
        final int connectTimeout,
        final HttpContext context) throws IOException {
    Args.notNull(managedConn, "Managed Connection");
    Args.notNull(route, "HTTP route");
    final ManagedHttpClientConnection conn;
    synchronized (managedConn) {
        final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
        conn = entry.getConnection();
    }
    ...
    this.connectionOperator.connect(
            conn, host, route.getLocalSocketAddress(), connectTimeout, resolveSocketConfig(host), context);
}

The connection is handed to the default client connection operator, which resolves the host name and binds the socket.

DefaultHttpClientConnectionOperator::connect

// line: 98
@Override
public void connect(
        final ManagedHttpClientConnection conn,
        final HttpHost host,
        final InetSocketAddress localAddress,
        final int connectTimeout,
        final SocketConfig socketConfig,
        final HttpContext context) throws IOException {
    ...
    final InetAddress[] addresses = host.getAddress() != null ?
            new InetAddress[] { host.getAddress() } : this.dnsResolver.resolve(host.getHostName());
    ...
    Socket sock = sf.createSocket(context);
    ...
    conn.bind(sock);

I/O

Sending

  • the client executes the request
  • the closeable client executes
  • the internal client executes
  • the main client executor runs
  • the request executor runs
  • the connection flushes its data
  • the session output buffer flushes
  • the socket output stream writes and flushes

httpClient.execute() ->
CloseableHttpClient.execute ->
InternalHttpClient.doExecute ->
MainClientExec.execute ->
HttpRequestExecutor.execute ->
CPoolProxy.sendRequestHeader ->
DefaultBHttpClientConnection.doFlush ->
SessionOutputBufferImpl.flush ->
SocketOutputStream.write/flush
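
The last two hops of this chain, buffering in the session output buffer and then flushing to the socket stream, can be sketched as follows. This is only an illustration of the buffering idea: TinySessionOutputBuffer is a hypothetical class, and any OutputStream stands in for the socket's output stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical buffer that collects request bytes and writes them out on flush. */
class TinySessionOutputBuffer {
    private final OutputStream out;
    private final byte[] buffer;
    private int used = 0;

    TinySessionOutputBuffer(OutputStream out, int size) {
        this.out = out;
        this.buffer = new byte[size];
    }

    /** Appends one protocol line terminated by CRLF. */
    void writeLine(String line) throws IOException {
        write((line + "\r\n").getBytes(StandardCharsets.US_ASCII));
    }

    void write(byte[] data) throws IOException {
        if (data.length > buffer.length - used) {
            flushBuffer();                 // not enough room: push out what is buffered
        }
        if (data.length > buffer.length) {
            out.write(data);               // larger than the whole buffer: write through
            return;
        }
        System.arraycopy(data, 0, buffer, used, data.length);
        used += data.length;
    }

    private void flushBuffer() throws IOException {
        if (used > 0) {
            out.write(buffer, 0, used);
            used = 0;
        }
    }

    /** Writes buffered bytes to the underlying stream and flushes it. */
    void flush() throws IOException {
        flushBuffer();
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        TinySessionOutputBuffer buf = new TinySessionOutputBuffer(sink, 8192);
        buf.writeLine("GET / HTTP/1.1");
        buf.writeLine("Host: example.com");
        buf.writeLine("");
        System.out.println("bytes on the wire before flush: " + sink.size());
        buf.flush();
        System.out.println("bytes on the wire after flush:  " + sink.size());
    }
}
```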

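Receiving

  • the socket input stream reads the data
  • the session input buffer reads from the stream
  • the response parser parses the data
  • the connection receives the response header
  • the request executor receives the response
  • the main client executor gets the response
  • the internal client gets the response
  • the closeable client gets the response
  • the client gets the response

SocketInputStream.socketRead ->
SessionInputBufferImpl.streamRead ->
DefaultHttpResponseParser.parseHead ->
CPoolProxy.receiveResponseHeader ->
HttpRequestExecutor.doReceiveResponse ->
MainClientExec.execute ->
InternalHttpClient.doExecute ->
CloseableHttpClient.execute ->
httpClient.execute()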
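
What the parser step does with the bytes read from the socket can be illustrated with a toy head parser. This is a minimal sketch, assuming a well-formed response and a hypothetical TinyResponseHead class; the real DefaultHttpResponseParser is far more tolerant and is not reproduced here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical parser for the status line and headers of an HTTP/1.1 response. */
class TinyResponseHead {
    final int status;
    final String reason;
    final Map<String, String> headers = new LinkedHashMap<>(); // keys are lower-cased

    private TinyResponseHead(int status, String reason) {
        this.status = status;
        this.reason = reason;
    }

    /** Reads up to and including the blank line; the stream is left at the body. */
    static TinyResponseHead parse(InputStream in) throws IOException {
        String statusLine = readLine(in);
        if (statusLine == null) {
            throw new IOException("Connection closed before a status line arrived");
        }
        String[] parts = statusLine.split(" ", 3);
        if (parts.length < 2) {
            throw new IOException("Malformed status line: " + statusLine);
        }
        TinyResponseHead head = new TinyResponseHead(
                Integer.parseInt(parts[1]), parts.length == 3 ? parts[2] : "");
        String line;
        while ((line = readLine(in)) != null && !line.isEmpty()) {
            int colon = line.indexOf(':');
            if (colon > 0) {
                head.headers.put(line.substring(0, colon).trim().toLowerCase(Locale.ROOT),
                        line.substring(colon + 1).trim());
            }
        }
        return head;
    }

    /** Reads one line without its CR/LF; returns null at end of stream. */
    private static String readLine(InputStream in) throws IOException {
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1 && b != '\n') {
            if (b != '\r') {
                line.write(b);
            }
        }
        if (b == -1 && line.size() == 0) {
            return null;
        }
        return new String(line.toByteArray(), StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) throws IOException {
        byte[] raw = "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nhi".getBytes(StandardCharsets.US_ASCII);
        TinyResponseHead head = parse(new ByteArrayInputStream(raw));
        System.out.println(head.status + " " + head.reason + " " + head.headers);
    }
}
```

Only the head is consumed; the body stays in the stream, which is why the connection cannot go back to the pool until the entity content has been read or closed.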

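Releasing the connection

Response proxy

MainClientExec checks the response entity: if there is no entity, or the entity is not streaming, the connection is released right away. Otherwise HttpResponseProxy takes over and releases the connection later.

MainClientExec::execute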

// line: 335
// check for entity, release connection if possible
final HttpEntity entity = response.getEntity();
if (entity == null || !entity.isStreaming()) {
    // connection not needed and (assumed to be) in re-usable state
    connHolder.releaseConnection();
    return new HttpResponseProxy(response, null);
}
return new HttpResponseProxy(response, connHolder);

HttpResponseProxy::()

// line: 53
public HttpResponseProxy(final HttpResponse original, final ConnectionHolder connHolder) {
    this.original = original;
    this.connHolder = connHolder;
    ResponseEntityProxy.enchance(original, connHolder);
}

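ResponseEntityProxy

Response entity proxy

Handles connection housekeeping on behalf of the response entity.

  • enchance wraps the original response entity in a wrapper that holds the connection
  • releaseConnection releases the connection
  • getContent wrapper method
    An EofSensorInputStream is built when the response content is fetched
    It holds the wrapped stream and registers the EOF watcher, so that detecting EOF triggers eofDetected and releases the connection
  • eofDetected watcher callback, invoked when EOF is detected
  • streamClosed watcher callback, invoked when the stream is closed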

ResponseEntityProxy::enchance

// line: 50
public static void enchance(final HttpResponse response, final ConnectionHolder connHolder) {
    final HttpEntity entity = response.getEntity();
    if (entity != null && entity.isStreaming() && connHolder != null) {
        response.setEntity(new ResponseEntityProxy(entity, connHolder));
    }
}

// line: 74
public void releaseConnection() {
    if (this.connHolder != null) {
        this.connHolder.releaseConnection();
    }
}

// line: 85
@Override
public InputStream getContent() throws IOException {
    return new EofSensorInputStream(this.wrappedEntity.getContent(), this);
}

// line: 113
@Override
public boolean eofDetected(final InputStream wrapped) throws IOException {
    try {
        // there may be some cleanup required, such as
        // reading trailers after the response body:
        if (wrapped != null) {
            wrapped.close();
        }
        releaseConnection();
    ...
}

// line: 134
@Override
public boolean streamClosed(final InputStream wrapped) throws IOException {
    try {
        final boolean open = connHolder != null && !connHolder.isReleased();
        // this assumes that closing the stream will
        // consume the remainder of the response body:
        try {
            if (wrapped != null) {
                wrapped.close();
            }
            releaseConnection();
        ...

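EOF-sensing input stream

  • EofSensorInputStream constructor
    The watcher is the ResponseEntityProxy
    The input stream is the content stream of the entity wrapper held by ResponseEntityProxy; it is kept so that the original stream inside the wrapper can be closed
    DecompressingEntity::() -> LazyDecompressingInputStream::() ->
    ResponseContentEncoding::process -> response.setEntity(DecompressingEntity) ->
    response.getEntity()
  • close closes the stream from the outside
  • checkClose checks whether the conditions for closing are met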

EofSensorInputStream::()

// line: 80
public EofSensorInputStream(final InputStream in,
                            final EofSensorWatcher watcher) {
    Args.notNull(in, "Wrapped stream");
    wrappedStream = in;
    selfClosed = false;
    eofWatcher = watcher;
}

// line: 168
@Override
public void close() throws IOException {
    // tolerate multiple calls to close()
    selfClosed = true;
    checkClose();
}

// line: 221
protected void checkClose() throws IOException {

    final InputStream toCloseStream = wrappedStream;
    if (toCloseStream != null) {
        try {
            boolean scws = true; // should close wrapped stream?
            if (eofWatcher != null) {
                scws = eofWatcher.streamClosed(toCloseStream);
            }
            ...
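
The watcher mechanism boils down to a stream that notifies a callback exactly once, either when it reads EOF or when it is closed early. The sketch below shows that idea with hypothetical names (TinyEofWatcher, TinyEofSensorStream, CountingWatcher); it simplifies the real class, for example by dropping the boolean return values that decide whether the wrapped stream should be closed.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical watcher with the two notifications discussed above. */
interface TinyEofWatcher {
    void eofDetected() throws IOException;
    void streamClosed() throws IOException;
}

/** Hypothetical stream that notifies its watcher once, at EOF or on close. */
class TinyEofSensorStream extends InputStream {
    private InputStream wrapped; // set to null once the watcher has been notified
    private final TinyEofWatcher watcher;

    TinyEofSensorStream(InputStream wrapped, TinyEofWatcher watcher) {
        this.wrapped = wrapped;
        this.watcher = watcher;
    }

    @Override
    public int read() throws IOException {
        if (wrapped == null) {
            return -1;
        }
        int b = wrapped.read();
        if (b == -1) {                       // EOF seen: notify and detach
            InputStream toClose = wrapped;
            wrapped = null;
            watcher.eofDetected();
            toClose.close();
        }
        return b;
    }

    @Override
    public void close() throws IOException {
        if (wrapped != null) {               // closed before EOF: notify and detach
            InputStream toClose = wrapped;
            wrapped = null;
            watcher.streamClosed();
            toClose.close();
        }
    }
}

/** Watcher that only counts notifications; a real one would release the connection. */
class CountingWatcher implements TinyEofWatcher {
    int eofCount = 0;
    int closedCount = 0;

    @Override
    public void eofDetected() {
        eofCount++;
    }

    @Override
    public void streamClosed() {
        closedCount++;
    }

    public static void main(String[] args) throws IOException {
        CountingWatcher watcher = new CountingWatcher();
        InputStream in = new TinyEofSensorStream(new ByteArrayInputStream(new byte[] {1, 2}), watcher);
        while (in.read() != -1) {
            // drain the stream until EOF triggers the watcher
        }
        in.close();
        System.out.println("eof=" + watcher.eofCount + " closed=" + watcher.closedCount);
    }
}
```

Because the stream detaches itself after the first notification, calling close() again after EOF is harmless, which matches the "tolerate multiple calls to close()" comment in the excerpt.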

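Reading the response content

Fetching the input stream with entity.getContent()
first goes through ResponseEntityProxy, which builds the EOF-sensing stream and registers the watcher.
While the EofSensorInputStream is read, each read checks whether EOF has been reached; once reading finishes, closing the stream also releases the connection.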

EntityUtils.toString(response.getEntity(), "UTF-8");

EntityUtils::toString

// line: 198
private static String toString(
        final HttpEntity entity,
        final ContentType contentType) throws IOException {
    final InputStream inStream = entity.getContent();
    ...
        while((l = reader.read(tmp)) != -1) {
        ...
    } finally {
        inStream.close();
    }

Release sequence

  • inStream.close() ->
  • EofSensorInputStream::close ->
  • EofSensorInputStream::checkClose ->
  • EofSensorWatcher::streamClosed ->
  • ResponseEntityProxy::streamClosed ->
  • ResponseEntityProxy::releaseConnection ->
  • ConnectionHolder::releaseConnection ->
  • CPoolProxy::close ->
  • CPoolEntry::close ->
  • BHttpConnectionBase::close ->
  • Socket::shutdownInput/shutdownOutput

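Case study

https://github.com/BurningBright/poc/tree/master/httpclient
Request configuration: a pool of 1 connection and a connection request timeout of 2 s.
Bytecode injection: the DecompressingEntity wrapper returned by response.getEntity()
is replaced with a BufferedHttpEntity buffering wrapper.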

-javaagent:/xxx/agent-1.0-SNAPSHOT-jar-with-dependencies.jar

org.apache.http.entity.BufferedHttpEntity newEntity = new org.apache.http.entity.BufferedHttpEntity(this.entity);
org.apache.http.HttpEntity originalEntity = this.entity;
this.entity = newEntity;
return originalEntity;

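Send two requests and observe how each one obtains and releases its connection.
The first request takes a connection from the pool normally, sends the request, and receives the response.
Constructing the buffered entity reads the response stream into a buffer, so the entity is no longer streaming and the response entity never gets wrapped by the proxy.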

ResponseEntityProxy::enchance

// line: 50
public static void enchance(final HttpResponse response, final ConnectionHolder connHolder) {
    final HttpEntity entity = response.getEntity();
    if (entity != null && entity.isStreaming() && connHolder != null) {
        response.setEntity(new ResponseEntityProxy(entity, connHolder));
    }
}

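As a result, finishing the first read only closes the response stream;
the connection is never released through the proxy's EofSensorInputStream.
The second request never sees the connection released and fails with a connection pool timeout exception.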
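
The failure mode can be reproduced in miniature without HttpClient. The toy model below is a sketch under stated assumptions: LeakDemo is a hypothetical class, a one-permit Semaphore stands in for a pool of one connection, the two boolean flags stand in for the isStreaming() checks at execute time and at wrap time, and a 50 ms timeout replaces the 2 s used in the case study.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Toy model of the leak described above; not HttpClient code. */
class LeakDemo {
    // One permit stands in for a pool limited to a single connection.
    private final Semaphore pool = new Semaphore(1);

    /**
     * Runs one "request". Returns the action performed when the caller closes the
     * response stream, or null if no connection could be leased within the timeout.
     */
    Runnable execute(boolean streamingAtExecute, boolean streamingAtWrap, long timeoutMillis) {
        boolean leased;
        try {
            leased = pool.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        if (!leased) {
            return null;                   // corresponds to the pool timeout exception
        }
        if (!streamingAtExecute) {
            pool.release();                // released eagerly, nothing left to do on close
            return () -> { };
        }
        if (streamingAtWrap) {
            return () -> pool.release();   // wrapped: closing the stream releases the connection
        }
        return () -> { };                  // neither released nor wrapped: the connection leaks
    }

    public static void main(String[] args) {
        LeakDemo leaky = new LeakDemo();
        Runnable closeFirst = leaky.execute(true, false, 50);
        closeFirst.run();
        System.out.println("second request got a connection: "
                + (leaky.execute(true, true, 50) != null));
    }
}
```

The leaking branch is the one where the entity looked streaming when the executor decided not to release eagerly, but no longer looked streaming when the proxy decided whether to wrap it.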

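Configuration

Socket

Property        Default  Description
soTimeout       0        SO_TIMEOUT in milliseconds for blocking socket reads; 0 means no timeout
soReuseAddress  false    Default SO_REUSEADDR value for newly created sockets
soLinger        -1       SO_LINGER in seconds: how long close() lingers before the connection is forcibly reset with a TCP RST; a negative value keeps the JRE default (java7 NIO2)
soKeepAlive     false    Whether to enable SO_KEEPALIVE, which detects a crashed peer so that dead connections do not tie up resources
tcpNoDelay      true     TCP_NODELAY: true disables Nagle's algorithm, so small writes are sent without delay
sndBufSize      0        Hint for the size of the underlying send buffer
rcvBufSize      0        Hint for the size of the underlying receive buffer
backlogSize     0        Maximum number of queued incoming connection requests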

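Connection

Property                     Default  Description
expectContinueEnabled        false    Whether to use the 'Expect: 100-continue' handshake
proxy                        null     Proxy for requests: remote host, port, and scheme
localAddress                 null     Local address to send from, for hosts with several network interfaces
staleConnectionCheckEnabled  false    Whether to check for stale connections before a request
cookieSpec                   null     Name of the cookie specification used for HTTP state management
redirectsEnabled             true     Whether redirects are followed automatically
relativeRedirectsAllowed     true     Whether relative redirects are allowed
circularRedirectsAllowed     false    Whether circular redirects are allowed
maxRedirects                 50       Maximum number of redirects to follow
authenticationEnabled        true     Whether authentication is handled automatically
targetPreferredAuthSchemes   null     Preference order of authentication schemes for the target host
proxyPreferredAuthSchemes    null     Preference order of authentication schemes for the proxy host
connectionRequestTimeout     -1       Timeout for obtaining a connection from the connection manager
connectTimeout               -1       Timeout for establishing a connection
socketTimeout                -1       Same as SO_TIMEOUT
contentCompressionEnabled    true     Whether compressed content is requested and decompressed automatically (replaces decompressionEnabled, which was deprecated in 4.5)
normalizeUri                 true     Whether request URIs are normalized (possibly by removing redundant slashes)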

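Connection pool

Property                 Default       Description
maxTotal                 20            Maximum number of connections in the pool
defaultMaxPerRoute       2             Maximum number of connections per route
timeToLive               -1            Connection time to live
timeUnit                 MILLISECONDS  Unit of the time to live
validateAfterInactivity  2000          Idle time in milliseconds after which a connection is validated before reuse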
