Don't communicate by sharing memory, share memory by communicating.
不要通过共享内存来通信,而要通过通信来实现内存共享。
数据结构
我们可以把 Channel 看做是一个先进先出(FIFO)的数据队列,那么如何实现这种队列
channel 的底层数据结构是一个 *hchan
,在编译时期会将 make(chan int) 语句转换成 makechan 函数调用
hchan
// runtime/chan.go
type hchan struct {
lock mutex // lock 用来保护 hchan 上所有的字段
// 缓冲区实际是一个循环队列
buf unsafe.Pointer // 指向缓冲区的指针
dataqsiz uint // 缓冲区循环队列的大小
sendx uint // 缓冲区循环队列接收下一个元素的索引
recvx uint // 缓冲区循环队列中下一个会返回的元素的索引
qcount uint // 当前 hchan 缓存的元素数量
closed uint32 // hchan 是否关闭
elemsize uint16 // hchan 的元素大小
elemtype *_type // hchan 的元素类型
recvq waitq // 等待接收的 goroutine 队列
sendq waitq // 等待发送的 goroutine 队列
}
可以看出 channel 的底层数据结构
- 缓冲区
buf
底层是一个循环队列,dataqsiz
和qcount
分别记录了缓冲区的大小和当前缓冲的元素数量,sendx
,recvx
用来记录位置索引 -
elemsize
和elemtype
表示元素大小和类型 -
recvq
和sendq
来记录被发送接收阻塞的 goroutine 队列 -
closed
用来记录是否关闭 -
lock
用来保护hchan中的字段,更新其他字段的时候都需要加锁
对于无缓冲 channel 是不需要和缓冲区相关的字段的
channel 在实现中依然使用到了锁,Go 所说的 使用通信来实现共享内存,实际上依然在底层使用锁来保证读写的原子性,实现出了一个面向数据流式的数据结构
待发送者和待接收者
注意到 recvq
和 sendq
类型 waitq
是一个双向链表,提供了等待 goroutine 的出队入队
// runtime/chan.go
type waitq struct {
first *sudog
last * sudog
}
func(q *waitq) enqueue(sgp *sudog){
// ...
}
func (q *waitq) dequeue(sgp *sudog){
// ...
}
sudog
是对被阻塞的 goroutine 的封装,简单看一下 channel 会使用到的一些字段
// runtime/runtime2.go
type sudog struct {
g *g //阻塞的 goroutine
elem unsafe.Pointer
c *hchan // 阻塞的 channel
elem
字段是一个指针,在 channel 会被用来指向待发送者要发送的数据或者待接收者的接收位置
// 从 ch 接收数据被阻塞,那么 sudog.elem 会指向 x
x <- ch
// 向 ch 发送数据被阻塞,那么 sudog.elem 会指向 y
ch <- y
makechan 创建 channel
channel 分为无缓冲 channel 和 缓冲 channel,虽然两种 channel 的创建方式不同,但是都是调用 makechan
ch := make(chan int) // 无缓冲 channel
ch := make(chan int, 10)// 有缓冲 channel
makechan
函数会接受元素的类型和缓冲的大小,如果 size
为 0,就是无缓冲 channel 了
// src/runtime/chan.go
func makechan(t *chantype, size int) *hchan{
elem := t.elem
// 检查 elem size,align
// 计算出缓冲区的大小,如果是非缓冲 channel 或者元素为 struct{},那么 mem 就是 0
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0{
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch{
// 非缓冲 channel 或者 缓冲区元素 为 struct{}
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
// 如果是非缓冲,则buf并没有用
// 如果缓冲元素类型为 struct{}, 则只会用到 sendx 和 recvx, 并不会真正拷贝数据到缓冲区
c.buf = unsafe.Pointer(&c.buf)
// channel 中元素不包含指针
case elem.ptrdata == 0:
// 将 hchan 结构和缓冲区的内存一起分配
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// buf 指向 hchan 后边的地址
c.buf = add(unsafe.Pointer(c), hchanSize)
// 默认,分别分配 chan 和 buf 的内存
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 设置 hchan 的其他字段
c.elemsize = uint16(elem.size)
c.elemtype = elem
// 底层循环队列长度
c.datasiz = uint(size)
return c
通过 makechan
函数,可以总结出 hchan 结构的特点
- 无缓冲或者缓冲的元素类型为 struct{} 时,并不会为缓冲区(hcha.buf)分配内存
- 缓冲的元素结构中不包含指针时,会将 hchan 和 缓冲区buf 是一块连续的内存
make 与 makechan
make
函数在编译阶段又是如何转换成 makechan
函数调用的呢
首先编译器会将 make
的调用转换成 OMAKE
类型的节点,然后判断 make
的对象类型,如果是 TCHAN
的话,将节点类型置为 OMAKECHAN
,并且检查 make
的第二个参数,也就是缓冲区大小
// src/cmd/compile/internal/gc/typecheck.go
func typecheck1(n *Node, top int) (res *Node) {
// ...
switch n.Op{
case OMAKE:
switch t.Etype {
case TCHAN:
l = nil
if i < len(args){
// ... 对缓冲区大小进行检测
n.Left = l // 带缓冲区,赋值缓冲区大小
}else{
n.Left = nodintconst(0) // 不带缓冲区
}
n.Op = OMAKECHAN
}
}
}
然后OMAKECHAN 节点会在 walkexpr 函数中转换成调用 makechan 或者 makechan64 函数
// src/cmd/compile/internal/gc/walk.go
func walkexpr(n *Node, init *Nodes) *Node {
switch n.Op {
case OMAKECHAN:
size := n.Left
fnname := "makechan64"
argtype := types.Types[TINT64]
if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
fnname = "makechan"
argtype = types.Types[TINT]
}
n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
}
}
发送数据
向 channel 发送数据的语句会在编译期间转换成 chansend 函数
ch := make(chan int)
ch <- 10
发送语句非常简单,但是真正的函数执行会区分很多的情况,做一些小的优化,可以称为特性
发送操作的特性
- 向 nil channel 发送数据会被永久阻塞,并且不会被 select 语句选中
- 如果 channel 未关闭,非缓冲并且没有待接收的 goroutine,或者缓冲区已满,那么不会被 select 语句选中
- 向关闭的 channel 发送数据,会 panic ,并且可以被 select 语句选中,意味着 select 语句中可能会 panic
- 如果有待接收者,那么会将发送的数据直接 copy 到待接收者的接收位置,然后唤醒接收者
- 如果有缓冲区,并且缓冲区未满,那么就把发送的数据 copy 到缓冲区中
- 如果 channel 未关闭,缓冲区为空并且没有待接收者,那么直接阻塞当前 goroutine, 等待被唤醒
- 发送者被阻塞后,可以被关闭 channel 操作或者被接收操作唤醒,关闭 channel 导致发送者被唤醒后,会panic
- 当 channel 中有待接收 goroutine,那么 channel 的状态必然是 非缓冲或者缓冲区为空
发送数据,可以被 select 选中的情况
- channel 已关闭
- channel 未关闭,channel有待接收的 goroutine,或者缓冲区不为空并且缓冲区未满
深入源码
ch <- i
发送语句实际会被转换为 chansend1
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
chansend1 会直接调用 chansend 来发送数据,并且 block
为 true,说明 ch <- i
语句可以被阻塞
// src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
c
表示操作的 channel
ep
是一个指针,指向发送的数据 ch <- i
block
表示是否是阻塞调用,在 select case 语句中才会设置为 false
callerpc
暂时不需要关心
返回值是个 bool
类型,表示是否发送成功,未发送成功的操作也不会被 select
语句选中
首先看一下 channel 为 nil 的情况,这时并不需要加锁
if c == nil{
if !block {
// block 为 false, 则直接返回 false, 表示发送失败
return false
}
// 对于 nil channel,直接挂起当前 goroutine,并永久阻塞
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
// 不会执行到这一步
throw("unreadable")
}
如果是非阻塞调用,也就是 select case 语句中调用,那么直接返回 false,意味着向 nil channel 发送数据不会被选中
阻塞调用就被 gopark 挂起,永久阻塞
在 channel 加锁之前,对于非阻塞并且未关闭的情况会有一步快速检测的判断,可以快速返回
// 快速检测,非阻塞时,有些情况不需要获取锁就可以直接返回
// 非阻塞,未关闭,非缓冲+没有等待接收的 goroutine 或者 缓冲+缓冲区已满
if !block && c.closed == 0 &&
((c.dataqsiz == 0 && c.recvq.first == nil) ||
((c.dataqsiz < 0 && c.qcount == c.dataqsiz)) {
// 返回 false,表示未发送成功
return false
}
缓冲区没有空间,并且待接收的 goroutine 时,可以直接返回未发送成功
加锁,判断 channel 是否关闭,如果已关闭,直接 panic
// 加锁
lock(&c.lock)
// 如果 channel 已关闭,则 panic
if c.closed != 0{
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
channel 待接收队列中有等待的 goroutine
lock(&c.lock)
// ...
// 从待接收队列中获取等待的 goroutine
if sg := c.recvq.dequeue(); seq != nil {
// 只要可以从待接收队列中获取到 goroutine,那么发送操作都是只需要 copy 一次
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
如果待接收队列中有等待的接收者的话,说明 channel 的缓冲区为空
调用 send 函数,无论是否是无缓冲 channel
,都直接复制给待接收者
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// sg.elem 是指向待接收 goroutine 中接收数据的指针 s <- ch
// 如果待接收 goroutine 需要接收具体的数据,那么直接将数据 copy 到 sg.elem
if sg.elem != nil{
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf() // unlock(&c.lock)
// 赋值 param,待接收者被唤醒后会根据 param 来判断是否是被发送者唤醒的
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1) // 唤醒待接收者
}
会判断一下接收者是否需要接收数据,也就是 sudog.elem
是否为 nil
如果不为 nil,就调用 sendDirect 把发送的数据(ep 指向的数据) 复制到 sudog.elem
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src 是发送的数据源地址,dst 是接收数据的地址
// src 在当前的 goroutine 栈中,而 dst 在其他栈上
dst := sg.elem
// 使用 memove 直接进行内存 copy
// 因为 dst 指向其他 goroutine 的栈,如果它发生了栈收缩,那么就没有修改真正的 dst 位置
// 所以会加读写前加一个屏障
typebitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memove(dst, src, t.size)
sendDirect 在进行跨 goroutine 内存 copy 时,调用 typebitsBulkBarrier 来加上了写屏障
因为 GC 会假设对栈的写操作只会发生在 goroutine 正在运行时,并且是由当前 goroutine 写的,
而 sendDirect 跨 goroutine 的栈读写会违背这个假设,为了避免出现问题,需要加上写屏障
缓冲区未满,直接将数据发送到缓冲区中
lock(&c.lock)
// ...
if c.qcount < c.dataqsiz {
// 获取缓冲发送数据的指针
// add(c.buf, uintptr(i)*uintptr(c.elemsize))
qp := chanbuf(c, c.sendx)
// copy 数据,ep, gp 都是指针,分别指向数据源和数据目的地
typedmemove(c.elemtype, qp, ep)
// 递增存放发送数据的索引
c.sendx++
if c.sendx == c.dataqsiz{
// 缓冲区是一个循环数组,调整索引
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
chanbuf 函数通过 hchan.sendx
获取到缓冲区存放发送的数据的地址,然后调整循环数组的sendx
索引
channel 未关闭,对于非缓冲 channel,待接收队列为空,对于缓冲 channel,缓冲区已满
逻辑依次执行到这里:
lock(&c.lock)
// ...
// 如果非阻塞发送,那么可以直接解锁返回,未发送成功
if !block{
unlock(&c.lock)
return false
}
// 阻塞发送,那么就挂起当前 goroutine
gp := getg()
// 生成配置 sudo,省略部分赋值操作
mysg := acquireSudog()
mysg.elem = ep // 将指向发送数据的指针保存到 elem 中
mysg.g = gp
mysg.c = c // 当前阻塞的 channel
gp.wait = mysg
// param 可以用来传递数据,其他 goroutine 唤醒该 goroutine 时可以设置该字段,然后根据该字段做一些判断
pg.param = nil
// 入队待发送队列
c.sendq.enqueue(mysg)
// 挂起goroutine,等待唤醒
// chanparkcommit 函数会解锁 ch.lock
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
非阻塞的话,会直接返回为发送成功
阻塞调用,则会构建 sudog 对象,然后添加到待发送队列,解锁,挂起当前 goroutine
会被唤醒的情况有两种
- 关闭 channel
- 发生接收操作,接收者可能会唤醒该发送者
// 被唤醒,执行检查清理操作
// ...
// param 字段为 nil 表示是由于 close channel 导致的关闭,panic
// close channel 和接收操作都可能唤醒等待发送的 goroutine, 但是他们设置 param 不一样
if gp.param == nil {
if c.closed = 0 {
throw("chansend: suprious wakeup")
}
panic(plainError("send on closed channel"))
}
// 清理,释放 sudog
pg.param == nil
mysq.c = nil
releaseSudog(mysg)
// 发送成功
return true
}
被唤醒后会判断 g.param
是否为 nil,因为关闭 channel 时会将待发送 goroutine 的 param
字段置为 nil,会根据这个字段决定是否 panic
select & 发送操作
golang 会对 select 语句进行一些优化
单个发送 case
select {
case ch <- i:
// ...
}
// 会被优化为
if ch == nil {
block()
}
ch <- i
会在编译期间转换为阻塞发送语句
非阻塞操作,发送 + default
select {
case ch <- i:
// ...
default:
// ...
}
// =====>
if selectnbsend(ch, i) {
// ...
} else {
// ...
}
非阻塞操作实际调用 selectnbsend
,根据函数返回值决定是否执行 default
逻辑
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
// block 参数为 false,非阻塞调用
return chansend(c,elem, false, getcallerpc())
}
返回 false 表示未发送成功,select
便会执行 default
思考:为什么向关闭的 channel 发送数据需要 panic
接收数据
如何从 channel 中接收数据
// 接收单个值,如果 channel 被关闭后,会返回 channel 中元素的零值
i <- ch // 调用 `chanrecv1` 函数
// 如果 channel 被关闭并且缓冲区为空,那么 ok 的值就是 false
i, ok <- ch // 调用 `chanrecv2` 函数
i
是接收操作的接收值
,ok
表示是否从 channel 中接收到有效的数据,即使 channel 已经关闭,但是缓冲区中依然存在数据,那么 ok
也会是 true
接收操作的特性
- 从 nil channel 中接收数据会永久阻塞,而且不会被select 语句选中
- 如果 channel 未关闭,没有待发送者或者缓冲 channel 的缓冲区为空的话,不会被 select 语句选中
- 从已关闭并且缓冲区为空的 channel 中接收数据的话,会把
接收值
置为空值,而且可以被select
语句选中 - 如果待发送队列不为空,说明无缓冲或者缓冲已满,对于无缓冲直接从待发送者复制数据到
接收值
,如果缓冲区已满,那么先将缓冲区中数据复制给接收者,然后将待发送者的数据复制到缓冲区中并唤醒发送者 - 只要缓冲区不为空,即使channel已关闭,依然可以从缓冲区中获取到数据
- 如果缓冲为空并且没有待发送者,不会被 select 语句选中,如果是阻塞接收操作的话,会被阻塞直到 channel 被关闭或者被发送者唤醒
- 接收者被关闭操作唤醒,那么
接收值
会被置为空值
接收操作被 select 语句选中的情况
- channel 已关闭
- 缓冲区中有数据
- 待发送队列不为空
深入源码
单值的接收语句实际调用 chanrecv1
// src/runtime/chan.go
i <- ch
// ===>
func chanrecv1(c *hchan, elem unsafe.Pointer){
chanrecv(c, elem, true)
}
接收两个值实际调用 chanrecv2
i, ok <- ch
// ===>
func chanrecv2(c *hchan, elem unsafe.Pointer)(received bool) {
_, received = chanrecv(c, elem, true)
}
chanrecv1
和 chanrecv2
实际都是调用 chanrecv
,他们两个之间的区别就是是否返回接收到有效数据
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
c
表示接收操作的 channel
ep
是一个指针,指向接收值
,i <- ch
语句 ep
就是 接收值 i
的地址
block
是否是阻塞操作,chanrecv1
和 chanrecv2
函数中block
为 true,说明是阻塞操作
返回值 selected
表示是否可以被 select
语句选中
返回值 received
表示是否可以接收到有效数据
**channel 在加锁前会判断一下是否为 nil **
if c == nil {
// 非阻塞下会直接返回
if !block {
return
}
// 永久挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
阻塞接收会被永久阻塞,非阻塞的话就直接返回,而且不会被 select 选中
阻塞接收时,对于未关闭 channel 满足一些条件不需要加锁就可以直接返回
// 快速检测,在非阻塞模式下,和发送一样有些条件不需要加锁就可以直接判断返回
// 非阻塞并且未关闭,非缓冲+没有待发送者或者有缓冲+缓冲为空
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
- 非缓冲 channel 如果没有待发送者
- 缓冲 channel 但是缓冲区为空
加锁,首先判断 channel 是否已关闭,缓冲区中是否还有数据
lock(&c.lock)
// channel 处于关闭,并且缓冲区已空
if c.closed != 0 && c.qcount == 0{
unlock(&c.lock)
if ep != nil{
// 如果接收的值需要赋值到变量 x <- ch
// 将接收的值置为空值
typedmemclr(c.elemtype, ep)
}
// 可被 select 语句选中,但是未接收到有效数据
return true, false
}
channel 已经关闭,而且缓冲区没有数据,如果 ep
不为nil ,也就是说存在接收值
,那么就把接收值置为空值
ep 为空的情况是
<- chan
接收操作没有接收值
selected
返回 true,表示可以被 select
语句选中
待发送队列不为空,存在待发送者
lock(&c.lock)
// ...
// 待发送队列中有 goroutine,说明是非缓冲 channel 或者 缓冲已满的 channel
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func(){ unlock(&c.lock) }, 3)
return true, true // 可被选中,并且接收成功
}
如果待发送队列中有等待发送的 goroutine,说明 channel 是非缓冲channel,或者缓冲区已经满了
- 非缓冲channel,会将数据从待发送者复制给接收者
- 缓冲区已满的话,会先从缓冲区中接收数据,然后将待发送者的数据发送到缓冲区中
这里和发送操作时,channel 的待接收队列不为空的情况不一样,因为待接收队列不为空,说明缓冲区肯定是没有数据的,可以跳过缓冲区,直接将数据发送到等待接收的 goroutine
因为要区分 channel 的类型所以 recv
函数的逻辑就会有一点复杂
对于非缓冲 channel,如果有接收值
,直接调用 recvDirect
从待发送者复制值
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 无缓冲 channel
if c.dataqsiz == 0 {
// 如果ep 不为 nil,那么直接从发送 goroutine 中将数据 copy 到接收位置
if ep != nil{
recvDirect(c.elemtype, sg, ep)
}
}
对于缓冲区有数据的情况
- 先从缓冲区复制数据到
接收值
,也就是 ep 指向的地址 - 然后将待发送者要发送的数据复制到缓冲区中
- 调整缓冲区循环数据的接收索引
recvx
} else {
// 获取缓冲区中待接收的地址
gp := chanbuf(c, c.recvx)
if ep != nil {
// 将待接收数据复制到接收位置
typedmemmove(c.elemtype, ep, qp)
}
// 将待发送者发送的数据复制到相应缓冲区的位置
typedmemmove(c.elemtype, qp,sq.elem)
// 调整 recvx
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 由于缓冲区已满,sendx 和 recvx 必然相等
c.sendx = c.recvx
}
无论是缓冲还是非缓冲 channel,recv
函数最后都会唤醒发送者
// 赋值发送者的 param,发送者被唤醒后会根据 param 来判断是否是关闭唤醒的
sg.elem = nil
gp := sg,g
unlockf()
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
接收操作会赋值发送者 goroutine 的 param
字段,发送者被唤醒后,会根据 param 参数来判断是有接收操作唤醒还是被关闭 channel 操作唤醒
缓冲区中有数据,无论 channel 被关闭,都会发送给接收者
lock(&c.lock)
// ...
// 如果缓冲区不为空,依然有未发送的数据
// 需要注意,这时 channel 可能已经处于关闭状态了,但是依然可以从关闭的缓冲区中接收到数据
if c.qcount > 0{
// 获取指向缓冲区中待接收数据的指针
gp ;= chanbuf(c, c.recvx)
if ep != nil{
// 如果接收操作有接收值,那么直接 copy 到 ep
typedmemmove(c.elemtype, ep, gp)
}
// 清理缓冲区中已接收到的数据内存
typedememclr(c.elemtype, gp)
// 调整待接收索引
c.recv++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
// 可以被选中,并且接收成功
return true, true
}
这一部分的逻辑就比较简单
- 获取缓冲区的待接收数据的地址
gp
,如果有接收者
,便将数据复制给接收者
- 调整缓冲区循环数据的待接收索引
recvx
channel 未关闭, 缓冲区没有元素,并且没有待接收者
非阻塞操作,可以直接解锁返回,并且不会被 select
语句选中
lock(&c.lock)
// ...
// 缓冲区没有元素并且没有待发送者
if !block {
unblock(&c.block)
// 不会被选中,并且没有接收到有效数据
return false, false
}
阻塞操作,挂起当前 goroutine,等待被发送操作或者关闭操作唤醒
lock(&c.lock)
// ...
gp = getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.param = nil
// 入队到待发送者队列中
c.recvq.enqueue(mysg)
// 挂起 goroutine,等待由关闭操作或者发送操作唤醒
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// 被唤醒,做一些检测,和清理操作
// 根据 param 判断是否是由关闭唤醒的
// 有 closed 唤醒时,param 会被置为 nil
closed := gp.param == nil
pg.param = nil
mysg.c = nil
releaseSudog(mysg)
// 可以被选中,但是 closed 反应是否接受到有效数据
return true, !closed
}
被唤醒后会根据 param
字段,判断是否是由关闭操作唤醒
select 与 接收操作
单个接收 case
select {
case i <- ch:
}
// ====>
if ch == nil{
block()
}
i <- ch
非阻塞接收
select {
case v <- ch: // case v, received <- ch:
// ...
default:
// ...
}
// ===>
// if ch != nil && selectnbrecv2(&v, &ok, ch) {
if selectnbrecv(&v, ch) {
// ...
} else {
// ...
}
非阻塞接收会调用 selectnbrecv
和 selectnbrecv2
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
selected, _ = chanrecv(c, elem, false)
return
}
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
elected, *received = chanrecv(c, elem, false)
return
}
关闭 channel
关闭 channel 直接调用 close
函数即可,但是贸然关闭 channel 会引发很多的问题
ch := make(chan int)
// 关闭 goroutine
close(ch)
关闭操作的特性
- 关闭 nil channel 会 panic
- 关闭已关闭的 channel 会 panic
- 关闭操作会将待接收者的接收值置为空值,唤醒所有待发送者和待接收者
关于如何优雅的关闭 channel,可以看一下 go101 中 如何优雅地关闭通道
深入源码
关闭 nil channel 会panic
func closechan(c *hchan) {
// 关闭 nil channel 会 panic
if c == nil{
panic(plainError("close of nil channel"))
}
重复关闭 channel,也会 panic
// 加锁
lock(&c.lock)
if c.closed != 0 {
// 重复关闭会 panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
需要注意关闭操作中,判断 channel 是否关闭前会加锁
处理待接收者,如果有接收者
,那么就置为空值
c.closed = 1
var glist gList
// 处理待接收者
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
// 将待接收位置置为空值
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil // 清理 elem 指针
}
gp := sg.g
// param 置为 nil,接收者被唤醒后会返回未接收到有效数据
gp.param = nil
glist.push(gp)
}
处理待发送者
// 处理待发送的队列
for {
sg := c.sendq.dequeue()
if sg == nil {
// 没有待发送的goroutine了
break
}
sg.elem = nil
gp := sg.g
// 将 param 置为 nil, 待发送者被唤醒后,会 panic
gp.param = nil
glist.push(gp)
}
解锁,唤醒所有待发送者和待接收者
unlock(&c.lock)
// 唤醒所有阻塞的 goroutine
for !glist.empty(){
gp := glist.pop()
gpready(gp, 3)
}
}
关闭操作唤醒 channel 中阻塞的 goroutine
在处理待发送者和待接收者时,都会将 goroutine 的 param
字段置为 nil,然后当被唤醒后待发送者和待接收者就能区分如何被唤醒的
发送操作
// runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
// 阻塞,挂起 goroutine
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
if gp.param == nil {
if c.closed = 0 {
throw("chansend: suprious wakeup")
}
panic(plainError("send on closed channel"))
}
// ...
可以看到发送操作被唤醒后会判断 param
字段
如果是由于 channel 关闭导致被唤醒,那么直接 panic
- 关闭操作唤醒,goroutine param 字段为 nil
func closechan(c *hchan) {
// ...
for {
sg := c.recvq.dequeue()
// ...
pg := sg.pg
gp.param = nil
// ...
}
// ... 唤醒 goroutine
}
- 接收操作唤醒,goroutine param 不为 nil
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// ... 数据复制
pg := sg.g
pg.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
接收操作
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
// 阻塞,挂起当前 goroutine
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// 被唤醒
// ...
closed := gp.parma == nil
// ...
return true, !closed
接收操作在关闭后并不会 panic,而是会作为 received 返回,表示是否接收到有效的数据
参考资料
深度解密Go语言之channel
Go 语言设计与实现 —— Channel
推荐阅读
Go101 通道
如何优雅地关闭通道
浅谈 Go 语言 select 的实现原理
图解Go的channel底层原理
走进Golang之Channel的使用
走进Golang之Channel的数据结构