OkHttpRetrofit一样,同出一处,二者皆是极为优秀的开源框架,HTTP是现代应用程序网络的方式。

这就是我们交换数据和媒体的方式。有效地执行HTTP可以使您的内容加载更快并节省带宽。OkHttp是默认情况下有效的HTTP客户端:

  • HTTP / 2支持允许对同一主机的所有请求共享一个套接字。
  • 连接池可减少请求延迟(如果HTTP / 2不可用)。
  • 透明的GZIP缩小了下载大小。
  • 响应缓存可以完全避免网络重复请求。

  当网络出现问题时,OkHttp会坚持不懈:它将从常见的连接问题中静默恢复。如果您的服务具有多个IP地址,则在第一次连接失败时,OkHttp将尝试使用备用地址。这对于IPv4 + IPv6和冗余数据中心中托管的服务是必需的。OkHttp支持现代TLS功能(TLS 1.3,ALPN,证书固定)。可以将其配置为回退以获得广泛的连接性。使用上手容易简单。它的请求/响应API具有流畅的构建器和不变性。它支持同步阻塞调用和带有回调的异步调用。

一、 Retrofit与OkHttp的关系

  简单来说Retrofit是针对Java或Android基于OkHttp之上进一步的封装,Retrofit依赖于OkHttp,将网络请求的核心部分交给OkHttp执行,Retrofit使用注解方式,大大简化了我们的URL 拼写形式,而且注解含义一目了然,简单易懂; 支持多种文件格式解析(JSON,GSON,XML等); 实现强大的同步与异步执行,使得请求变得更易入手。

二、OkHttp核心源码解析

  在学习这些核心代码过程中,完全是以个人的角度及结合了一些官方文档、网络资料进行的,所以,有哪些地方解析有误,也望大家在评论中指出一二。言归正传,抛开OkHttp核心以外的方法,我们慢慢深入OkHttp网络请求核心的部分,其核心部分可以分三个点切入分析:

  • (Where)请求发送到哪里了
  • (Who)请求被谁处理了
  • (How) 请求是怎么维护的

  首先,来一个简单的网络请求的简单例子:

class MainActivity : AppCompatActivity() {
       override fun onCreate(savedInstanceState: Bundle?) {
          super.onCreate(savedInstanceState)

      //构建OkHttpClient
      val client = OkHttpClient.Builder().build()
      //构建Request
      val request = Request.Builder().url(Api.BASE_URL).build()
      //创建Call接口对象
      val call = client.newCall(request)
      //异步请求
      call.enqueue(object: Callback {
          //异步请求失败
          override fun onFailure(call: Call, e: IOException) {
              Log.e("FAIL", "请求失败---->${e.message}")
          }
    
          //异步请求成功
          override fun onResponse(call: Call, response: Response) {
              Log.e("Success", "请求数据---->${response.body}")
           }
       })
     }
}

  根据代码发现,通过call接口对象调用enqueue()入队的回调Callback实现的两个方法(onFailure和onResponse)中获取响应数据结果,所以核心的切入点是从call.enqueue(Callback回调)开始的。

1、(Where)请求发送到哪了

  在上面的简单的例子中,我们发现请求是被送入队列中的,通过call.enqueue(Callback回调)入队,进入enqueue方法我们发现:

interface Call : Cloneable {
  ...
  fun enqueue(responseCallback: Callback)
  ...
}

  enqueue其实是Call接口定义需要实现的入队方法,那Call接口会在那里被实现呢,进一步入代码中我们发现,这个接口在RealCall中被实现:

class RealCall(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  ...
  //请求进入队列
  override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }
  ...
}

  不难发现请求发送到了Dispatcher中的入队方法中被处理了,进一步进入源码:

class Dispatcher constructor() {
 ...
  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
    //所有请求先添加到等待异步回调队列中
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    //执行与促进
    promoteAndExecute()
  }
 ...
}  

  首先在源码中我们看到了 readyAsyncCalls.add(call),这个readyAsyncCalls表示等待异步回调队列(其实就是空数组双端队列,初始容纳16个元素):

 private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  所有请求进入队直接就添加到readyAsyncCalls这个队列中等待,他的执行时机就是靠 promoteAndExecute()这个方法:

private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

  在这方法中涉及到两个很重要的队列,一个是上面提到的等待异步回调队列readyAsyncCalls(下面以等待队列相称),另外一个是运行中异步回调队列runningAsyncCalls(下面以运行队列相称),在等待队列循环执行 while (i.hasNext())条件下,满足请求被添加到运行队列的必要条件是:运行队列中请求数必须小于maxRequests(64个)且每个主机请求数必须小于maxRequestsPerHost(5个),当这些都不满足的情况下,请求会一直呆在等待队列中,直到运行队列中的请求逐个处理完成,然后等待队列中的请求也逐个被添加运行队列中执行。

  所以根据源码,我们可总结出请求到底去哪了:所有的请求现进入了等待队列readyAsyncCalls,其次,运行队列runningAsyncCalls请求的个数小于64个且运行队列主机执行个数小于5个时,则请求会从等待队列readyAsyncCalls移出,放到运行队列runningAsyncCalls中执行,一按照这个条件直循环执行下去。

2、(Who)请求被谁处理了

  我们前面结论的出请求被放到哪里执行了,那么我们现在来说说队列到底被谁执行了,上面的一句关键源码就是进入被谁处理的大门:

 asyncCall.executeOn(executorService)

  当请求在运行中队列时,将会在被添加到线程池中执行:

  /**
  * ThreadPoolExecutor的参数
  * corePoolSize:保留在线程池中的核心线程数
  * maximumPoolSize:一个线程池中允许最大线程数(这里是Int.MAX_VALUE)
  * keepAliveTime:等待时长,当线程数大于CPU内核数时,是多余空闲线程终止之前等待新任务被执行的最大时长
  * unit:时间单位
  * workQueue:用于执行任务之前保留任务的队列
  * threadFactory:线程池工厂,用于创建线程,管理线程,避免频繁new Thread的工作
  */
  private var executorServiceOrNull: ExecutorService? = null
  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

  而执行这个线程池的方法是在RealCall 类中的的内部类AsyncCall的executeOn,AsyncCall同时中开启了一个线程threadName("OkHttp ${redactedUrl()}")执行请求:

class RealCall(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {

...

  internal inner class AsyncCall(
     private val responseCallback: Callback
   ) : Runnable {
     @Volatile var callsPerHost = AtomicInteger(0)
       private set

     fun reuseCallsPerHostFrom(other: AsyncCall) {
       this.callsPerHost = other.callsPerHost
     }

     val host: String
       get() = originalRequest.url.host
 
     val request: Request
         get() = originalRequest

     val call: RealCall
        get() = this@RealCall

     /**
      * Attempt to enqueue this async call on [executorService]. This will attempt to  clean up
      * if the executor has been shut down by reporting the call as failed.
      */
     fun executeOn(executorService: ExecutorService) {
       client.dispatcher.assertThreadDoesntHoldLock()

       var success = false
       try {
         executorService.execute(this)
         success = true
       } catch (e: RejectedExecutionException) {
         val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
         }
       }
     }
   }
  ...
}

  在这一大段源码中我们发现了的响应response通过getResponseWithInterceptorChain()获取,这方法也是真正帮我们处理请求的地方:


  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

  改源码内部通过拦截器责任链来处理请求,这里涉及到了责任链中多个拦截器:

  • 自定义拦截器(client.interceptors):是通过addInterceptor()方法添加的自定义拦截器是一个不可变的拦截器
  • 重定向拦截器(RetryAndFollowUpInterceptor):从故障中恢复,并按需要重定向
  • 桥接拦截器(BridgeInterceptor):从应用程序代码到网络代码的桥梁,构建网络–>呼叫网络–>构+ 建网络响应
  • 缓存拦截器(CacheInterceptor):服务来自缓存的请求,并将响应写入缓存
  • 连接拦截器(ConnectInterceptor):打开与目标服务器的连接,然后进入下一个拦截器
  • 自定义拦截器(client.networkInterceptors):此拦截器通过addNetworkInterceptor()方法添加进来的自定义拦截器,是一个不可变的拦截器
  • 读写拦截器 (CallServerInterceptor):对服务器网络进行呼叫,对响应体执行读写操作

当以上多个拦截器都满足条件,最终则可以获取响应结果。 所以可总结出请求是被谁执行的:在运行队列中,请求会在线程池中开启一个threadName("OkHttp ${redactedUrl()}")的线程中执行,最终会通过拦截器责任链getResponseWithInterceptorChain中一步步被执行,获取响应结果。

3、(How)请求是怎样维护的

  通过前面,我们已经知道了请求发送到哪里和具体被哪个核心代码执行,那么请求是怎么被维护的呢,回到我们上面的代码中,我们会发现获取到响应结果后,最终会有这么一句代码会被执行:

 client.dispatcher.finished(this)

进入finished(xxx)会有以下源码:

private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }

  这里我们会发现有一个熟悉的代码val isRunning = promoteAndExecute(),没错,就是请求循环回到队列中,执行着我们第2小节讲的请求发送到哪的那个节点里。实现了队列循环添加处理、等待请求的维护体系。

三、总结

  这篇文章旨在自己自学Android的心得,因为某些因素,自己已经停岗几年没做开发了,重新学习发现自己年纪也长了不少,新技术也迭代更新不少,或许无法和年轻小伙比,但学习永无止境,希望这篇浅析源码能给一些需要的人帮助吧。