go 并发协程管理 - 使用 Context 和通道做协程管理

Go 语言提供了 Context 标准库是为了解决复杂的并发场景下,对协程有更好的控制。Context 的作用和它的名字很像,上下文,即子协程的下上文。Context 有两个主要的功能:

  • 通知子协程退出(正常退出,超时退出等);
  • 传递必要的参数 ;

Context

context.WithCancel

context.WithCancel() 创建可取消的 Context 对象,即可以主动通知子协程退出。
使用 Context 改写上述的例子,效果与 select+chan 相同。

func doTask(ctx context.Context, job string) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("stop", job)
            return
        default:
            fmt.Println(job, "send request")
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go doTask(ctx, "task1")
    time.Sleep(2 * time.Second)
    cancel()
}
  • context.Backgroud() 创建根 Context,通常在 main 函数、初始化和测试代码中创建,作为顶层 Context。
  • context.WithCancel(parent) 创建可取消的子 Context,同时返回函数 cancel。
  • 在子协程中,使用 select 调用 <-ctx.Done() 判断是否需要退出。
  • 主协程中,调用 cancel() 函数通知子协程退出。

context.WithValue

如果需要往子协程中传递参数,可以使用 context.WithValue()。

type Options struct{ Interval time.Duration }

func doTask(ctx context.Context, name string) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("stop", name)
            return
        default:
            fmt.Println(name, "send request")
            op := ctx.Value("options").(*Options)
            time.Sleep(op.Interval * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    vCtx := context.WithValue(ctx, "options", &Options{1})

    go doTask(vCtx, "task1")
    go doTask(vCtx, "task2")

    time.Sleep(3 * time.Second)
    cancel()
    time.Sleep(3 * time.Second)
}
  • context.WithValue() 创建了一个基于 ctx 的子 Context,并携带了值 options。
  • 在子协程中,使用 ctx.Value("options") 获取到传递的值,读取/修改该值。

context.WithTimeout

如果需要控制子协程的执行时间,可以使用 context.WithTimeout 创建具有超时通知机制的 Context 对象。

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    go doTask(ctx, "task1")
    go doTask(ctx, "task2")

    time.Sleep(3 * time.Second)
    fmt.Println("before cancel")
    cancel()
    time.Sleep(3 * time.Second)
}

WithTimeout()的使用与 WithCancel() 类似,多了一个参数,用于设置超时时间。

因为超时时间设置为 2s,但是 main 函数中,3s 后才会调用 cancel(),因此,在调用 cancel() 函数前,子协程因为超时已经退出了。

context.WithDeadline

超时退出可以控制子协程的最长执行时间,那 context.WithDeadline() 则可以控制子协程的最迟退出时间。

func doTask(ctx context.Context, name string) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("stop", name, ctx.Err())
            return
        default:
            fmt.Println(name, "send request")
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
    go doTask(ctx, "task1")
    go doTask(ctx, "task2")

    time.Sleep(3 * time.Second)
    fmt.Println("before cancel")
    cancel()
    time.Sleep(3 * time.Second)
}

  • WithDeadline 用于设置截止时间。在这个例子中,将截止时间设置为1s后,cancel() 函数在 3s 后调用,因此子协程将在调用 cancel() 函数前结束。
  • 在子协程中,可以通过 ctx.Err() 获取到子协程退出的错误原因。

可以看到,子协程 task1 和 task2 均是因为截止时间到了而退出。

通道

我们先看 chan 的实例


//chan 同步通道 (无缓存通道)
func ChanNoCache() {
    ch := make(chan int, 0)

    go func() {
        var sum int = 0
        for i :=0; i<10; i++ {
            sum = sum + i
        }
        ch <- sum
    }()
    //在计算sum和的goroutine没有执行完,把值赋给ch通道之前,
    //fmt.Println(<-ch)会一直等待
    log.Println(<-ch)

}

//chan 通道 (有缓存)
func ChanWithCache()  string {
    response := make(chan string, 3)

    go func() { response <- http.Request("https://godoc.org/google.golang.org/grpc") }()
    go func() { response <- http.Request("https://godoc.org/debug/gosym") }()
    go func() { response <- http.Request("https://godoc.org/context") }()

    //输出所有数据
    for i:=0 ; i< cap(response); i++ {
        log.Println(<-response)
        log.Println("----------", i)
    }

    //返回最快的获取到数据
    return <- response
}

//pipeline 管道
func Pipeline() {
    begin := make(chan int, 0)
    end := make(chan int, 0)

    go func() {
        begin <- 10
    }()


    go func() {
        num := <- begin
        end <- num
    }()

    log.Println(<-end)

}


  • 通道 (同步和缓存)
  • 管道 (生产者和消费者)

在多个goroutine并发中,我们不仅可以通过原子函数和互斥锁保证对共享资源的安全访问,消除竞争的状态,还可以通过使用通道,在多个goroutine发送和接受共享的数据,达到数据同步的目的。

通道,他有点像在两个routine之间架设的管道,一个goroutine可以往这个管道里塞数据,另外一个可以从这个管道里取数据,有点类似于我们说的队列。

通道类型和Map这些类型一样,可以使用内置的make函数声明初始化,这里我们初始化了一个chan int类型的通道,所以我们只能往这个通道里发送int类型的数据,当然接收也只能是int类型的数据。

管道: 把上一个操作的输出,当成下一个操作的输入,连起来,做一连串的处理操作。我们把一个通道的输出,当成下一个通道的输入也能达到管道的效果 。

通道 + Context 任务管理

package main

import (
    "context"
    "strconv"
    "fmt"
)


func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    i := 0
    for {
        generateJob(ctx, "itask" + strconv.Itoa(i))
        i++
        if i> 10000 {
            break
        }
    }
}

//生成 job
func generateJob(parent context.Context, value string) {
    ch := make(chan int, 0)
    ctx, cancel := context.WithCancel(parent)
    go doTask(ch, ctx, value)
    <-ch
    cancel()
}


//执行任务
func doTask(ch chan<- int, ctx context.Context, job string) {
    select {
    case <-ctx.Done():
        fmt.Println("job is closed", job)
        return
    default:
        fmt.Println(job, "is running")
        ch <- 1
    }
}


  • 用通道来控制协程执行的状态 "ch <- 1", 当 <-ch 接受完传值后任务即结束 ;
  • 这里用了 context.Background() 作为父会话, 然后在子协程中调用 cancel() 结束;

任务调度

//生成 jobs
func GenerateJobs(pch chan<- context.Context, parent context.Context) {
    //设置 context
    ctx, cancel := context.WithCancel(parent)
    //设置通道
    ch := make(chan context.Context, 0)
    //任务调度方法
    go doTaskScheduler(...)
    //等任务执行完, 接受信号
    pctx := <-ch
    pch <- pctx
    //销毁资源
    for {
        select {
            case <-ctx.Done():
                recycle() //回收调度资源
                cancel()
                return
            default:
                break
        }
    }
}

任务调度 doTaskScheduler(...) 可以实现任务在不同的节点上执行,可以对节点进行资源计算等等,完成调度任务。等任务执行完成会返回结束新号,告诉当前协程,子协程已经结束,并对资源进行回收。

参考

原文: 深入解析 Go Context 与 协程

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容