理解并发和并行
并发:同时管理多件事情。
并行:同时做多件事情。表示同时发生了多件事情,通过时间片切换,哪怕只有单一的核心,也可以实现“同时做多件事情”这个效果。
预热——用户级线程和内核级线程
线程被分为两类:用户级线程(User Level Thread)和内核级线程(Kernal Level Thread)
用户级线程
· 用户级线程只存在于用户空间,有关它的创建、调度和管理工作都由用户级线程库来支持。用户级线程库是用于用户级线程管理的例程包,支持线程的创建、终止,以及调度线程的执行并保存和恢复线程的上下文,这些操作都在用户空间运行,无需内核的支持。
· 由于内核无法感知用户级线程的存在,因此内核是以进程为单位进行调度的。当内核调度一个进程运行时,用户级线程库调度该进程的一个线程运行,如果时间片允许,该进程的其他线程也可能被运行。即该进程的多个线程共享该进程的运行时间片。
· 若该进程的一个线程进行IO操作,则该线程调用系统调用进入内核,启动IO设备后,内核会把该进程阻塞,并把CPU交给其他进程。即使被阻塞的进程的其他线程可以执行,内核也不会发现这一情况。在该进程的状态变为就绪前,内核不会调度它运行。属于该进程的线程都不可能运行,因而用户级线程的并行性会受到一定的限制。
内核级线程
· 内核级线程的所有创建、调度及管理操作都由操作系统内核完成,内核保存线程的状态及上下文信息。
· 当一个线程引起阻塞的系统调用时,内核可以调度进程的其他线程执行,多处理器系统上,内核分派属于同一进程的多个线程在多个处理器上执行,提升进程的并行度。
· 内核管理线程效率比用户态管理线程慢的多。
操作系统的三种线程模型
1.多对一模型
允许将多个用户级线程映射到一个内核线程。线程管理是在用户空间进行的,效率比较高。如果有一个线程执行了阻塞系统调用,那么整个进程就会阻塞。所以任意时刻只允许一个线程访问内核,这样多个线程不能并行运行在多处理器上。虽然多对一模型对创建用户级线程的数目并没有限制,但这些线程在同一时刻只能有一个被执行。
2.一对一模型
每个用户线程映射到一个内核线程。当一个线程执行阻塞系统调用,该模型允许另一个线程继续执行。这样提供了更好的并发功能。该模型也允许多个线程运行在多核处理器上。一对一模型可以获得高并发性,但因耗费资源而使线程数会受到限制。
3.多对多模型
在多对一模型和一对一模型中取了个折中,克服了多对一模型的并发度不高的缺点,又克服了一对一模型的一个用户进程占用太多内核级线程,开销太大的缺点。又拥有多对一模型和一对一模型各自的优点,可谓集两者之所长。
GO并发调度模型——G-P-M模型
GO可以使用如下方式创建一个"线程"(GO语言中所谓的goroutine)。
go func(paramName paramType, ...){
//函数体
}(param, ...)
等价于Java代码
new java.lang.Thread(() -> {
// do something in one new thread
}).start();
G-P-M模型图解
其图中的G, P和M都是Go语言运行时系统(其中包括内存分配器,并发调度器,垃圾收集器等组件,可以想象为Java中的JVM)抽象出来概念和数据结构对象。
G:G就是goroutine,通过go关键字创建,封装了所要执行的代码逻辑,可以称为是用户线程。属于用户级资源,对OS透明,具备轻量级,可以大量创建,上下文切换成本低等特点。
P:Processor即逻辑处理器,默认GO运行时的Processor数量等于CPU数量,也可以通过GOMAXPROCS函数指定P的数量。P的主要作用是管理G运行,每个P拥有一个本地队列,并为G在M上的运行提供本地化资源。
M:是操作系统创建的系统线程,作用就是执行G中包装的并发任务,被称为物理处理器。其属于OS资源,可创建的数量上也受限了OS,通常情况下G的数量都多于活跃的M的。Go运行时调度器将G公平合理的安排到多个M上去执行。
·G和M的关系:G是要执行的逻辑,M具体执行G的逻辑。Java中Thread实际上就是对M的封装,通过指定run()函数指定要执行的逻辑。GO语言中讲二者分开,通过P建立G和M的联系从而执行。
·G和P的关系:P是G的管理者,P将G交由M执行,并管理一定系统资源供G使用。一个P管理存储在其本地队列的所有G。P和G是1:n的关系。
·P和M的关系:P和M是1:1的关系。P将管理的G交由M具体执行,当遇到阻塞时,P可以与M解绑,并找到空闲的M进行绑定继续执行队列中其他可执行的G。
问题:为什么要有P?
G是对需要执行的代码逻辑的封装,M具体执行G,P存在的意义是什么?
Go语言运行时系统早期(Go1.0)的实现中并没有P的概念,Go中的调度器直接将G分配到合适的M上运行。但这样带来了很多问题,例如,不同的G在不同的M上并发运行时可能都需向系统申请资源(如堆内存),由于资源是全局的,将会由于资源竞争造成很多系统性能损耗。
Go 1.1起运行时系统加入了P,让P去管理G对象,M要想运行G必须先与一个P绑定,然后才能运行该P管理的G。P对象中预先申请一些系统资源作为本地资源,G需要的时候先向自己的P申请(无需锁保护),如果不够用或没有再向全局申请,而且从全局拿的时候会多拿一部分,以供后面高效的使用。
P的存在解耦了G和M,当M执行的G被阻塞时,P可以绑定到其他M上继续执行其管理的G,提升并发性能。
GO调度过程
①创建一个goroutine,调度器会将其放入全局队列。
②调度器为每个goroutine分配一个逻辑处理器。并放到逻辑处理器的本地队列中。
③本地队列中的goroutine会一直等待直到被逻辑处理器运行。
func task1() {
go task2()
go task3()
}
假设现在task1在称为G1的goroutine中运行,并在运行过程中创建两个新的goroutine,新创建的两个goroutine将会被放到全局队列中,调度器会再将他们分配给合适的P。
问题:如果遇到阻塞的情况怎么处理?
假设正在运行的goroutine要执行一个阻塞的系统调用,如打开一个文件,在这种情况下,这个M将会被内核调度器调度出CPU并处于阻塞状态。相应的与M相关联的P的本地队列中的其他G将无法被运行,但Go运行时系统的一个监控线程(sysmon线程)能探测到这样的M,将M与P解绑,M将继续被阻塞直到系统调用返回。P则会寻找新的M(没有则创建一个)与之绑定并执行剩余的G。
当之前被阻塞的M得到返回后,相应的G将会被放回本地队列,M则会保存好,等待再次使用。
问题:GO有时间片概念吗?
和操作系统按时间片调度线程不同,Go并没有时间片的概念。如果一个G没有发生阻塞的情况(如系统调用或阻塞在channel上),M是如何让G停下来并调度下一个G的呢?
G是被抢占调度的。GO语言运行时,会启动一个名为sysmon的M,该M无需绑定P。sysmon每20us~10ms启动一次,按照《Go语言学习笔记》中的总结,sysmon主要完成如下工作:
1.回收闲置超过5分钟的span物理内存
2.如果超过2分钟没有垃圾回收,强制执行
3.向长时间运行的G任务发出抢占调度
4.收回因syscall长时间阻塞的P
5.将长时间未处理的netpoll结果添加到任务队列
注:go的内存分配也是基于两种粒度的内存单位:span和object。span是连续的page,按page的数量进行归类,比如分为2个page的span,4个page的span等。object是span中按预设大小划分的块,也是按大小分类。同一个span中,只有一种类型大小的object。
sysmom的大致工作思路:
//$GOROOT/src/runtime/proc.go
// main方法
func main() {
... ...
systemstack(func() {
newm(sysmon, nil)
})
.... ...
}
func sysmon() {
// 回收闲置超过5分钟的span物理内存
scavengelimit := int64(5 * 60 * 1e9)
... ...
if .... {
... ...
// 如果P因syscall阻塞
//通过retake方法抢占G
if retake(now) != 0 {
idle = 0
} else {
idle++
}
... ...
}
}
抢占方法retake
// forcePreemptNS是指定的时间片,超过这一时间会尝试抢占
const forcePreemptNS = 10 * 1000 * 1000 // 10ms
func retake(now int64) uint32 {
... ...
// 实施抢占
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
continue
}
if pd.schedwhen+forcePreemptNS > now {
continue
}
preemptone(_p_)
... ...
}
可以看出,如果一个G任务运行10ms,sysmon就会认为其运行时间太久而发出抢占式调度的请求。
goroutine
1.gouroutine切换
package main
import (
"runtime"
"sync"
"fmt"
)
func main() {
//指定调度器所能调度的逻辑处理器数量
runtime.GOMAXPROCS(1)
//使用wg等待程序完成
var wg sync.WaitGroup
//计数器+2,等待两个goroutine
wg.Add(2)
fmt.Println("Start GoRoutine")
//声明一个匿名函数,使用go关键字创建goroutine
go func() {
//函数退出时通过Done通知main函数工作已经完成
defer wg.Done()
for i := 1; i <= 3; i++ {
for char := 'a'; char < 'a' + 26; char++ {
fmt.Printf("%c ", char)
}
}
}()
go func() {
defer wg.Done()
for i := 1; i <= 3; i++ {
for char := 'A'; char < 'A' + 26; char++ {
fmt.Printf("%c ", char)
}
}
}()
//等待goroutine结束
fmt.Println("Waiting for finish")
wg.Wait()
fmt.Println("Program end")
}
输出:
Start GoRoutine
Waiting for finish
A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z
Program end
这个程序给人的感觉是串行的,原因是当调度器还没有准备切换打印小写字母的goroutine时,打印大写字母的goroutine就执行完了。
修改下,打印6000以内的素数。
package main
import (
"sync"
"runtime"
"fmt"
)
var wg sync.WaitGroup
func main(){
runtime.GOMAXPROCS(1)
wg.Add(2)
go printPrime("A")
go printPrime("B")
fmt.Println("Wait for finish")
wg.Wait()
fmt.Println("Program End")
}
func printPrime(prefix string){
defer wg.Done()
nextNum:
for i := 2; i < 6000; i++ {
for j := 2; j < i; j++ {
if i % j == 0 {
continue nextNum
}
}
fmt.Printf("%s:%d\n", prefix, i)
}
fmt.Printf("complete %s\n", prefix)
}
输出结果:
Wait for finish
B:2
B:3
B:5
B:7
B:11
...
B:457
B:461
B:463
B:467
A:2
A:3
A:5
A:7
...
A:5981
A:5987
complete A
B:5939
B:5953
B:5981
B:5987
complete B
Program End
在打印素数的过程中,goroutine的输出是混在一起的,由于runtime .GOMAXPROCS(1),可以看出两个G是在一个P上并发执行的。
package main
import (
"runtime"
"sync"
"fmt"
)
func main() {
runtime.GOMAXPROCS(2)
var wgp sync.WaitGroup
wgp.Add(2)
fmt.Println("Start Goroutines")
//第一个goroutine打印3遍小写字母
go func() {
defer wgp.Done()
for i := 0; i < 3; i++ {
for char := 'a'; char < 'a'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()
//第二个goroutine打印三遍大写字母
go func() {
defer wgp.Done()
for i := 0; i < 3; i++ {
for char := 'A'; char < 'A'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()
fmt.Println("Waiting for finish")
wgp.Wait()
fmt.Println("\nAll finish")
}
输出:
Start Goroutines
Waiting for finish
A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d M N O P Q R S T U V e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m W X Y Z n o p q r s t u v w x y z
All finish
设置两个P,短时间内会有类似上面打印0-6000所有素数的效果,证明两个goroutine是并行执行的,但只有在多个P且每个可以同时让每个G运行再一个可用的M上时,G才会达到并行的效果。
竞争状态
多个goroutine再没有同步的情况下,同时读写某个共享资源就会产生竞争状态。对一个资源的读写必须是原子化的,即同一时刻只能有一个goroutine进行读写操作。
包含竞争状态的实例:
package main
import (
"sync"
"runtime"
"fmt"
)
var (
counter int
wgs sync.WaitGroup
)
func main(){
wgs.Add(2)
go incrCounter()
go incrCounter()
fmt.Println("adding")
wgs.Wait()
fmt.Printf("now counter is %d\n", counter)
}
//非原子操作加法
func incrCounter() {
defer wgs.Done()
for i := 1; i <= 2000; i++ {
val := counter
//让出处理器
runtime.Gosched()
val++
counter = val
}
}
输出:
adding
now counter is 2000
每次读取完count后存入副本,手动让出M,再次轮到G执行时会将之前的副本的值进行增1的操作,覆盖了另一个goroutine的操作。
goroutine同步的几种方式
①原子函数
原子函数以操作系统底层的枷锁机制同步访问变量,使用原子锁方式修改之前的代码:
package main
import (
"sync"
"fmt"
"sync/atomic"
"runtime"
)
var (
counter2 int64
wg2 sync.WaitGroup
)
func main() {
wg2.Add(2)
go incrCounterAtomic()
go incrCounterAtomic()
fmt.Println("adding...")
wg2.Wait()
fmt.Printf("now counter is : %d\n", counter2)
fmt.Println("Program end")
}
func incrCounterAtomic() {
defer wg2.Done()
for i := 1; i <= 2000; i++ {
atomic.AddInt64(&counter2, 1)
runtime.Gosched()
}
}
输出:
adding...
now counter is : 4000
Program end
互斥锁
互斥锁用于在代码当中建立一个临界区,保证同一时间只有一个G执行临界区的代码。
package main
import (
"sync"
"fmt"
"runtime"
)
var (
counter3 int
wg3 sync.WaitGroup
mutex sync.Mutex
)
func main() {
wg3.Add(2)
go incrCounterMutex()
go incrCounterMutex()
fmt.Println("Adding...")
wg3.Wait()
fmt.Printf("now counter is : %d\n", counter3)
}
func incrCounterMutex() {
defer wg3.Done()
for i := 1; i <= 2000; i++ {
//建立临界区
mutex.Lock()
{
val := counter3
runtime.Gosched()
val++
counter3 = val
}
mutex.Unlock()
}
}
通道
资源再goroutine之间共享时,可以使用通道实现同步。声明通道时,需要指定将要被共享的数据类型,可以通过通道共享内置类型、命名类型、结构类型、引用类型的值或指针。GO语言使用make函数创建通道。
①无缓冲通道
无缓冲通道使用make(chan type)声明,通道中存取消息都是阻塞的。
样例:
func main() {
var messages chan string = make(chan string)
go func(message string) {
messages <- message // 存消息
}("hello!")
fmt.Println(<-messages) // 取消息
}
阻塞即无缓冲的通道道在取消息和存消息的时候都会挂起当前的goroutine,除非另一端已经准备好。例:
package main
import (
"fmt"
"strconv"
)
var ch chan int = make(chan int)
func main() {
go input(0)
<- ch
fmt.Println("main finish")
}
func input(i int) {
for i := 0; i < 10; i++ {
fmt.Print(strconv.Itoa(i) + " ")
}
ch <- i
}
尝试注释掉ch的输入和输出,对比运行结果
不注释:
0 1 2 3 4 5 6 7 8 9 main finish
注释:
main finish
如果不用通道来阻塞主线的话,主线就会过早跑完,loop线将没有机会执行。
无缓冲的通道永远不会存储数据,只负责数据的流通。
如果从无缓冲通道读数据,必须要有数据写入通道,否则读取操作的goroutine将一直阻塞。
如果向无缓冲通道写数据,必须要有其他goroutine读取,否则该goroutine将一直被阻塞。
如果无缓冲通道中有数据,再向其写入,或者从无流入的通道数据中读取,则会形成死锁。
死锁
为什么会死锁?非缓冲信道上如果发生了流入无流出,或者流出无流入,也就导致了死锁。或者这样理解 Go启动的所有goroutine里的非缓冲信道一定要一个线里存数据,一个线里取数据,要成对才行 。
c, quit := make(chan int), make(chan int)
go func() {
c <- 1 // c通道的数据没有被其他goroutine读取走,堵塞当前goroutine
quit <- 0 // quit始终没有办法写入数据
}()
<- quit // quit 等待数据的写
是否所有不成对向信道存取数据的情况都是死锁?反例:
package main
var ch chan int = make(chan int)
func main() {
go input(0)
}
func input(i int) {
ch <- i
}
通道ch中只有流入没有流出,但运行不会报错。原因是main没等待其它goroutine,自己先跑完了, 所以没有数据流入c信道,一共执行了一个goroutine, 并且没有发生阻塞,所以没有死锁错误。
②缓冲通道
缓冲信道不仅可以流通数据,还可以缓存数据。它是有容量的,存入一个数据的话 , 可以先放在信道里,不必阻塞当前线而等待该数据取走。但是当缓冲信道达到满的状态的时候,就会表现出阻塞了。
package main
var ch chan int = make(chan int, 3)
func main() {
ch <- 1
ch <- 1
ch <- 1
//ch <- 1
}
如果是非缓冲通道,这段代码会报死锁。但是当有缓冲的通道时,代码正常执行。如果再加入一行数据流入,才会报死锁。
通道可以看做是一个先进先出的队列。
package main
import "fmt"
var ch chan int = make(chan int, 3)
func main() {
ch <- 1
ch <- 2
ch <- 3
fmt.Println(<- ch)
fmt.Println(<- ch)
fmt.Println(<- ch)
}
会按照数据写入的顺序依次读取
1
2
3
通道的其他基本操作
除了<-外,range也可以进行读取。
package main
import (
"fmt"
)
var ch chan int = make(chan int, 3)
func main() {
ch <- 1
ch <- 2
ch <- 3
for v := range ch {
fmt.Println(v)
}
}
输出
1
2
3
fatal error: all goroutines are asleep - deadlock!
可以读取但产生死锁,原因是range不等到通道关闭不结束读取,在没有数据流入的情况下,产生死锁。有两种方式可以避免这一情况。
1.通道长度为0即结束
package main
import (
"fmt"
)
var ch chan int = make(chan int, 3)
func main() {
ch <- 1
ch <- 2
ch <- 3
for v := range ch {
fmt.Println(v)
if len(ch) == 0 {
break
}
}
}
2.手动关闭通道
关闭通道只是关闭了向通道写入数据,但可以从通道读取。
package main
import (
"fmt"
)
var ch chan int = make(chan int, 3)
func main() {
ch <- 1
ch <- 2
ch <- 3
close(ch)
for v := range ch {
fmt.Println(v)
}
}
有缓冲通道图解: