Waitgroup 实现

数据结构

type WaitGroup struct {
	noCopy noCopy
	// 64 位的原子操作需要有64位的内存对齐,而32位的编译器无法保证对齐,因此如果是32位的话前8字节做state,后4字节做信号量
	state1 [3]uint32
}
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		// 有内存对齐 -> 64位
		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
	} else {
		// 无内存对齐 -> 32位
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
}
|counter + waiter 64| 内存对齐 32 空(8字节)| sema 32  -> 64 

|sema 32| counter 32 + waiter 32  -> 32 

CPU 按照块来读取内存

API

从源码可以看到,add 需要在创建 goroutine 前被调用,因为其需要预先知道产生的 goroutine 个数,done 其实也只是调用 add(-1)

// counter + delta的值
func (wg *WaitGroup) Add(delta int) {
	statep, semap := wg.state()

	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32)	// 取出 counter
	w := uint32(state) // 取出 waiter

	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	if v > 0 || w == 0 {
		return
	}
	// V等于0,所有等待者
	*statep = 0
	// 信号量自增,一次唤醒一个 waiter
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}

Wait 通过运行时阻塞自己直到 counter 计数器的值为0,如果在 wait 前 v 已经为0,那么就不需要阻塞自己,直接返回

func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	for {
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		w := uint32(state)
		if v == 0 {
			// 不需要等待
			return
		}
		// 需要等待,waiter自增使用CAS操作,可能会失败(在这个时间段里V变为0),因此需要死循环需要重试
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			// 传入 *uint32 的指针作为计数器,信号量初始值为0,因此这个 goroutine 会陷入等待,内部也是一个死循环,会获得 sudog 并让这个 goroutine 休眠
			runtime_Semacquire(semap)
			return
		}
	}
}

使用场景

常用于等待多个 G 执行结束