Waitgroup 实现
数据结构
type WaitGroup struct {
noCopy noCopy
// 64 位的原子操作需要有64位的内存对齐,而32位的编译器无法保证对齐,因此如果是32位的话前8字节做state,后4字节做信号量
state1 [3]uint32
}
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 有内存对齐 -> 64位
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 无内存对齐 -> 32位
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
|counter + waiter 64| 内存对齐 32 空(8字节)| sema 32 -> 64 位
|sema 32| counter 32 + waiter 32 -> 32 位
CPU 按照块来读取内存
API
从源码可以看到,add 需要在创建 goroutine 前被调用,因为其需要预先知道产生的 goroutine 个数,done 其实也只是调用 add(-1)
// counter + delta的值
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32) // 取出 counter
w := uint32(state) // 取出 waiter
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// V等于0,所有等待者
*statep = 0
// 信号量自增,一次唤醒一个 waiter
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
Wait 通过运行时阻塞自己直到 counter 计数器的值为0,如果在 wait 前 v 已经为0,那么就不需要阻塞自己,直接返回
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// 不需要等待
return
}
// 需要等待,waiter自增使用CAS操作,可能会失败(在这个时间段里V变为0),因此需要死循环需要重试
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 传入 *uint32 的指针作为计数器,信号量初始值为0,因此这个 goroutine 会陷入等待,内部也是一个死循环,会获得 sudog 并让这个 goroutine 休眠
runtime_Semacquire(semap)
return
}
}
}
使用场景
常用于等待多个 G 执行结束