Go语言并发模型
Go 语言中使用了CSP模型来进行线程通信,准确说,是轻量级线程goroutine之间的通信。CSP模型和Actor模型类似,也是由独立的,并发执行的实体所构成,实体之间也是通过发送消息进行通信的。
Actor模型和CSP模型区别
Actor之间直接通讯,而CSP是通过Channel通讯,在耦合度上两者是有区别的,后者更加松耦合。
主要的区别在于:CSP模型中消息的发送者和接收者之间通过Channel松耦合,发送者不知道自己消息被哪个接收者消费了,接收者也不知道是哪个发送者发送的消息。在Actor模型中,由于Actor可以根据自己的状态选择处理哪个传入消息,自主性可控性更好些。
在Go语言中为了不堵塞进程,程序员必须检查不同的传入消息,以便预见确保正确的顺序。CSP好处是Channel不需要缓冲消息,而Actor理论上需要一个无限大小的邮箱作为消息缓冲。
CSP模型的消息发送方只能在接收方准备好接收消息时才能发送消息。相反,Actor模型中的消息传递是异步的,即消息的发送和接收无需在同一时间进行,发送方可以在接收方准备好接收消息前将消息发送出去。
Channel定义
Go 语言中Channel类型的定义格式如下:
ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType
ci := make(chan int)
cs := make(chan string)
cf := make(chan interface{})
它包括三种类型的定义。可选的<-代表channel的方向。如果没有指定方向,那么Channel就是双向的,既可以接收数据,也可以发送数据。
chan T // 可以接收和发送类型为 T 的数据
chan <- float64 // 发送:只可以用来发送 float64 类型的数据
<-chan int // 接收:只可以用来接收 int 类型的数据
注意:<- 总是优先和最左边的类型结合(The <- operator associates with the leftmost chan possible)。
默认情况下,channel发送方和接收方会一直阻塞直到对方准备好发送或者接收,这就使得Go语言无需加锁或者其他条件,天然支持了并发。
c := make(chan bool) //创建一个无缓冲的bool型Channel
c <- x //向一个Channel发送一个值x
<- c //从一个Channel中接收一个值
x = <- c //从Channel c接收一个值并将其存储到x中
x, ok = <- c //从Channel接收一个值,如果channel关闭了或没有数据,那么ok将被置为false
使用 make 初始化Channel,还可以设置容量:
make(chan int, 100) #//创建一个有缓冲的int型Channel
func sum(a []int, c chan int) {
total := 0
for _, v := range a {
total += v
}
c <- total // send total to c
}
func main() {
a := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(a[:len(a)/2], c) // {7,2,8}
go sum(a[len(a)/2:], c) // {-9,4,0}
go sum(a, c) // {7,2,8,-9,5,0}
x, y, z := <-c, <-c, <-c // receive from c
fmt.Println(x, y, z, x+y)
}
打印输出:12 -5 17 7,也可能:12 17 -5 7
Channel缓冲
容量(capacity)代表Channel容纳的最多的元素的数量,代表Channel的缓冲的大小,就是channel可以存储多少元素。 如果没有设置容量,或者容量设置为0, 说明Channel没有缓冲。
ch := make(chan bool, 4)
以上,创建了可以存储4个元素的bool型channel。在这个channel中,前4个元素可以无阻塞的写入,当写入第5个元素时,代码将会阻塞,直到其他goroutine从channel 中读取一些元素,腾出空间。
当 value = 0 时,channel 是无缓冲阻塞读写的,当value > 0 时,channel 有缓冲、是非阻塞的,直到写满 value 个元素才阻塞写入。
在实际开发中,你可以在多个goroutine从/往一个channel 中 receive/send 数据, 不必考虑额外的同步措施。
Channel可以作为一个先入先出(FIFO)的队列,接收的数据和发送的数据的顺序是一致的。
不带缓冲的Channel兼具通信和同步两种特性,在并发开发中颇受青睐。例如:
// _Channels_ are the pipes that connect concurrent
// goroutines. You can send values into channels from one
// goroutine and receive those values into another
// goroutine.
package main
import "fmt"
func main() {
// Create a new channel with `make(chan val-type)`.
// Channels are typed by the values they convey.
messages := make(chan string)
// _Send_ a value into a channel using the `channel <-`
// syntax. Here we send `"ping"` to the `messages`
// channel we made above, from a new goroutine.
go func() { messages <- "ping" }()
// The `<-channel` syntax _receives_ a value from the
// channel. Here we'll receive the `"ping"` message
// we sent above and print it out.
msg := <-messages
fmt.Println(msg)
}
上面示例的程序中,我们创建了一个不带缓冲的string类型Channel,然后在一个goroutine把“ping”用channel<-传递给了这个Channel。<-channel收到了这个值,然后在main函数打印出来。
其实在这里我们利用Channel悄然把“ping”message从一个goroutine转移到了main goroutine,实现了线程间(准确的说,goroutine之间的)通信。 因为channel发送方和接收方会一直阻塞直到对方准备好发送或者接收,这就使得我们在程序末尾等”ping” message而无需其他的同步操作。
Range和Close
可以通过range,像操作slice或者map一样操作缓存类型的channel。
func fibonacci(n int, c chan int) {
x, y := 1, 2
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c) // 显式的关闭channel
}
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
for i := range c {
// 能够不断的读取channel里面的数据,直到该channel被显式的关闭
fmt.Println(i)
}
}
生产者通过内置函数close关闭channel,关闭channel之后就无法再发送任何数据了。
v, ok := <-ch
消费方可以通过以上语句测试channel是否被关闭。如果ok返回false,那么说明channel已经没有任何数据并且已经被关闭。
应该在生产者的地方关闭channel,而不是消费的地方去关闭它,这样容易引起panic。
channel不像文件之类的,不需要经常去关闭,只有当你确实没有任何发送数据了,或者你想显式的结束range循环之类的。
Select
如果存在多个 channel 的时候,通过 select 可以监听 channel 上的数据流动。
select 默认是阻塞的,只有当监听的 channel 中有发送或接收可以进行时才会运行,当多个 channel 都准备好的时候,select是随机的选择一个执行的。
func fibonacci(c, quit chan int) {
fmt.Println("B")
x, y := 1, 2
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
close(c)
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
fmt.Println("A")
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}
打印输出:
B
A
1
2
3
5
8
13
21
34
55
89
quit
超时
当出现goroutine阻塞的情况时,我们可以利用select来设置超时,避免整个程序进入阻塞的情况。
func main() {
c := make(chan int)
quit := make(chan bool)
go func() {
for {
select {
case v := <-c:
fmt.Println(v)
case <-time.After(5 * time.Second):
fmt.Println("timeout")
quit <- true
break
}
}
}()
<- quit
}
runtime goroutine
runtime包中有几个处理goroutine的函数:
Goexit
退出当前执行的goroutine,但是defer函数还会继续调用。
Gosched
让出当前goroutine的执行权限,调度器安排其他等待的任务运行,并在下次某个时候从该位置恢复执行。
NumCPU
返回 CPU 核数量。
NumGoroutine
返回正在执行和排队的任务总数。
GOMAXPROCS
用来设置可以并行计算的CPU核数的最大值,并返回之前的值。