为什么用 Go
- 语法先进。在语言层面支持线程(goroutine)和管道(channel)。对线程间的加锁、同步支持良好。
- 类型安全(type safe)。内存访问安全(memory safe),很难写出像 C++ 一样内存越界访问等 bug。
- 支持垃圾回收(GC)。不需要用户手动管理内存,这一点在多线程编程中尤为重要,因为在多线程中你很容易引用某块内存,然后忘记了在哪引用过。
- 简洁直观。没 C++ 那么多复杂的语言特性,并且在报错上很友好。
线程(Threads)
线程为什么这么重要?因为他是我们控制并发的主要手段,而并发是构成分布式系统的基础。在 Go 中,你可以将 goroutine 认为是线程,以下这两者混用。 每个线程可以有自己的内存栈、寄存器,但是他们可以共享一个地址空间。
使用原因
IO concurrency(IO 并发):一个历史说法,以前单核时,IO 是主要瓶颈,为了充分利用 CPU,一个线程在进行 IO 时,可以让出 CPU,让另一个线程进行计算、读取或发送网络消息等。在这里可以理解为:你可以通过多个线程并行的发送多个网络请求(比如 RPC、HTTP 等),然后分别等待其回复。
Parallelism(并行):充分利用多核 CPU。
关于并发(concurrency)和并行(parallelism)的区别和联系,可以看这篇文章。记住两个关键词即可:逻辑并发设计 vs 物理并行执行。
Convenience(方便):比如可以在后台启动一个线程,定时执行某件事、周期性的检测什么东西(比如心跳)。
Q&A:
- 不使用线程还能如何处理并发?基于事件驱动的异步编程。但是多线程模型更容易理解一些,毕竟每个线程内执行顺序和你的代码顺序是大体一致的。
- 进程和线程的区别?进程是操作系统提供的一种包含有独立地址空间的一种抽象,一个 Go 程序启动时作为一个进程,可以启动很多线程(不过我记得 Goroutine 是用户态的执行流)。
使用难点(challenges)
共享内存易出错。一个经典的问题是,多个线程并行执行语句:n = n + 1 时,由于该操作不是原子操作,在不加锁时,很容易出现 n 为非期望值。
我们称这种情况为竞态 (race):即两个以上的线程同时试图改变某个共享变量。
解决的方法是加锁,但如何科学的加锁以兼顾性能并避免死锁又是一门学问。
Q&A:
- Go 是否知道锁和资源(一些共享的变量)间的映射?Go 并不知道,它仅仅就是等待锁、获取锁、释放锁。需要程序员在脑中、逻辑上来自己维护。
- Go 会锁上一个 Object 的所有变量还是部分?和上个问题一样,Go 不知道任何锁与变量之间的关系。Lock 本身的源语很简单,goroutine0 调用 mu.Lock 时,没有其他 goroutine 持有锁,则 goroutine0 获取锁;如果其他 goroutine 持有锁,则一直等待直到其释放锁;当然,在某些语言,如 Java 里,会将对象或者实例等与锁绑定,以指明锁的作用域。
- Lock 应该是某个对象的私有变量?如果可以的话,最好这样做。但如果由跨对象的加锁需求,就需要拿出来了,但要注意避免死锁。
线程协调(Coordination)
- channels:go 中比较推荐的方式,分阻塞和带缓冲。
- sync.Cond:信号机制。
- waitGroup:阻塞知道一组 goroutine 执行完毕,后面还会提到。
死锁(DeadLock)
产生条件:多个锁,循环依赖,占有并等待。
如果你的程序不干活了,但是又没死,那你就需要看看是否死锁了。
爬虫(Web crawler)
- 从一个种子网页 URL 开始
- 通过 HTTP 请求,获取其内容文本
- 解析其内容包含的所有 URL,针对所有 URL 重复过程 2,3
为了避免重复抓取,需要记下所有抓取过的 URL。
由于:
网页数量巨大
-
网络请求较慢
一个接一个的抓取用时太长,因此需要并行抓取。这里面有个难点,就是如何判断已经抓取完所有网页,并需要结束抓取。
抓取代码
代码在https://pdos.csail.mit.edu/6.824/notes/crawler.go中有。
串行爬取。深度优先遍历(DFS )全部网页构成的图结构,利用一个名为 fetched 的 set 来保存所有已经抓取过的 URL。
func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
if fetched[url] {
return
}
fetched[url] = true
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
for _, u := range urls {
Serial(u, fetcher, fetched)
}
return
}
并行爬取
- 将抓取部分使用 go 关键字变为并行。但如果仅这么改造,不利用某些手段(sync.WaitGroup)等待子 goroutine,而直接返回,那么可能只会抓取到种子 URL,同时造成子 goroutine 的泄露。
- 如果访问已经抓取的 URL 集合 fetched 不加锁,很可能造成多次拉取同一个网页。
WaitGroup
var done sync.WaitGroup
for _, u := range urls {
done.Add(1)
go func(u string) {
defer done.Done()
ConcurrentMutex(u, fetcher, f)
}(u) // u 被拷贝
}
done.Wait()
WaitGroup 内部维护了一个计数器:调用 wg.Add(n)
时候会增加 n;调用 wait.Done()
时候会减少 1。这时候调用 wg.Wait()
会一直阻塞直到当计数器变为 0 。所以 WaitGroup 很适合等待一组 goroutine 都结束的场景。
Q&A
如果 goroutine 异常退出没有调用
wg.Done()
怎么办?可以使用 defer 将其写在 goroutine 开始:defer wg.Done()
两个 goroutine 同时调用
wg.Done()
会有竞争(race),以至于内部计数器不能正确减少两次吗?WaitGroup 应该有相应机制(锁什么的)来保证 Done() 的原子性。-
定义匿名函数时,匿名函数中变量和外层函数同名变量间的关系?这是个闭包(closure)问题。如果匿名函数中变量没有被参数覆盖(如上述代码中 fetcher),就会和外层同名变量引用同一个地址。如果通过传参传递(如上述代码中 u),哪怕参数和外层变量看起来一样,但匿名函数使用的也是传进来的参数,而非外层变量;尤其针对 for 循环变量,我们通常通过参数来将其在调用时拷贝一次,否则 for 循环启动的所有 goroutine 都会指向这个不断被 for 循环赋值改变的变量。
对于闭包,go 中有个“变量逃逸”(Variable Escape)的说法,如果某个变量在函数声明周期结束时仍被引用,则将其分被到堆而非函数栈上。对闭包来说,某个变量同时被内层和外层函数引用,则其会被分配到堆上。
既然字符串 u 是不可变(immutable)的,为什么所有 goroutine 还会引用到不断变化的值?string 的确是不可变的,但是 u 的值一直在变,而 goroutine 和外层 goroutine 共享 u 的引用。
去掉锁
如果在更新 map 的时候去掉锁,运行几次发现并没有什么异常,因为 race 其实很难检测。好在 go 提供了竞态分析工具帮你来找到潜在含有竞态的地方:go run -race crawler.go
注意该工具没有做静态分析,而是在动态执行过程中观察、记录各个 goroutine 的执行轨迹,进行分析。
线程数量
Q&A
- 该代码在整个运行中会同时多少线程在运行(goroutine)?
该代码并没有做明显的限制,但是其明显和 URL 数量、抓取时间正相关。例子中输入只有五个 URL,因此没有什么问题。但在现实中,这么做可能会同时启动上百万个 goroutine。因此一个改进是,实现启动一个固定数量的 worker 池子,每个 worker 干完后就去要 / 被分配下一个任务。
使用 channel 通信
我们可以实现一个新的爬虫版本,不用锁 + 共享变量,而用 go 中内置的语法:channel 来通信。具体做法类似实现一个生产者消费者模型,使用 channel 做消息队列。
- 初始将种子 url 塞进 channel。
- 消费者:master 不断从 channel 中取出 urls,判断是否抓取过,然后启动新的 worker goroutine 去抓取。
- 生产者:worker goroutine 抓取到给定的任务 url,并将解析出的结果 urls 塞回 channel。
- master 使用一个变量 n 来追踪发出的任务数;往发出一份任务增加一;从 channel 中获取并处理完一份结果(即将其再安排给 worker)减掉一;当所有任务都处理完时,退出程序。
func worker(url string, ch chan []string, fetcher Fetcher) {
urls, err := fetcher.Fetch(url)
if err != nil {
ch <- []string{}
} else {
ch <- urls
}
}
func master(ch chan []string, fetcher Fetcher) {
n := 1
fetched := make(map[string]bool)
for urls := range ch {
for _, u := range urls {
if fetched[u] == false {
fetched[u] = true
n += 1
go worker(u, ch, fetcher)
}
}
n -= 1
if n == 0 {
break
}
}
}
func ConcurrentChannel(url string, fetcher Fetcher) {
ch := make(chan []string)
go func() {
ch <- []string{url}
}()
master(ch, fetcher)
}
Q&A:
- master 读 channel,多 worker 写 channel,不会有竞争问题吗?channel 是线程安全的。
- channel 不需要最后 close 吗?我们用 n 追踪了所有执行中的任务数,因此当 n 为 0 退出时,channel 中不存在任何任务 / 结果,因此 master/worker 都不会对 channel 存在引用,稍后 gc collector 会将其回收。
- 为什么在 ConcurrentChannel 需要用 goroutine 往 channel 中写一个 url?否则 master 在读取的时候会一直阻塞。并且 channel 是一个非缓冲 channel,如果不用 goroutine,将会永远阻塞在写的时候。