Java 多线程
基础
进程和线程
- 进程和线程的区别于联系
- 进程是操作系统分配资源的基本单位,进程拥有独立的内存等资源,一个进程至少有一个线程。
- 线程则是具体的执行单位,CPU(或是CPU的一个核心)同时只能执行一个线程
- 一个进程的所有线程共享该进程的系统资源。
- 进程是程序的一次执行,而使用线程可以使程序的这次执行变为并发
线程的状态
Java中线程工分为以下几个状态:
- new 表示线程创建,但是还未执行
- runnable 表示线程可以被cpu调度到
- blocked 表示线程进入同步代码块或IO操作
- running 表示线程正在执行
- waiting 表示线程正在等待一个monitor
- timed_waiting表示线程有显时间内等待
各个状态之间切换关系如下:
Java内存模型
Java内存模型是Java多线程的基础,可以参考文章:全面理解Java内存模型
Therad类介绍
Therad类的使用
继承Thread类并重写Run方法
public static void main(String[] args) {
UselessThread thread = new UselessThread();
thread.start();
}
static class UselessThread extends Thread {
@Override
public void run() {
System.out.print("Useless");
}
}
输出结果:
Useless
使用Runable接口
Thread thread = new Thread(new Runnable() {
public void run() {
System.out.print("Useless");
}
});
thread.start();
输出结果:
Useless
几个常用接口
- sleep
sleep方法是Thread中的一个静态方法,用于用于强制停止当前的线程指向若干毫秒。
- yield
yield 方法调用后表示线程可以让出CPU,系统可以将CPU调度其他线程执行,也可以继续执行当前线程。
- join
join方法表示阻塞调用该方法的线程,指导被引用的线程结束为止,如下面的例子:
Thread aThread = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("aThread finished \r\n");
}
});
aThread.start();
System.out.print("main Threand is abount to join \r\n");
try {
aThread.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.print("main Threand finish join \r\n");
输出结果:
main Threand is abount to join
aThread finished
main Threand finish join
- interupt
interupt方法使被引用的线程收到一个InteruptException,以防止被引用的线程无限等待么,如下面的例子:
Thread aThread = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(5000000);
} catch (InterruptedException e) {
System.out.print("aThread is interrupt \r\n");
}
System.out.print("aThread finished \r\n");
}
});
aThread.start();
try {
Thread.sleep(2000);
System.out.print("main thread sleep ended \r\n");
} catch (InterruptedException e) {
e.printStackTrace();
}
aThread.interrupt();
输出结果:
main thread sleep ended
aThread is interrupt
aThread finished
- setPriority
setPriority方法用于设置线程的优先级,Thread类中优先级是一个从1到10的整形,数字越大,优先级越高。
基础同步工具
synchronized
synchronized关键字可以用在方法代码块中。在Java中每个对象都有一个唯一的锁,要进synchronized代码块&方法中后必须获得相关的锁,如果无法获得则线程阻塞直到持有锁代码块&方法退出为止。如下面的例子:
final Object lock = new Object();
Thread aThread = new Thread(new Runnable() {
public void run() {
System.out.print("aThread is runing \r\n");
synchronized (lock) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("aThread finished \r\n");
}
}
});
aThread.start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) {
System.out.print("main Thread finished \r\n");
}
输出结果如下:
aThread is runing
aThread finished
main Thread finished
从输出结果可以看出主线程被阻塞,直到子线程带锁的代码块返回。
wait & notify & notifyAll
wait接口调用后,调用后会放弃对象的锁,并且当前线程进入了阻塞状态,并进入到一个和该对象相关的等待池中。notfyfy & notify_all接口用于唤醒等待池等待池中第一个/所有的线程。wait方法和notify/notifyAll方法都必须包含在synchronized代码块中。下面是一个例子:
public class TestJava {
public static Object sObject = new Object();
/**
* @param args
*/
public static void main(String[] args) {
WaitThread threadA = new WaitThread("ThreadA");
WaitThread threadB = new WaitThread("ThreadB");
WaitThread threadC = new WaitThread("ThreadC");
threadA.start();
threadB.start();
threadC.start();
synchronized (sObject) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
sObject.notify();
}
}
static class WaitThread extends Thread {
WaitThread(String name) {
super(name);
}
@Override
public void run() {
synchronized (sObject) {
System.out.print(getName() + " is abount to wait \r\n");
try {
sObject.wait();
} catch (InterruptedException e) {
// TODO: handle exception
}
System.out.print("Thread" + getName() + " end wait \r\n");
}
}
}
}
输出结果如下:
ThreadA is abount to wait
ThreadC is abount to wait
ThreadB is abount to wait
ThreadA end wait
如果将sObject.notify()换成sObject.notifyAll() 输出结果如下:
ThreadA is abount to wait
ThreadB is abount to wait
ThreadC is abount to wait
main thread is about to notify
ThreadC end wait
ThreadB end wait
ThreadA end wait
volatile
volatile作用如下
- Java提供了volatile关键字来保证可见性。当一个共享变量被volatile修饰时,它会保证修改的值会立即被更新到主存,当有其他线程需要读取时,它会去内存中读取新值。
- 在Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。
- 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
- 禁止进行指令重排序
- volatile不可保证原子性
使用volatile的场景如下
- 多线程下,保证对当前变量的写操作保证可见性
voaltile boolean isStop = true;
//Thread1
while (isStop){
dosometing();
}
//Thread2
isStop = true;
终止一个无限循环的线程可以通过标记变量的形式来做,如果不使用volatile关键字线程2的修改可能修改对线程1不可见,会导致线程1继续无限循环。
- 多线程下,禁止指令重排序
Context context;
volatile boolean isInitialized;
//Thread1
context = loadContext();
isInitialized = true;
//Thread2
if (isInitialized) {
context.getRessources();
}
如果不适用volatile 线程1中的两行代码可能被重排序,从而导致线程2概率性出现空指针问题。
高级多线程工具类
ThreadLocal
ThreadLocal的使用方法
ThreadLocal类为对于为同一对象在每个线程产生一个副本,从而避免资源的竞争,简化同步代码。ThreadLocal 共有三个共有方法。
public static void main(String[] args) {
final ThreadLocal<String> description = new ThreadLocal<String>() {
@Override
protected String initialValue() {
return "Not initialized";
}
};
Runnable r = new Runnable() {
public void run() {
String threadName = Thread.currentThread().getName();
System.out.print("Thread:" + threadName + " before set description:" + description.get() + "\r\n");
description.set(threadName);
System.out.print("Thread:" + threadName + " after set description:" + description.get() + "\r\n");
}
};
Thread t1 = new Thread(r, "A");
Thread t2 = new Thread(r, "B");
Thread t3 = new Thread(r, "C");
t1.start();
t2.start();
t3.start();
}
输出结果如下:
Thread:A before set description:Not initialized
Thread:C before set description:Not initialized
Thread:A after set description:A
Thread:B before set description:Not initialized
Thread:C after set description:C
Thread:B after set description:B
从输出结果看三个线程,交替执行 description.get()的值都不受其他线程修改的影响。
ThreadLocal实现分析
- Thread类中有一个ThreadLocalMap成员,ThreadLocalMap是一个ThreadLocal和Object的Map
//Thread.java
ThreadLocal.ThreadLocalMap threadLocals = null;
//ThreadLocal.java
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
}
- ThreadLocal 的Set 和Get放到都会从当前Thread的threadLocals中查找。
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
原子类
原子类AtomicInteger 中提供了一系列的方法,可以保证Integer的相关操作时原子性的,下面观察两段代码:
- volatile int变量自增
public volatile int inc = 0;
public void increase() {
inc++;
}
public static void main(String[] args) {
final TestJava test = new TestJava();
for (int i = 0; i < 1000; i++) {
new Thread() {
public void run() {
for (int j = 0; j < 1000; j++)
test.increase();
};
}.start();
}
while (Thread.activeCount() > 1)
Thread.yield();
System.out.println(test.inc);
}
1000 个线程中,每个线程对int变量做1000次自增,输出结果如下:
995441
如前面所述,volatile变量只能保证修改可见,自增操作可以分解为 读取,增加,写入三个操作,当线程A和B的执行顺序为,A读取,B读取,A增加,B增加,B写入,A写入时,就会出现结果不一致的问题。
如果修改为AtomicInteger,测试结果如下:
public AtomicInteger atomInt = new AtomicInteger(0);
public void increaseAtom() {
atomInt.getAndIncrement();
}
public static void main(String[] args) {
final TestJava test = new TestJava();
for (int i = 0; i < 1000; i++) {
new Thread() {
public void run() {
for (int j = 0; j < 1000; j++)
test.increaseAtom();
};
}.start();
}
while (Thread.activeCount() > 1)
Thread.yield();
System.out.println(test.atomInt.get());
}
1000000
从测试结果看,AtomicInteger的getAndIncrement方法保证了自增操作的原子性。
AtomicInteger的实现
AtomicInteger中getAndIncrement方法的实现如下:
//AtomicInteger.java
public final int getAndIncrement() {
return U.getAndAddInt(this, VALUE, 1);
}
//Unsafe.java
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
会循环读取 当前值,然后再调用compareAndSwapInt方法设置新的值。这里要提到的一个概念是CAS, CAS是compare-and-swap的缩写,当要写入一个值时,首先从主存中读取当前值,然后和其他的当前值做比较,如果发现不同则认为其他线程修改了这个值,则写入失败。目前很多CPU都支持CAS指令因此为了效率 compareAndSwapInt 和 getIntVolatile都使用了natvie实现。
Lock类
JDK1.5 之后JAVA提供了Lock接口来实现和synchronized一样的功能,使用方法如下.
public static void main(String[] args) {
final Lock lock = new ReentrantLock();
Runnable r = new Runnable() {
public void run() {
try {
System.out.print("Thread:" + Thread.currentThread().getName()
+ " requect lock \r\n");
lock.lock();
long before = System.currentTimeMillis();
long sleepTime = Math.abs(new Random().nextLong() & 2000);
System.out.print("Thread:" + Thread.currentThread().getName()
+ " getLock sleep:" + sleepTime + " \r\n");
Thread.sleep(sleepTime);
long after = System.currentTimeMillis();
System.out.print("Thread:" + Thread.currentThread().getName()
+ " End Time consumed:" + (after - before) + "\r\n");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
lock.unlock();
}
}
};
Thread threadA = new Thread(r, "A");
Thread threadB = new Thread(r, "B");
Thread threadC = new Thread(r, "C");
threadA.start();
threadB.start();
threadC.start();
}
Thread:C request lock
Thread:B request lock
Thread:A request lock
Thread:C getLock sleep:1680
Thread:C End Time consumed:1681
Thread:A getLock sleep:896
Thread:A End Time consumed:896
Thread:B getLock sleep:1232
Thread:B End Time consumed:1233
从Log可以看出,三个线程在锁之间的代码必须等待其他线程释放锁之后才能执行。
如果将锁的声明修改为:
final Lock lock = new ReentrantLock(true);
可以看到如下log:
Thread:A request lock
Thread:C request lock
Thread:B request lock
Thread:A getLock sleep:336
Thread:A End Time consumed:337
Thread:C getLock sleep:1808
Thread:C End Time consumed:1808
Thread:B getLock sleep:320
Thread:B End Time consumed:321
从输出可以看出,获得锁的顺序和申请锁的一致,这种锁称为公平锁。ReentrantLock类可以构造函数可以接受一个Boolean类型的标量用于定义锁是否是公平锁。默认工构造函数为非公平锁。
Condition类
Condition类实现了和Object类wait以及notify类似的功能,参考代码如下"
public static void main(String[] args) {
final Lock lock = new ReentrantLock();
ConditionRunable ra = new ConditionRunable(lock);
ConditionRunable rb = new ConditionRunable(lock);
ConditionRunable rc = new ConditionRunable(lock);
Thread threadA = new Thread(ra, "A");
Thread threadB = new Thread(rb, "B");
Thread threadC = new Thread(rc, "C");
threadA.start();
threadB.start();
threadC.start();
try {
Thread.sleep(1000);
lock.lock();
rb.mCondition.signal();
rc.mCondition.signal();
ra.mCondition.signal();
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
}
static class ConditionRunable implements Runnable {
public Condition mCondition;
private Lock mLock;
public ConditionRunable(Lock lock) {
mLock = lock;
mCondition = lock.newCondition();
}
public void run() {
try {
mLock.lock();
System.out.print("Thread:" + Thread.currentThread().getName()
+ " reeady to wait \r\n");
mCondition.await();
System.out.print("Thread:" + Thread.currentThread().getName() + " wait end \r\n");
} catch (InterruptedException e) {
} finally {
mLock.unlock();
}
}
}
输出如下:
Thread:A reeady to wait
Thread:B reeady to wait
Thread:C reeady to wait
Thread:B wait end
Thread:C wait end
Thread:A wait end
Condition的await和signal 和 Object的wait和notify类似,必须包含在锁的lock和unlock方法之间。但是一个lock可以创建多个condition,从而在实际用用中,可以根据需求唤醒不同的线程。
ThreadPoolExecutor
构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造函数比较复杂,说明如下:
- 当线程池中的线程小于corePoolSize时,当提交新的任务新建一个线程执行
- 当线程池中的线程达到corePoolSize,新提交的任务被放入workQueue等待执行
- 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交的任务会创建新线程执行
- 当提交的任务数超过maximumPoolSize时,执行RejectedExecutionHandler
- 当线程数量超过corePoolSize,且空闲时间达到keepAliveTime,则终止超出corePoolSize的空闲线程
- 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
Android AsynTask中也用到了ThreadPoolExecutor,代码如下:
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
这里corePoolSize的值,如果CPU核心数大于4,则为4,如果CPU核心数小于等于2则为2。maximumPoolSize的大小为CPU核心数乘以2加1。wockQueue的大小为128。
使用方法
可以参考下面的例子:
static BlockingQueue<Runnable> sBlockingQueue = new LinkedBlockingDeque<Runnable>(4);
static ThreadPoolExecutor sExcutor = new ThreadPoolExecutor(2, 5, 30, TimeUnit.SECONDS,
sBlockingQueue, new RejectedExecutionHandler() {
public void rejectedExecution(Runnable arg0, ThreadPoolExecutor arg1) {
System.out.print("Task was rejected +\r\n");
}
});
/**
* @param args
*/
public static void main(String[] args) {
Runnable r = new Runnable() {
public void run() {
System.out.print("A Task is runing \r\n");
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
}
System.out.print("A Task is end \r\n");
}
};
for (int i = 0; i < 2; i++) {
sExcutor.execute(r);
}
}
有兴趣的同学可以将循环的次数分别修改为2,6,9,10看下执行的结果。