别名
- Actor
- Concurrent Object
适用的情况
actor是一个很抽象多线程模式, 每一个actor是线程独立并且有属于自己的状态, 多个actor互相发送消息以完成最终的任务. 你可以将actor模式理解为一个团队, 这个团队里面的个人就是一个运行在独立线程上的个体, 由他们互相交流并且单独处理自己的工作, 最后完成任务.
关于actor模式, 这里有一篇个人觉得很好的文章(英文)--The actor model in 10 minutes
实现的方式
构造一个ActiveObject类, 来充当actor角色, 有actor角色去完成任务, 并且完成交互.
相关的模式
- 实现actor角色会用到Worker Thread模式
- 将消息发送给actor时候,将消息放入队列, actor从队列中取消息进行处理, 用到了Producer-Consumer模式.
- 将处理结果返回给最终调用方需要用到Future模式.
代码示例:
如果不用java.util.concurrent.*, 那么将会使本模式的代码量很大, 阅读体验很差, 这里只考虑actor模式, 所以引入java.util.concurrent中已经实现的部分类.(完全自己实现的版本:GitHub连接--Actor)
代码中ActiveObject来进行处理任务, MakerClientThread发出生产字符串的消息交给activeObject进行生产,
DisplayClientThread发送打印字符串消息交给activeObject进行打印.
package com.graphic.activeObject.concurrent;
import java.util.concurrent.Future;
/**
* @author youngxinler 19-7-3 下午4:39
**/
public interface ActiveObject {
Future<String> makeString(int count, char fillChar);
void displayString(String string);
void shutdown();
}
package com.graphic.activeObject.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author youngxinler 19-7-3 下午4:43
**/
public class ActiveObjectImpl implements ActiveObject {
private final ExecutorService service = Executors.newSingleThreadExecutor();
@Override
public Future<String> makeString(final int count, final char fillChar) {
class MakeStringRequest implements Callable<String> {
public String call() {
char[] buffer = new char[count];
for (int i = 0; i < count; i++) {
buffer[i] = fillChar;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
return new String(buffer);
}
}
return service.submit(new MakeStringRequest());
}
@Override
public void displayString(final String string) {
class DisplayStringRequest implements Runnable {
@Override
public void run() {
try {
System.out.println("display: " + string);
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
service.submit(new DisplayStringRequest());
}
@Override
public void shutdown() {
service.shutdown();
}
}
package com.graphic.activeObject.concurrent;
/**
* @author youngxinler 19-7-3 下午4:43
**/
public class ActiveObjectFactory {
public static ActiveObject createActiveObject() {
return new ActiveObjectImpl();
}
}
package com.graphic.activeObject.concurrent;
import java.util.concurrent.Future;
/**
* @author youngxinler 19-7-3 下午4:38
**/
public class MakerClientThread extends Thread {
private final ActiveObject activeObject;
private final char fillChar;
public MakerClientThread(String s, ActiveObject activeObject) {
super(s);
this.activeObject = activeObject;
this.fillChar = s.charAt(0);
}
@Override
public void run() {
try {
for (int i = 0; true; i++) {
Future<String> future = activeObject.makeString(i, fillChar);
Thread.sleep(10);
String value = future.get();
System.out.println(Thread.currentThread().getName() + ": value = " + value);
}
} catch (Exception e) {
System.out.println(Thread.currentThread().getName() + ":" + e);
}
}
}
package com.graphic.activeObject.concurrent;
import java.util.concurrent.CancellationException;
import java.util.concurrent.RejectedExecutionException;
/**
* @author youngxinler 19-7-3 下午6:23
**/
public class DisplayClientThread extends Thread {
private final ActiveObject activeObject;
public DisplayClientThread(String name, ActiveObject activeObject) {
super(name);
this.activeObject = activeObject;
}
@Override
public void run() {
try {
for (int i = 0; true; i++) {
String s = Thread.currentThread().getName() + " " + i;
activeObject.displayString(s);
Thread.sleep(200);
}
} catch (RejectedExecutionException e) {
System.out.println(Thread.currentThread().getName() + ":" + e);
} catch (CancellationException e) {
System.out.println(Thread.currentThread().getName() + ":" + e);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + ":" + e);
}
}
}
package com.graphic.activeObject.concurrent;
/**
* @author youngxinler 19-7-3 下午6:32
**/
public class Main {
public static void main(String[] args) {
ActiveObject activeObject = ActiveObjectFactory.createActiveObject();
try {
new MakerClientThread("alice", activeObject).start();
new MakerClientThread("bobby", activeObject).start();
new DisplayClientThread("chris", activeObject).start();
} catch (Exception e) {
e.printStackTrace();
}
}
}