写在前面
因为并发相关的东西又多又长。。所以这个专题会分成多篇博客来写啦。。
本篇文章包括
- 携程机制,携程和线程的区别
- 使用锁来控制并发
- 使用通道(channel)来控制并发
- 通道的多路控制和超时(select语句块)
Go语言的并发操作(一)
1. Go语言的携程机制
-
线程和携程的区别(携程是更加轻量级的线程)
JDK5之后一个线程的默认栈道大小是5M,而携程栈的大小是2K
-
Java中线程和系统线程的对应关系是1:1,但是携程是n:m
线程由于涉及到处理器切换人物,会导致吞吐率下降。而使用携程,可以做到多个携程只会使用一个系统现场。利用这种方式降低切换线程上下文的成本
-
携程的简单使用
携程的使用非常简单,在需要运行的函数前面加上
go
关键字就可以了个人认为只需要把携程看成轻量级线程,使用时可以直接当成线程使用
package L13_concurrent import ( "fmt" "testing" "time" ) func TestGroutine(t *testing.T) { for i :=0 ; i<10 ; i++{ go func(s int) { fmt.Println(s) time.Sleep(time.Millisecond*100) }(i) } }
上面的程序输出结果如下(由于处理机调用的时间不一致,所以并不会出现顺序的数字)
=== RUN TestGroutine
4
7
5
6
1
8
9
0
2
3
--- PASS: TestGroutine (0.10s)
PASS
2. Go 语言的共享内存并发机制
-
使用锁来控制多线程程序
先看一个在没有锁的情况下使用携程(多线程)把一个值累加到5000点结果
package L13_concurrent import ( "fmt" "testing" ) func TestLock(t *testing.T) { counter := 0 for i := 0; i < 5000; i++ { go func() { counter++ }() } fmt.Println("counter = ",counter) }
结果如下,可以看见我们丢失了正确的鞋操作
counter = 4799
为了获得正确的结果,我们需要使用锁,和等待队列
package L13_concurrent import ( "fmt" "sync" "testing" ) func TestLock(t *testing.T) { //导入一个锁对象 var mut sync.Mutex //导入一个需要等待的携程数,用于主线程精准的等待进行累加的携程 var wg sync.WaitGroup counter := 0 for i := 0; i < 5000; i++ { //告诉等待队列需要等待的 wg.Add(1) go func() { //使用defer来保证这个锁一定会被释放 defer func() { mut.Unlock() }() //获得锁 mut.Lock() counter++ //告诉等待队列这个任务结束 wg.Done() }() } //等待所有携程结束 wg.Wait() fmt.Println("counter = ",counter) }
3. CSP的并发控制
CSP相当于是通过通道(发送消息)的机制来进行线程通讯。CSP的操作是通过一个Channel来完成的。也就是,这种方式会比使用直接通讯耦合度更低
-
Channel的交互机制
- 主动式交互:如果消息消费者离线,那么消息生产者就会阻塞
- 队列(缓存)式交互:在消息队列未满之前,消息的生产者可以一直放入消息,如果消息队列满了,那么消息生产者就会阻塞
-
CSP实现异步调用,延迟返回值
-
例如这里的Service方法是需要异步调用的
//这个方法做一些耗时的操作,需要异步调用, func service() string { time.Sleep(time.Second*1) return "do something" }
-
包装上面的方法,返回一个channel(注意代码里的chan类型的定义方式和把函数放入调用的方式)
//使用一个异步调用包装上面的方法 func asyncService() chan string { //创建一个string类型的频道 returnChannel := make(chan string) //异步调用 go func() { fmt.Println("开始异步调用") returnChannel <- service() fmt.Println("异步调用结束") }() return returnChannel }
-
这样调用
asyncService
方法的时候虽然可以很快的返回chan类型的数据,但是本质上任然是通过携程异步调用的,立即去获取chan里的值将会因为异步调用没有返回值而被阻塞我给出一个调用案例如下
//在程序入口的线程里也要做一些操作 func TestTask(t *testing.T) { //开始异步调用并且等待结果 result := asyncService() //开始做别的事情 time.Sleep(500*time.Millisecond) fmt.Println("do something else") //这里再等待结果 fmt.Println(<-result) }
结果如下
=== RUN TestTask
开始异步调用
do something else
异步调用结束
do something
--- PASS: TestTask (1.01s)
PASS -
创建一个带消息容量的channel是需要再make方法的参数后面加上一个int来表示消息队列的容量就好了,如下
returnChannel := make(chan string,1)
-
由于我对我的老本行——java后端的异步调用还不是很熟悉,那我就趁热打铁,写一个Demo实现Java的异步调用
这一部分是使用Java的Callable和FutureTask来完成异步调用,在获取调用结果的时候如果没有完成,主线程就会阻塞直到任务完成。如果不感兴趣可以直接跳到下一个大标题
-
使用Callable和ExecutorService线程池来完成任务
package com.libi.callable; import java.util.concurrent.*; /** * @author :Libi * @version :1.0 * @date :2019-06-17 19:39 * 用于了解Callable的使用方法 */ public class CallableDemo implements Callable<String> { /** * 这个方法是用于异步调用的 * @return * @throws Exception */ @Override public String call() throws Exception { System.out.println("do something in 5 seconds"); Thread.sleep(5000); return "完成,这个任务做完了"; } /** * 这个线程池用于调用callable任务,并且可以获得 */ private ExecutorService executorService = Executors.newSingleThreadExecutor(); public static void main(String[] args) throws ExecutionException, InterruptedException { CallableDemo callableDemo = new CallableDemo(); System.out.println("开始异步调用"); //这一步可以封装成方法 Future<String> submit = callableDemo.executorService.submit(callableDemo); String s = submit.get(); System.out.println(s); callableDemo.executorService.shutdown(); } }
-
使用FutureTask的普通的Thread来完成任务
package com.libi.task; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /** * @author :Libi * @version :1.0 * @date :2019-06-17 19:51 * 这个类是研究FutureTask的使用方法而创建的类 */ public class TaskDemo { /** * 这种方法就没有用到线程池,但是也简单的实现类异步调用的 * @return */ private FutureTask<String> asyncService() { FutureTask<String> stringFutureTask = new FutureTask<String>(() -> { Thread.sleep(5000); return "任务完成"; }); new Thread(stringFutureTask).start(); return stringFutureTask; } public static void main(String[] args) throws ExecutionException, InterruptedException { TaskDemo demo = new TaskDemo(); System.out.println("开始异步调用"); FutureTask<String> stringFutureTask = demo.asyncService(); String s = stringFutureTask.get(); System.out.println("异步调用结束"); System.out.println(s); } }
4. 多路选择和超时(select块的使用)
select块就是用于多个异步调用的多路选择和超时控制。
select语句块类似switch,每一个case里都要使用一个频道(chan类型)来获得数据。只要有一个channel返回了数据,那么这个channel的语句块就会执行。如果都没有返回值,有default语句块的话就会执行default语句块
这里的channel应该提前启动好,当我们要获取结果时再去做相关处理
func TestSelect1(t *testing.T) {
s := asyncService()
time.Sleep(time.Millisecond*1000)
select {
//这一个语句块是为了做超时处理,10s后如果没有结果他就会返回结果
//(当然有了default语句块这个语句块也就没有意义了)
case <-time.After(10*time.Second):
print("10s")
case ret := <-s:
print("result:",ret)
default:
t.Error("error")
}
}