你好,我是鸟窝。

在前面的四节课中,我们学习了第一个同步原语,即Mutex,我们使用它来保证读写共享资源的安全性。不管是读还是写,我们都通过Mutex来保证只有一个goroutine访问共享资源,这在某些情况下有点“浪费”。比如说,在写少读多的情况下,即使一段时间内没有写操作,大量并发的读访问也不得不在Mutex的保护下变成了串行访问,这个时候,使用Mutex,对性能的影响就比较大。

怎么办呢?你是不是已经有思路了,对,就是区分读写操作。

我来具体解释一下。如果某个读操作的goroutine持有了锁,在这种情况下,其它读操作的goroutine就不必一直傻傻地等待了,而是可以并发地访问共享变量,这样我们就可以将串行的读变成并行读,提高读操作的性能。当写操作的goroutine持有锁的时候,它就是一个排外锁,其它的写操作和读操作的goroutine,需要阻塞等待持有这个锁的goroutine释放锁。

这一类并发读写问题叫作readers-writers问题,意思就是,同时可能有多个读或者多个写,但是只要有一个线程在执行写操作,其它的线程都不能执行读写操作。

Go标准库中的RWMutex(读写锁)就是用来解决这类readers-writers问题的。所以,这节课,我们就一起来学习RWMutex。我会给你介绍读写锁的使用场景、实现原理以及容易掉入的坑,你一定要记住这些陷阱,避免在实际的开发中犯相同的错误。

什么是RWMutex?

我先简单解释一下读写锁RWMutex。标准库中的RWMutex是一个 reader/writer 互斥锁。RWMutex在某一时刻只能由任意数量的reader持有,或者是只被单个的writer持有。

RWMutex的方法也很少,总共有5个。

RWMutex的零值是未加锁的状态,所以,当你使用RWMutex的时候,无论是声明变量,还是嵌入到其它struct中,都不必显式地初始化。

我以计数器为例,来说明一下,如何使用RWMutex保护共享资源。计数器的count++操作是操作,而获取count的值是操作,这个场景非常适合读写锁,因为读操作可以并行执行,写操作时只允许一个线程执行,这正是readers-writers问题。

在这个例子中,使用10个goroutine进行读操作,每读取一次,sleep 1毫秒,同时,还有一个gorotine进行写操作,每一秒写一次,这是一个 1 writer-n reader 的读写场景,而且写操作还不是很频繁(一秒一次):

func main() {
    var counter Counter
    for i := 0; i < 10; i++ { // 10个reader
        go func() {
            for {
                counter.Count() // 计数器读操作
                time.Sleep(time.Millisecond)
            }
        }()
    }

    for { // 一个writer
        counter.Incr() // 计数器写操作
        time.Sleep(time.Second)
    }
}
// 一个线程安全的计数器
type Counter struct {
    mu    sync.RWMutex
    count uint64
}

// 使用写锁保护
func (c *Counter) Incr() {
    c.mu.Lock()
    c.count++
    c.mu.Unlock()
}

// 使用读锁保护
func (c *Counter) Count() uint64 {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.count
}

可以看到,Incr方法会修改计数器的值,是一个写操作,我们使用Lock/Unlock进行保护。Count方法会读取当前计数器的值,是一个读操作,我们使用RLock/RUnlock方法进行保护。

Incr方法每秒才调用一次,所以,writer竞争锁的频次是比较低的,而10个goroutine每毫秒都要执行一次查询,通过读写锁,可以极大提升计数器的性能,因为在读取的时候,可以并发进行。如果使用Mutex,性能就不会像读写锁这么好。因为多个reader并发读的时候,使用互斥锁导致了reader要排队读的情况,没有RWMutex并发读的性能好。

如果你遇到可以明确区分reader和writer goroutine的场景,且有大量的并发读、少量的并发写,并且有强烈的性能需求,你就可以考虑使用读写锁RWMutex替换Mutex。

在实际使用RWMutex的时候,如果我们在struct中使用RWMutex保护某个字段,一般会把它和这个字段放在一起,用来指示两个字段是一组字段。除此之外,我们还可以采用匿名字段的方式嵌入struct,这样,在使用这个struct时,我们就可以直接调用Lock/Unlock、RLock/RUnlock方法了,这和我们前面在01讲中介绍Mutex的使用方法很类似,你可以回去复习一下。

RWMutex的实现原理

RWMutex是很常见的并发原语,很多编程语言的库都提供了类似的并发类型。RWMutex一般都是基于互斥锁、条件变量(condition variables)或者信号量(semaphores)等并发原语来实现。Go标准库中的RWMutex是基于Mutex实现的。

readers-writers问题一般有三类,基于对读和写操作的优先级,读写锁的设计和实现也分成三类。

Go标准库中的RWMutex设计是Write-preferring方案。一个正在阻塞的Lock调用会排除新的reader请求到锁。

RWMutex包含一个Mutex,以及四个辅助字段writerSem、readerSem、readerCount和readerWait:

type RWMutex struct {
	w           Mutex   // 互斥锁解决多个writer的竞争
	writerSem   uint32  // writer信号量
	readerSem   uint32  // reader信号量
	readerCount int32   // reader的数量
	readerWait  int32   // writer等待完成的reader的数量
}

const rwmutexMaxReaders = 1 << 30

我来简单解释一下这几个字段。

这里的常量rwmutexMaxReaders,定义了最大的reader数量。

好了,知道了RWMutex的设计方案和具体字段,下面我来解释一下具体的方法实现。

RLock/RUnlock的实现

首先,我们看一下移除了race等无关紧要的代码后的RLock和RUnlock方法:

func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
            // rw.readerCount是负值的时候,意味着此时有writer等待请求锁,因为writer优先级高,所以把后来的reader阻塞休眠
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
}
func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        rw.rUnlockSlow(r) // 有等待的writer
    }
}
func (rw *RWMutex) rUnlockSlow(r int32) {
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        // 最后一个reader了,writer终于有机会获得锁了
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

第2行是对reader计数加1。你可能比较困惑的是,readerCount怎么还可能为负数呢?其实,这是因为,readerCount这个字段有双重含义:

调用RUnlock的时候,我们需要将Reader的计数减去1(第8行),因为reader的数量减少了一个。但是,第8行的AddInt32的返回值还有另外一个含义。如果它是负值,就表示当前有writer竞争锁,在这种情况下,还会调用rUnlockSlow方法,检查是不是reader都释放读锁了,如果读锁都释放了,那么可以唤醒请求写锁的writer了。

当一个或者多个reader持有锁的时候,竞争锁的writer会等待这些reader释放完,才可能持有这把锁。打个比方,在房地产行业中有条规矩叫做“买卖不破租赁”,意思是说,就算房东把房子卖了,新业主也不能把当前的租户赶走,而是要等到租约结束后,才能接管房子。这和RWMutex的设计是一样的。当writer请求锁的时候,是无法改变既有的reader持有锁的现实的,也不会强制这些reader释放锁,它的优先权只是限定后来的reader不要和它抢。

所以,rUnlockSlow将持有锁的reader计数减少1的时候,会检查既有的reader是不是都已经释放了锁,如果都释放了锁,就会唤醒writer,让writer持有锁。

Lock

RWMutex是一个多writer多reader的读写锁,所以同时可能有多个writer和reader。那么,为了避免writer之间的竞争,RWMutex就会使用一个Mutex来保证writer的互斥。

一旦一个writer获得了内部的互斥锁,就会反转readerCount字段,把它从原来的正整数readerCount(>=0)修改为负数(readerCount-rwmutexMaxReaders),让这个字段保持两个含义(既保存了reader的数量,又表示当前有writer)。

我们来看下下面的代码。第5行,还会记录当前活跃的reader数量,所谓活跃的reader,就是指持有读锁还没有释放的那些reader。

func (rw *RWMutex) Lock() {
    // 首先解决其他writer竞争问题
    rw.w.Lock()
    // 反转readerCount,告诉reader有writer竞争锁
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // 如果当前有reader持有锁,那么需要等待
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}

如果readerCount不是0,就说明当前有持有读锁的reader,RWMutex需要把这个当前readerCount赋值给readerWait字段保存下来(第7行), 同时,这个writer进入阻塞等待状态(第8行)。

每当一个reader释放读锁的时候(调用RUnlock方法时),readerWait字段就减1,直到所有的活跃的reader都释放了读锁,才会唤醒这个writer。

Unlock

当一个writer释放锁的时候,它会再次反转readerCount字段。可以肯定的是,因为当前锁由writer持有,所以,readerCount字段是反转过的,并且减去了rwmutexMaxReaders这个常数,变成了负数。所以,这里的反转方法就是给它增加rwmutexMaxReaders这个常数值。

既然writer要释放锁了,那么就需要唤醒之后新来的reader,不必再阻塞它们了,让它们开开心心地继续执行就好了。

在RWMutex的Unlock返回之前,需要把内部的互斥锁释放。释放完毕后,其他的writer才可以继续竞争这把锁。

func (rw *RWMutex) Unlock() {
    // 告诉reader没有活跃的writer了
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    
    // 唤醒阻塞的reader们
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // 释放内部的互斥锁
    rw.w.Unlock()
}

在这段代码中,我删除了race的处理和异常情况的检查,总体看来还是比较简单的。这里有几个重点,我要再提醒你一下。首先,你要理解readerCount这个字段的含义以及反转方式。其次,你还要注意字段的更改和内部互斥锁的顺序关系。在Lock方法中,是先获取内部互斥锁,才会修改的其他字段;而在Unlock方法中,是先修改的其他字段,才会释放内部互斥锁,这样才能保证字段的修改也受到互斥锁的保护。

好了,到这里我们就完整学习了RWMutex的概念和实现原理。RWMutex的应用场景非常明确,就是解决readers-writers问题。学完了今天的内容,之后当你遇到这类问题时,要优先想到RWMutex。另外,Go并发原语代码实现的质量都很高,非常精炼和高效,所以,你可以通过看它们的实现原理,学习一些编程的技巧。当然,还有非常重要的一点就是要知道reader或者writer请求锁的时候,既有的reader/writer和后续请求锁的reader/writer之间的(释放锁/请求锁)顺序关系。

有个很有意思的事儿,就是官方的文档对RWMutex介绍是错误的,或者说是不明确的,在下一个版本(Go 1.16)中,官方会更改对RWMutex的介绍,具体是这样的:

A RWMutex is a reader/writer mutual exclusion lock.

The lock can be held by any number of readers or a single writer, and

a blocked writer also blocks new readers from acquiring the lock.

这个描述是相当精确的,它指出了RWMutex可以被谁持有,以及writer比后续的reader有获取锁的优先级。

虽然RWMutex暴露的API也很简单,使用起来也没有复杂的逻辑,但是和Mutex一样,在实际使用的时候,也会很容易踩到一些坑。接下来,我给你重点介绍3个常见的踩坑点。

RWMutex的3个踩坑点

坑点1:不可复制

前面刚刚说过,RWMutex是由一个互斥锁和四个辅助字段组成的。我们很容易想到,互斥锁是不可复制的,再加上四个有状态的字段,RWMutex就更加不能复制使用了。

不能复制的原因和互斥锁一样。一旦读写锁被使用,它的字段就会记录它当前的一些状态。这个时候你去复制这把锁,就会把它的状态也给复制过来。但是,原来的锁在释放的时候,并不会修改你复制出来的这个读写锁,这就会导致复制出来的读写锁的状态不对,可能永远无法释放锁。

那该怎么办呢?其实,解决方案也和互斥锁一样。你可以借助vet工具,在变量赋值、函数传参、函数返回值、遍历数据、struct初始化等时,检查是否有读写锁隐式复制的情景。

坑点2:重入导致死锁

读写锁因为重入(或递归调用)导致死锁的情况更多。

我先介绍第一种情况。因为读写锁内部基于互斥锁实现对writer的并发访问,而互斥锁本身是有重入问题的,所以,writer重入调用Lock的时候,就会出现死锁的现象,这个问题,我们在学习互斥锁的时候已经了解过了。

func foo(l *sync.RWMutex) {
    fmt.Println("in foo")
    l.Lock()
    bar(l)
    l.Unlock()
}

func bar(l *sync.RWMutex) {
    l.Lock()
    fmt.Println("in bar")
    l.Unlock()
}

func main() {
    l := &sync.RWMutex{}
    foo(l)
}

运行这个程序,你就会得到死锁的错误输出,在Go运行的时候,很容易就能检测出来。

第二种死锁的场景有点隐蔽。我们知道,有活跃reader的时候,writer会等待,如果我们在reader的读操作时调用writer的写操作(它会调用Lock方法),那么,这个reader和writer就会形成互相依赖的死锁状态。Reader想等待writer完成后再释放锁,而writer需要这个reader释放锁之后,才能不阻塞地继续执行。这是一个读写锁常见的死锁场景。

第三种死锁的场景更加隐蔽。

当一个writer请求锁的时候,如果已经有一些活跃的reader,它会等待这些活跃的reader完成,才有可能获取到锁,但是,如果之后活跃的reader再依赖新的reader的话,这些新的reader就会等待writer释放锁之后才能继续执行,这就形成了一个环形依赖: writer依赖活跃的reader -> 活跃的reader依赖新来的reader -> 新来的reader依赖writer

这个死锁相当隐蔽,原因在于它和RWMutex的设计和实现有关。啥意思呢?我们来看一个计算阶乘(n!)的例子:

func main() {
    var mu sync.RWMutex

    // writer,稍微等待,然后制造一个调用Lock的场景
    go func() {
        time.Sleep(200 * time.Millisecond)
        mu.Lock()
        fmt.Println("Lock")
        time.Sleep(100 * time.Millisecond)
        mu.Unlock()
        fmt.Println("Unlock")
    }()

    go func() {
        factorial(&mu, 10) // 计算10的阶乘, 10!
    }()
    
    select {}
}

// 递归调用计算阶乘
func factorial(m *sync.RWMutex, n int) int {
    if n < 1 { // 阶乘退出条件 
        return 0
    }
    fmt.Println("RLock")
    m.RLock()
    defer func() {
        fmt.Println("RUnlock")
        m.RUnlock()
    }()
    time.Sleep(100 * time.Millisecond)
    return factorial(m, n-1) * n // 递归调用
}

factoria方法是一个递归计算阶乘的方法,我们用它来模拟reader。为了更容易地制造出死锁场景,我在这里加上了sleep的调用,延缓逻辑的执行。这个方法会调用读锁(第27行),在第33行递归地调用此方法,每次调用都会产生一次读锁的调用,所以可以不断地产生读锁的调用,而且必须等到新请求的读锁释放,这个读锁才能释放。

同时,我们使用另一个goroutine去调用Lock方法,来实现writer,这个writer会等待200毫秒后才会调用Lock,这样在调用Lock的时候,factoria方法还在执行中不断调用RLock。

这两个goroutine互相持有锁并等待,谁也不会退让一步,满足了“writer依赖活跃的reader -> 活跃的reader依赖新来的reader -> 新来的reader依赖writer”的死锁条件,所以就导致了死锁的产生。

所以,使用读写锁最需要注意的一点就是尽量避免重入,重入带来的死锁非常隐蔽,而且难以诊断。

坑点3:释放未加锁的RWMutex

和互斥锁一样,Lock和Unlock的调用总是成对出现的,RLock和RUnlock的调用也必须成对出现。Lock和RLock多余的调用会导致锁没有被释放,可能会出现死锁,而Unlock和RUnlock多余的调用会导致panic。在生产环境中出现panic是大忌,你总不希望半夜爬起来处理生产环境程序崩溃的问题吧?所以,在使用读写锁的时候,一定要注意,不遗漏不多余

流行的Go开发项目中的坑

好了,又到了泡一杯宁夏枸杞加新疆大滩枣的养生茶,静静地欣赏知名项目出现Bug的时候了,这次被拉出来的是RWMutex的Bug。

Docker

issue 36840

issue 36840修复的是错误地把writer当成reader的Bug。 这个地方本来需要修改数据,需要调用的是写锁,结果用的却是读锁。或许是被它紧挨着的findNode方法调用迷惑了,认为这只是一个读操作。可实际上,代码后面还会有changeNodeState方法的调用,这是一个写操作。修复办法也很简单,只需要改成Lock/Unlock即可。

Kubernetes

issue 62464

issue 62464就是读写锁第二种死锁的场景,这是一个典型的reader导致的死锁的例子。知道墨菲定律吧?“凡是可能出错的事,必定会出错”。你可能觉得我前面讲的RWMutex的坑绝对不会被人踩的,因为道理大家都懂,但是你看,Kubernetes就踩了这个重入的坑。

这个issue在移除pod的时候可能会发生,原因就在于,GetCPUSetOrDefault方法会请求读锁,同时,它还会调用GetCPUSet或GetDefaultCPUSet方法。当这两个方法都请求写锁时,是获取不到的,因为GetCPUSetOrDefault方法还没有执行完,不会释放读锁,这就形成了死锁。

总结

在开发过程中,一开始考虑共享资源并发访问问题的时候,我们就会想到互斥锁Mutex。因为刚开始的时候,我们还并不太了解并发的情况,所以,就会使用最简单的同步原语来解决问题。等到系统成熟,真正到了需要性能优化的时候,我们就能静下心来分析并发场景的可能性,这个时候,我们就要考虑将Mutex修改为RWMutex,来压榨系统的性能。

当然,如果一开始你的场景就非常明确了,比如我就要实现一个线程安全的map,那么,一开始你就可以考虑使用读写锁。

正如我在前面提到的,如果你能意识到你要解决的问题是一个readers-writers问题,那么你就可以毫不犹豫地选择RWMutex,不用考虑其它选择。那在使用RWMutex时,最需要注意的一点就是尽量避免重入,重入带来的死锁非常隐蔽,而且难以诊断。

另外我们也可以扩展RWMutex,不过实现方法和互斥锁Mutex差不多,在技术上是一样的,都是通过unsafe来实现,我就不再具体讲了。课下你可以参照我们上节课学习的方法,实现一个扩展的RWMutex。

这一讲我们系统学习了读写锁的相关知识,这里提供给你一个知识地图,帮助你复习本节课的知识。

思考题

请你写一个扩展的读写锁,比如提供TryLock,查询当前是否有writer、reader的数量等方法。

欢迎在留言区写下你的思考和答案,我们一起交流讨论。如果你觉得有所收获,也欢迎你把今天的内容分享给你的朋友或同事。