线程是内核态 控制的 重量级
协程是用户态 控制的 轻量级
一个线程可以多个协程,一个进程也可以单独拥有多个协程。
线程进程都是同步机制,而协程则是异步。
协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态。
线程是抢占式,而协程是非抢占式的,所以需要用户自己释放使用权来切换到其他协程,因此同一时间其实只有一个协程拥有运行权,相当于单线程的能力。
协程并不是取代线程, 而且抽象于线程之上, 线程是被分割的CPU资源, 协程是组织好的代码流程, 协程需要线程来承载运行, 线程是协程的资源, 但协程不会直接使用线程, 协程直接利用的是执行器(Interceptor), 执行器可以关联任意线程或线程池, 可以使当前线程, UI线程, 或新建新程.。
线程是协程的资源。协程通过Interceptor来间接使用线程这个资源。
agent 做了一个代理,在 调用 class Fiber 自动生成一个 栈,整个 Quasar 来管理 Fiber 之间的切换
代码实现:
-
1. pom 文件中 添加依赖
<dependency>
<groupId>co.paralleluniverse</groupId>
<artifactId>quasar-core</artifactId>
<version>0.8.0</version>
</dependency>
-
2. idea 编译器中 添加 指定 quasar 参数
-javaagent:C:/Users/86185/.m2/repository/co/paralleluniverse/quasar-core/0.8.0/quasar-core-0.8.0.jar
-
3. 纤程测试
public class CoroutinesTest {
public static void main(String[] args) {
long start = System.currentTimeMillis();
for (int i = 0;i < 10000; i++){
Fiber<Void> fiber = new Fiber<>(new SuspendableRunnable() {
@Override
public void run() throws SuspendExecution, InterruptedException {
calcu();
}
});
fiber.start();
}
long end = System.currentTimeMillis();
System.out.println("计算 时间 : "+(end - start));
}
static void calcu(){
int result = 0;
for (int i = 0 ; i < 10000; i++){
for (int j = 0; j < 200; j++) result+=j;
}
}
}
测试结果:
-
4. 线程测试
public class CoroutinesTest {
public static void main(String[] args) {
long start = System.currentTimeMillis();
Runnable r = new Runnable() {
@Override
public void run() {
calcu();
}
};
for (int i = 0;i < 10000; i++){
Thread t = new Thread(r);
t.start();
}
long end = System.currentTimeMillis();
System.out.println("计算 时间 : "+(end - start));
}
static void calcu(){
int result = 0;
for (int i = 0 ; i < 10000; i++){
for (int j = 0; j < 200; j++) result+=j;
}
}
}
测试结果:
Fiber 工具类
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.SuspendableRunnable;
import java.util.concurrent.ArrayBlockingQueue;
public class WorkTools {
//协程池中默认协程的个数为5
private static int WORK_NUM = 5;
//队列默认任务为100
private static int TASK_COUNT = 100;
//工做协程数组
private Fiber[] workThreads;
//等待队列
private final ArrayBlockingQueue<SuspendableRunnable> taskQueue;
//用户在构造这个协程池时,但愿启动的协程数
private final int workerNum;
//构造方法:建立具备默认协程个数的协程池
public WorkTools() {
this(WORK_NUM,TASK_COUNT);
}
//建立协程池,workNum为协程池中工做协程的个数
public WorkTools(int workerNum, int taskCount) {
if (workerNum <= 0) {
workerNum = WORK_NUM;
}
if (taskCount <= 0) {
taskCount = TASK_COUNT;
}
this.workerNum = workerNum;
taskQueue = new ArrayBlockingQueue(taskCount);
workThreads = new Fiber[workerNum];
for (int i = 0; i < workerNum; i++) {
int finalI = i;
workThreads[i] = new Fiber<>(new SuspendableRunnable() {
@Override
public void run() throws SuspendExecution, InterruptedException {
SuspendableRunnable runnable = null;
while (true){
try{
//取任务,没有则阻塞。
runnable = taskQueue.take();
}catch (Exception e){
System.out.println(e.getMessage());
}
//存在任务则运行。
if(runnable != null){
runnable.run();
}
runnable = null;
}
}
}); //new一个工做协程
workThreads[i].start(); //启动工做协程
}
Runtime.getRuntime().availableProcessors();
}
//执行任务,其实就是把任务加入任务队列,何时执行由协程池管理器决定
public void execute(SuspendableRunnable task) {
try {
taskQueue.put(task); //put:阻塞接口的插入
} catch (Exception e) {
// TODO: handle exception
System.out.println("阻塞");
}
}
//销毁协程池,该方法保证全部任务都完成的状况下才销毁全部协程,不然等待任务完成再销毁
public void destory() {
//工做协程中止工做,且置为null
System.out.println("ready close thread...");
for (int i = 0; i < workerNum; i++) {
workThreads[i] = null; //help gc
}
taskQueue.clear(); //清空等待队列
}
//覆盖toString方法,返回协程信息:工做协程个数和已完成任务个数
@Override
public String toString() {
return "WorkThread number:" + workerNum + " ==分割线== wait task number:" + taskQueue.size();
}
}
import co.paralleluniverse.strands.SuspendableRunnable;
import lombok.SneakyThrows;
import java.util.concurrent.CountDownLatch;
public class AiApplication {
@SneakyThrows
public static void main(String[] args) {
//等待协程任务完毕后再结束主线程
CountDownLatch cdl = new CountDownLatch(50);
//开启5个协程,50个任务列队。
WorkTools myThreadPool = new WorkTools(5, 50);
for (int i = 0; i< 50; i++){
int finalI = i;
myThreadPool.execute(new SuspendableRunnable() {
@Override
public void run() {
System.out.println(finalI);
try {
//延迟1秒
Thread.sleep(1000);
cdl.countDown();
} catch (InterruptedException e) {
System.out.println("阻塞中");
}
}
});
}
//阻塞
cdl.await();
}
}