Android Kotlin(8)之《协程3》

Android Kotlin第八篇 协程3。Kotlin系列源码在源码下载这里下载。我们一起来了解下Kotlin的协程,协程也是Kotlin重点。也许我有的地方没有写好,也欢迎大家提出问题,纠正问题。

前面我们已经简单了解了协程,这篇我们来看下协程的调度线程与Channels(通道)

一、调度线程

在前面我们经常用到“CommonPool”共享的线程池,那么除了共享的线程池以为还有哪些呢,如下:

fun test21() = runBlocking<Unit> {
        val jobs = arrayListOf<Job>()
        jobs += launch(Unconfined) { // not confined -- will work with main thread
            log("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        }
        jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
            log("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
        }
        jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
            log("      'CommonPool': I'm working in thread ${Thread.currentThread().name}")
        }
        jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
            log("          'newSTC': I'm working in thread ${Thread.currentThread().name}")
        }
        jobs.forEach { it.join() }
    }
  • Unconfined:无限制的,自由的,不限于任何特定线程
  • coroutineContext:返回这个coroutine的上下文。
  • CommonPool:共享线程的公共池,类似java里线程池,只不过这里是公共的
  • newSingleThreadContext:创建一个单独的线程,在作业被取消时回收
    目前线程调度还未完全搞懂,只了解了部分,不敢妄加猜测,调度线程就先了解到这里,后续我在补充

二、Channels(通道)

Channels,它不是一个阻止放操作,而是一个挂起发送,而不是一个阻塞操作,它有一个暂停接收。可用于线程间传递数据。
我们先来看一个简单的示例:

fun test22() = runBlocking<Unit> {
        //定义一个通道
        val channel = Channel<Int>()
        launch(CommonPool) {
            for (x in 1..5) {
                delay(1000L)
                //在这里发送
                channel.send(x * x)
            }
        }
        repeat(5) {
            //在这里接收
            log(channel.receive().toString())
        }
        log("Done!")
    }

输出:

08-10 14:48:52.452 10568-10568/com.xiaoqiang.kotlin I/test: 1
08-10 14:48:53.452 10568-10568/com.xiaoqiang.kotlin I/test: 4
08-10 14:48:54.452 10568-10568/com.xiaoqiang.kotlin I/test: 9
08-10 14:48:55.452 10568-10568/com.xiaoqiang.kotlin I/test: 16
08-10 14:48:56.462 10568-10568/com.xiaoqiang.kotlin I/test: 25
08-10 14:48:56.462 10568-10568/com.xiaoqiang.kotlin I/test: Done!
08-10 14:48:56.462 10568-10568/com.xiaoqiang.kotlin I/test: 结束

1、关闭通道

假设发送的时候,遇到特殊情况,不在发生了,要关闭通道,怎么做呢,例如:

fun test23() = runBlocking<Unit> {
        //定义一个通道
        val channel = Channel<Int>()
        launch(CommonPool) {
            for (x in 1..5) {
                delay(1000L)
                //在这里发送
                channel.send(x * x)
                if(x == 4){
                    log("关闭通道")
                    channel.close()
                }
            }
        }
        repeat(5) {
            //在这里接收
            try {
                var a = channel.receive()
                log(a.toString())
            }catch (e: ClosedReceiveChannelException){
                log("关闭通道报出异常ClosedReceiveChannelException")
            }
            log("等待接收")
        }
        log("Done!")
    }

注意:我实际测试的时候发现,如果不抛出ClosedReceiveChannelException异常,那么会导致程序直接奔溃,所有这里你只需要在接收方抛出异常即可
我们也可把通道写成方法,实际测试发现方法里结束通道时在接收方可以不写抛出异常,例如:

fun produceSquares() = produce<Int>(CommonPool) {
        for (x in 1..5) {
            delay(1000L)
            send(x * x)
            if (x == 4){
                channel.close()
            }
        }
    }
    fun test24() = runBlocking<Unit> {
        val squares = produceSquares()
        squares.consumeEach { log(it.toString()) }
        log("Done!")
    }

2、Pipelines

在这里我理解为管道运输,也就是说上面produce生成通道是可以两个链接起来,每块做不同的工作,然后输出,例如:

fun produceSquares1() = produce<Int>(CommonPool) {
        for (x in 1..5) {
            delay(1000L)//这里延迟就失效了,具体原因不清楚为啥会失效
            send(x)
        }
    }
    fun produceSquares2(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
        for (x in 1..5) {
            delay(1000L)
            send((x * x)+1)
        }
    }
    fun test25() = runBlocking<Unit> {
        //始发地
        val numbers = produceSquares1()
        //通道二次加工
        val squares = produceSquares2(numbers)
        //最后输出
        squares.consumeEach { log(it.toString()) }
        log("Done!")
        //关闭通道,回收
        numbers.cancel()
        squares.cancel()
    }

输出:

08-10 14:56:31.742 19083-19083/com.xiaoqiang.kotlin I/test: 2
08-10 14:56:32.742 19083-19083/com.xiaoqiang.kotlin I/test: 5
08-10 14:56:33.742 19083-19083/com.xiaoqiang.kotlin I/test: 10
08-10 14:56:34.742 19083-19083/com.xiaoqiang.kotlin I/test: 17
08-10 14:56:35.752 19083-19083/com.xiaoqiang.kotlin I/test: 26
08-10 14:56:35.752 19083-19083/com.xiaoqiang.kotlin I/test: Done!
08-10 14:56:35.752 19083-19083/com.xiaoqiang.kotlin I/test: 结束

3、Fan-out

多个coroutines 可以从一个通道接收,例如:

fun produceNumbers() = produce<Int>(CommonPool) {
        var a = 1
        while (true){
            delay(100L)
            send(a++)
        }
    }
    fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
        channel.consumeEach {
            log("launch#$id:收到: $it")
        }
    }
    fun test26() = runBlocking<Unit> {
        val producer = produceNumbers()
        repeat(5) { launchProcessor(it, producer) }
        delay(950)
        producer.cancel() // cancel producer coroutine and thus kill them all
    }

输出:

08-10 15:01:07.228 24523-24563/? I/test: launch#0:收到: 1
08-10 15:01:07.338 24523-24563/? I/test: launch#1:收到: 2
08-10 15:01:07.438 24523-24562/? I/test: launch#2:收到: 3
08-10 15:01:07.538 24523-24563/? I/test: launch#4:收到: 4
08-10 15:01:07.638 24523-24562/? I/test: launch#3:收到: 5
08-10 15:01:07.738 24523-24563/? I/test: launch#0:收到: 6
08-10 15:01:07.838 24523-24563/? I/test: launch#1:收到: 7
08-10 15:01:07.938 24523-24562/? I/test: launch#2:收到: 8
08-10 15:01:08.038 24523-24563/com.xiaoqiang.kotlin I/test: launch#4:收到: 9
08-10 15:01:08.088 24523-24523/com.xiaoqiang.kotlin I/test: 结束

4、Fan-in

多个coroutines 可以发送到一个通道,例如

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
        while (true) {
            delay(time)
            channel.send(s)
        }
    }
    fun test27() = runBlocking<Unit> {
        val channel = Channel<String>()
        //这里两个launch间隔着向通道发送数据
        launch(coroutineContext) { sendString(channel, "foo", 200L) }
        launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
        repeat(6) {
            //循环接收数据
            log(channel.receive())
        }
    }

输出:

08-10 14:58:46.088 21748-21748/com.xiaoqiang.kotlin I/test: foo
08-10 14:58:46.288 21748-21748/com.xiaoqiang.kotlin I/test: foo
08-10 14:58:46.388 21748-21748/com.xiaoqiang.kotlin I/test: BAR!
08-10 14:58:46.488 21748-21748/com.xiaoqiang.kotlin I/test: foo
08-10 14:58:46.688 21748-21748/com.xiaoqiang.kotlin I/test: foo
08-10 14:58:46.888 21748-21748/com.xiaoqiang.kotlin I/test: BAR!
08-10 14:58:46.888 21748-21748/com.xiaoqiang.kotlin I/test: 结束

5、通道缓冲区

到目前为止所显示的通道没有缓冲区。当发送方和接收方相遇(又称会合)时,未缓冲通道传输元素。如果首先调用send,那么它将被挂起,直到被调用接收,如果首先调用接收,它将暂停,直到调用send。例如:

fun test28() = runBlocking<Unit> {
        //创建缓冲区大小为4的通道
        val channel = Channel<Int>(2)
        launch(CommonPool) {
            repeat(5) {
                log("Sending $it")
                channel.send(it) //当通道缓冲区满的时候挂起,直到下方接收的时候在继续
                delay(100L)
            }
        }
        delay(3000)
        log("开始接收")
        launch(CommonPool) {
            repeat(5) {
                val a= channel.receive()//开始接收通道缓冲区里数据,上方发送被激活,继续发送
                log("Receive $a")
            }
        }
    }

输出:

08-10 18:29:07.920 21685-21744/com.xiaoqiang.kotlin I/test: Sending 0
08-10 18:29:08.020 21685-21744/com.xiaoqiang.kotlin I/test: Sending 1
08-10 18:29:08.130 21685-21744/com.xiaoqiang.kotlin I/test: Sending 2
08-10 18:29:10.920 21685-21685/com.xiaoqiang.kotlin I/test: 开始接收
08-10 18:29:10.920 21685-21685/com.xiaoqiang.kotlin I/test: 结束
08-10 18:29:10.920 21685-21744/com.xiaoqiang.kotlin I/test: Receive 0
08-10 18:29:10.920 21685-21744/com.xiaoqiang.kotlin I/test: Receive 1
08-10 18:29:10.920 21685-21744/com.xiaoqiang.kotlin I/test: Receive 2
08-10 18:29:11.030 21685-21744/com.xiaoqiang.kotlin I/test: Sending 3
08-10 18:29:11.030 21685-21745/com.xiaoqiang.kotlin I/test: Receive 3
08-10 18:29:11.130 21685-21744/com.xiaoqiang.kotlin I/test: Sending 4
08-10 18:29:11.130 21685-21745/com.xiaoqiang.kotlin I/test: Receive 4

如果我有未写好的地方,欢迎大家提出意见,谢谢大家的观赏!

全套源码下载这里源码会随着后面发布的Kotlin逐渐完善

a

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,056评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,842评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,938评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,296评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,292评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,413评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,824评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,493评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,686评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,502评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,553评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,281评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,820评论 3 305
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,873评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,109评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,699评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,257评论 2 341

推荐阅读更多精彩内容

  • 原文链接:https://github.com/EasyKotlin 在常用的并发模型中,多进程、多线程、分布式是...
    JackChen1024阅读 10,695评论 3 23
  • 前言 人生苦多,快来 Kotlin ,快速学习Kotlin! 什么是Kotlin? Kotlin 是种静态类型编程...
    任半生嚣狂阅读 26,133评论 9 118
  • 本文主要介绍协程的用法, 以及使用协程能带来什么好处. 另外, 也会粗略提一下协程的大致原理.本文的意义可能仅仅是...
    登高而望远阅读 35,159评论 18 140
  • 相较于奶咖,我更喜欢动手冲点单品咖啡来喝,目前偏爱酸度高的咖啡豆。自己动手,可以参与整个过程,磨豆子时研磨出的香气...
    ManBetter阅读 647评论 0 0
  • 微雨淅沥,潺潺入思。 远处雷音微鸣,眼前嫩叶细语。我享受这安静的午后。 蜘蛛从房檐下探出头来,清理挂着雨雾的蛛网。...
    张博译阅读 337评论 0 2