OkHttp3源码解析(三)——连接池复用

okhttp,红米 拍照 崩溃,db,全屏无边框 ,www.515158.com

OkHttp3源码解析(三)——连接池复用

OKHttp3源码解析系列

本文基于OkHttp3的3.11.0版本

implementation com.squareup.okhttp3:okhttp:3.11.0

我们已经分析了OkHttp3的拦截器链和缓存策略,今天我们再来看看OkHttp3的连接池复用。

客户端和服务器建立socket连接需要经历TCP的三次握手和四次挥手,是一种比较消耗资源的动作。Http中有一种keepAlive connections的机制,在和客户端通信结束以后可以保持连接指定的时间。OkHttp3支持5个并发socket连接,默认的keepAlive时间为5分钟。下面我们来看看OkHttp3是怎么实现连接池复用的。

OkHttp3的连接池--ConnectionPool

public final class ConnectionPool {
    
    //线程池,用于执行清理空闲连接
    private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
    //最大的空闲socket连接数
    private final int maxIdleConnections;
    //socket的keepAlive时间
    private final long keepAliveDurationNs;
    
    private final Deque<RealConnection> connections = new ArrayDeque<>();
    final RouteDatabase routeDatabase = new RouteDatabase();
    boolean cleanupRunning;
}

ConnectionPool里的几个重要变量:

(1)executor线程池,类似于CachedThreadPool,用于执行清理空闲连接的任务。

(2)Deque双向队列,同时具有队列和栈的性质,经常在缓存中被使用,里面维护的RealConnection是socket物理连接的包装

(3)RouteDatabase,用来记录连接失败的路线名单

下面看看ConnectionPool的构造函数

public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
}

public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
}

从构造函数中可以看出,ConnectionPool的默认空闲连接数为5个,keepAlive时间为5分钟。ConnectionPool是什么时候被创建的呢?是在OkHttpClient的builder中:

public static final class Builder {
    ...
    ConnectionPool connectionPool;
    ...
    public Builder() {
        ...
        connectionPool = new ConnectionPool();
        ...
    }
    
    //我们也可以定制连接池
    public Builder connectionPool(ConnectionPool connectionPool) {
        if (connectionPool == null) throw new NullPointerException("connectionPool == null");
        this.connectionPool = connectionPool;
        return this;
    }
}

缓存操作:添加、获取、回收连接

(1)从缓存中获取连接

//ConnectionPool.class
@Nullable 
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
}

获取连接的逻辑比较简单,就遍历连接池里的连接connections,然后用RealConnection的isEligible方法找到符合条件的连接,如果有符合条件的连接则复用。需要注意的是,这里还调用了streamAllocation的acquire方法。acquire方法的作用是对RealConnection引用的streamAllocation进行计数,OkHttp3是通过RealConnection的StreamAllocation的引用计数是否为0来实现自动回收连接的。

//StreamAllocation.class
public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();

    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}

public static final class StreamAllocationReference extends WeakReference<StreamAllocation> {

    public final Object callStackTrace;

    StreamAllocationReference(StreamAllocation referent, Object callStackTrace) {
      super(referent);
      this.callStackTrace = callStackTrace;
    }
}
//RealConnection.class
public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();

每一个RealConnection中都有一个allocations变量,用于记录对于StreamAllocation的引用。StreamAllocation中包装有HttpCodec,而HttpCodec里面封装有Request和Response读写Socket的抽象。每一个请求Request通过Http来请求数据时都需要通过StreamAllocation来获取HttpCodec,从而读取响应结果,而每一个StreamAllocation都是和一个RealConnection绑定的,因为只有通过RealConnection才能建立socket连接。所以StreamAllocation可以说是RealConnection、HttpCodec和请求之间的桥梁。

当然同样的StreamAllocation还有一个release方法,用于移除计数,也就是将当前的StreamAllocation的引用从对应的RealConnection的引用列表中移除。

private void release(RealConnection connection) {
    for (int i = 0, size = connection.allocations.size(); i < size; i++) {
      Reference<StreamAllocation> reference = connection.allocations.get(i);
      if (reference.get() == this) {
        connection.allocations.remove(i);
        return;
      }
    }
    throw new IllegalStateException();
}

(2)向缓存中添加连接

//ConnectionPool.class
void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

添加连接之前会先调用线程池执行清理空闲连接的任务,也就是回收空闲的连接。

(3)空闲连接的回收

private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
};

cleanupRunnable中执行清理任务是通过cleanup方法来完成,cleanup方法会返回下次需要清理的间隔时间,然后会调用wait方法释放锁和时间片。等时间到了就再次进行清理。下面看看具体的清理逻辑:

long cleanup(long now) {
    //记录活跃的连接数
    int inUseConnectionCount = 0;
    //记录空闲的连接数
    int idleConnectionCount = 0;
    //空闲时间最长的连接
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        //判断连接是否在使用,也就是通过StreamAllocation的引用计数来判断
        //返回值大于0说明正在被使用
        if (pruneAndGetAllocationCount(connection, now) > 0) {
            //活跃的连接数+1
            inUseConnectionCount++;
            continue;
        }
        //说明是空闲连接,所以空闲连接数+1
        idleConnectionCount++;

        //找出了空闲时间最长的连接,准备移除
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        //如果空闲时间最长的连接的空闲时间超过了5分钟
        //或是空闲的连接数超过了限制,就移除
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        //如果存在空闲连接但是还没有超过5分钟
        //就返回剩下的时间,便于下次进行清理
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        //如果没有空闲的连接,那就等5分钟后再尝试清理
        return keepAliveDurationNs;
      } else {
        //当前没有任何连接,就返回-1,跳出循环
        cleanupRunning = false;
        return -1;
      }
    }

    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    return 0;
}

下面我们看看判断连接是否是活跃连接的pruneAndGetAllocationCount方法

private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
        Reference<StreamAllocation> reference = references.get(i);
    
        //如果存在引用,就说明是活跃连接,就继续看下一个StreamAllocation
        if (reference.get() != null) {
            i++;
            continue;
        }

      // Weve discovered a leaked allocation. This is an application bug.
      //发现泄漏的引用,会打印日志
        StreamAllocation.StreamAllocationReference streamAllocRef =
            (StreamAllocation.StreamAllocationReference) reference;
        String message = "A connection to " + connection.route().address().url()
            + " was leaked. Did you forget to close a response body?";
        Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
        
        //如果没有引用,就移除
        references.remove(i);
        connection.noNewStreams = true;

        //如果列表为空,就说明此连接上没有StreamAllocation引用了,就返回0,表示是空闲的连接
        if (references.isEmpty()) {
            connection.idleAtNanos = now - keepAliveDurationNs;
            return 0;
        }
    }
    //遍历结束后,返回引用的数量,说明当前连接是活跃连接
    return references.size();
}

至此我们就分析完OkHttp3的连接池复用了。

总结

(1)OkHttp3中支持5个并发socket连接,默认的keepAlive时间为5分钟,当然我们可以在构建OkHttpClient时设置不同的值。

(2)OkHttp3通过Deque来存储连接,通过put、get等操作来管理连接。

(3)OkHttp3通过每个连接的引用计数对象StreamAllocation的计数来回收空闲的连接,向连接池添加新的连接时会触发执行清理空闲连接的任务。清理空闲连接的任务通过线程池来执行。

OKHttp3源码解析系列



欢迎关注我的微信公众号,和我一起每天进步一点点!
AntDream
OA  OA软件  OA系统  OA办公系统  协同OA软件  OA办公软件  开源OA  协同OA  PHPOA  oa  企业信用查询 cookie  async属性  soso地图api  text/html  电脑信息  js拦截  IBM vpn  universe  日期转换  儒略历转换  类与类数值传递  web服务端 端口监听 c  观察者模式java  java8观察者模式  弹弹球  mybatis学习笔记  mybatis关联关系  t  #cell 在编辑模式下  C# 通讯  转账记录  #根节点  ffmpeg    视频转换   扫描条形码  我11  每隔5秒请求一次数据  泡泡  地图标注  at24c02  at91 arm7  at89c51  at org apache  at javax servlet  kernel segfault at  Fluent  单点登录服务器  uploadify上传文件  Core邮件发送  SMTP邮件发送  刷机忘取sim  nfc sim  sim卡应用  sim800  日文  ruby类  新手教程  消息服务器  re模块  ts不连续  scriptx 连续打印