
Okio is a library that complements java.io and java.nio to make it much easier to access, store, and process your data.

正如Okio官网所说,它整合了java io 和nio让它们的更容易使用。此篇深入分析一下Okio高效的原因。

implementation "com.squareup.okio:okio:2.7.0"


// 读取文件
val readFile = File("read")
val buffer = readFile.source().buffer()

// 写入文件
val writableFile = File("writable")
if (!writableFile.exists()) {
val sink = writableFile.sink().buffer()
sink.writeString("sink write in", Charset.defaultCharset())






internal class Segment {
  // 用于存储读/写数据 长度8192
  @JvmField val data: ByteArray

  /** The next byte of application data byte to read in this segment.  */
  // 当前byteArray的读取位置
  @JvmField var pos: Int = 0

  /** The first byte of available data ready to be written to.  */
  // 可以理解为data当前的容量
  @JvmField var limit: Int = 0

  /** True if other segments or byte strings use the same byte array.  */
  // 与下面的owner互斥表示这个segment是共享的还是独占的
  // 可能会影响到我们写入数据时是否需要创建新的segment
  @JvmField var shared: Boolean = false

  /** True if this segment owns the byte array and can append to it, extending `limit`.  */
  @JvmField var owner: Boolean = false

  /** Next segment in a linked or circularly-linked list.  */
  @JvmField var next: Segment? = null

  /** Previous segment in a circularly-linked list.  */
  @JvmField var prev: Segment? = null

 // 省略...



internal inline fun Buffer.commonReadInt(): Int {
  if (size < 4L) throw EOFException()

  // 从Buffer中获取head
  val segment = head!!
  // pos 读取的起始位置
  var pos = segment.pos
  // segment中字节码的长度
  val limit = segment.limit

  // If the int is split across multiple segments, delegate to readByte().
  // 发现剩余的字节码的长度小于4,等于说剩余的长度不足一个int的长度啦,就直接去读byte吧
  if (limit - pos < 4L) {
    // shl 运算优先级要大于and
    return (readByte() and 0xff shl 24
      or (readByte() and 0xff shl 16)
      or (readByte() and 0xff shl 8) // ktlint-disable no-multi-spaces
      or (readByte() and 0xff))

  val data = segment.data
  // 这里呢,在字节码数组中读取4位表示一个Int,第一个字节表示高8位,最后一个字节低8位,将这32位bit求和就得到读取的int值啦
  val i = (data[pos++] and 0xff shl 24
    or (data[pos++] and 0xff shl 16)
    or (data[pos++] and 0xff shl 8)
    or (data[pos++] and 0xff))
  // 读取了4个字节,长度减4咯
  size -= 4L

  // 当当前读取的位置与存储的上限位置相等表示读完啦
  if (pos == limit) {
    // 改变头部的指向
    head = segment.pop()
    // 将读取完的segment回收
  } else {
    // 改变当前读取到的位置
    segment.pos = pos

  // 返回读取的int值
  return i

internal inline fun Buffer.commonWriteInt(i: Int): Buffer {
  // 拿到一个可写入的segment,4表示int的长度,影响后面是否要创建新的segment
  val tail = writableSegment(4)
  val data = tail.data
  // 下面就是常规操作啦,写入一个int值并更新字节数组的长度和size的大小
  var limit = tail.limit
  data[limit++] = (i ushr 24 and 0xff).toByte()
  data[limit++] = (i ushr 16 and 0xff).toByte()
  data[limit++] = (i ushr  8 and 0xff).toByte() // ktlint-disable no-multi-spaces
  data[limit++] = (i         and 0xff).toByte() // ktlint-disable no-multi-spaces
  tail.limit = limit
  size += 4L
  return this


internal actual object SegmentPool {
  actual val MAX_SIZE = 64 * 1024

  private val LOCK = Segment(ByteArray(0), pos = 0, limit = 0, shared = false, owner = false)

  // hash_bucket_count与处理器ALU个数有关,4舍5入保证是2的指数。这样能保证线程之间抢占cpu的可能性更低,我们创建线程池是不是也可以这么设计呢?
  private val HASH_BUCKET_COUNT =
    Integer.highestOneBit(Runtime.getRuntime().availableProcessors() * 2 - 1)

   * Hash buckets each containing a singly-linked list of segments. We use multiple hash buckets so
   * different threads don't race each other. We use thread IDs as hash keys because they're handy,
   * and because it may increase locality.
   * We don't use [ThreadLocal] because we don't know how many threads the host process has and we
   * don't want to leak memory for the duration of a thread's life.
   // 对于io密集型场景,线程数 = cpu核心数 / (1 - 阻塞系数) (阻塞系数为该任务阻塞时间与(阻塞时间+计算时间)的比值)。可以简单设置为2倍cpu核心数
   // 对于计算密集型场景,线程数 = cpu核心数
   // hashBuckets的长度>=可用处理器长度,一般情况是一个线程对应一个Segment,但是也可能存在多个线程对应一个Segment的情况。
  private val hashBuckets: Array<AtomicReference<Segment?>> = Array(HASH_BUCKET_COUNT) {

  actual fun take(): Segment {
    val firstRef = firstRef()

    // firstRef获取值并设置LOCK返回原值
    val first = firstRef.getAndSet(LOCK)
    when {
      // 如果first为LOCK,没有占有锁,不会从池中拿segment
      first === LOCK -> {
        // We didn't acquire the lock. Don't take a pooled segment.
        return Segment()
      // first == null当前占有锁但是segment池是空的。释放锁返回一个segment
      first == null -> {
        // We acquired the lock but the pool was empty. Unlock and return a new segment.
        return Segment()
      // 其它情况,获取到了锁并且segment pool不是空的。复用当前的Segment。
      else -> {
        // We acquired the lock and the pool was not empty. Pop the first element and return it.
        first.next = null
        first.limit = 0
        return first

  actual fun recycle(segment: Segment) {
    require(segment.next == null && segment.prev == null)
    // 如果Segment是共享的不能被回收
    if (segment.shared) return // This segment cannot be recycled.

    val firstRef = firstRef()

    val first = firstRef.get()
    // 若当前锁被占用,不能被回收
    if (first === LOCK) return // A take() is currently in progress.
    val firstLimit = first?.limit ?: 0
    if (firstLimit >= MAX_SIZE) return // Pool is full.

    segment.next = first
    segment.pos = 0
    // 更新缓存池的容量(头插法)
    segment.limit = firstLimit + Segment.SIZE

    // 如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
    // 如果成功,则返回 true。返回 false 指示实际值与预期值不相等。
    if (!firstRef.compareAndSet(first, segment)) segment.next = null
    // If we raced another operation: Don't recycle this segment.

  private fun firstRef(): AtomicReference<Segment?> {
    // Get a value in [0..HASH_BUCKET_COUNT).
    val hashBucket = (Thread.currentThread().id and (HASH_BUCKET_COUNT - 1L)).toInt()
    return hashBuckets[hashBucket]




fun main() {
    // 初始化一个文件输出流
    val ops = FileOutputStream("writable")
    val sink = ops.sink()
    val timeout = sink.timeout()
    // 设置deadline为5秒
    timeout.deadline(5, TimeUnit.SECONDS)
    // 当前线程睡眠6s
    val buffer = sink.buffer()
    // 执行写入操作
    buffer.writeString("after 10 second", Charset.defaultCharset())


Exception in thread "main" java.io.InterruptedIOException: deadline reached
  at okio.Timeout.throwIfReached(Timeout.kt:102)
  at okio.OutputStreamSink.write(JvmOkio.kt:50)
  at okio.RealBufferedSink.close(RealBufferedSink.kt:260)
  at MainKt.main(Main.kt:21)
  at MainKt.main(Main.kt)


open fun throwIfReached() {
  if (Thread.interrupted()) {
    Thread.currentThread().interrupt() // Retain interrupted status.
    throw InterruptedIOException("interrupted")
  // deadline time 与当前系统时间的差值小于0,抛出异常
  if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
    throw InterruptedIOException("deadline reached")


// SocketAsyncTimeout是AsyncTimeout子类,实际上是用它控制timeout的
fun Socket.sink(): Sink {
  val timeout = SocketAsyncTimeout(this)
  val sink = OutputStreamSink(getOutputStream(), timeout)
  return timeout.sink(sink)

fun AsyncTimeout.source(source: Source): Source {
  return object : Source {
    override fun read(sink: Buffer, byteCount: Long): Long {
      // source.read被withTimeout包装了一层
      return withTimeout { source.read(sink, byteCount) }

    override fun close() {
      withTimeout { source.close() }

    override fun timeout() = this@AsyncTimeout

    override fun toString() = "AsyncTimeout.source($source)"

inline fun <T> withTimeout(block: () -> T): T {
  var throwOnTimeout = false
  // 超时逻辑所在
  // 省略。。。

fun enter() {
  check(!inQueue) { "Unbalanced enter/exit" }
  val timeoutNanos = timeoutNanos()
  val hasDeadline = hasDeadline()
  if (timeoutNanos == 0L && !hasDeadline) {
    return // No timeout and no deadline? Don't bother with the queue.
  inQueue = true
  // 调度timeout,开启watchdog线程,watchdog会计算当前timeout节点是否超时若达到了超时时间将timeout从当前链表中移除并返回执行timeout.timeout(),当没有节点时watchdog自动挂起1分钟。
  scheduleTimeout(this, timeoutNanos, hasDeadline)



