Java 多线程

Java 多线程

基础

进程和线程

  • 进程和线程的区别于联系
  1. 进程是操作系统分配资源的基本单位,进程拥有独立的内存等资源,一个进程至少有一个线程。
  2. 线程则是具体的执行单位,CPU(或是CPU的一个核心)同时只能执行一个线程
  3. 一个进程的所有线程共享该进程的系统资源。
  4. 进程是程序的一次执行,而使用线程可以使程序的这次执行变为并发

线程的状态

Java中线程工分为以下几个状态:

  1. new 表示线程创建,但是还未执行
  2. runnable 表示线程可以被cpu调度到
  3. blocked 表示线程进入同步代码块或IO操作
  4. running 表示线程正在执行
  5. waiting 表示线程正在等待一个monitor
  6. timed_waiting表示线程有显时间内等待

各个状态之间切换关系如下:

线程状态切换图

Java内存模型

Java内存模型是Java多线程的基础,可以参考文章:全面理解Java内存模型

Therad类介绍

Therad类的使用

继承Thread类并重写Run方法

public static void main(String[] args) {
    UselessThread thread = new UselessThread();
    thread.start();
}

static class UselessThread extends Thread {

    @Override
    public void run() {
        System.out.print("Useless");
    }
}

输出结果:

Useless

使用Runable接口

Thread thread = new Thread(new Runnable() {

    public void run() {
        System.out.print("Useless");
    }
});
thread.start();

输出结果:

Useless

几个常用接口

  1. sleep

sleep方法是Thread中的一个静态方法,用于用于强制停止当前的线程指向若干毫秒。

  1. yield

yield 方法调用后表示线程可以让出CPU,系统可以将CPU调度其他线程执行,也可以继续执行当前线程。

  1. join
    join方法表示阻塞调用该方法的线程,指导被引用的线程结束为止,如下面的例子:
Thread aThread = new Thread(new Runnable() {

    public void run() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.print("aThread finished \r\n");
    }
});
aThread.start();

System.out.print("main Threand is abount to join \r\n");
try {
    aThread.join();
} catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}
System.out.print("main Threand finish join \r\n");

输出结果:

main Threand is abount to join

aThread finished
main Threand finish join

  1. interupt
    interupt方法使被引用的线程收到一个InteruptException,以防止被引用的线程无限等待么,如下面的例子:
Thread aThread = new Thread(new Runnable() {

    public void run() {
        try {
            Thread.sleep(5000000);
        } catch (InterruptedException e) {
            System.out.print("aThread is interrupt \r\n");
        }
        System.out.print("aThread finished \r\n");
    }
});
aThread.start();

try {
    Thread.sleep(2000);
    System.out.print("main thread sleep ended \r\n");
} catch (InterruptedException e) {
    e.printStackTrace();
}
aThread.interrupt();

输出结果:

main thread sleep ended
aThread is interrupt
aThread finished

  1. setPriority

setPriority方法用于设置线程的优先级,Thread类中优先级是一个从1到10的整形,数字越大,优先级越高。

基础同步工具

synchronized

synchronized关键字可以用在方法代码块中。在Java中每个对象都有一个唯一的锁,要进synchronized代码块&方法中后必须获得相关的锁,如果无法获得则线程阻塞直到持有锁代码块&方法退出为止。如下面的例子:

final Object lock = new Object();
Thread aThread = new Thread(new Runnable() {

    public void run() {
        System.out.print("aThread is runing \r\n");
        synchronized (lock) {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.print("aThread finished \r\n");
        }
    }
});
aThread.start();
try {
    Thread.sleep(100);
} catch (InterruptedException e) {
    e.printStackTrace();
}
synchronized (lock) {
    System.out.print("main Thread finished \r\n");
}

输出结果如下:

aThread is runing
aThread finished
main Thread finished

从输出结果可以看出主线程被阻塞,直到子线程带锁的代码块返回。

wait & notify & notifyAll

wait接口调用后,调用后会放弃对象的锁,并且当前线程进入了阻塞状态,并进入到一个和该对象相关的等待池中。notfyfy & notify_all接口用于唤醒等待池等待池中第一个/所有的线程。wait方法和notify/notifyAll方法都必须包含在synchronized代码块中。下面是一个例子:

public class TestJava {

    public static Object sObject = new Object();

    /**
     * @param args
     */
    public static void main(String[] args) {
        WaitThread threadA = new WaitThread("ThreadA");
        WaitThread threadB = new WaitThread("ThreadB");
        WaitThread threadC = new WaitThread("ThreadC");

        threadA.start();
        threadB.start();
        threadC.start();

        synchronized (sObject) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {

            }
            sObject.notify();
        }

    }

    static class WaitThread extends Thread {
        WaitThread(String name) {
            super(name);
        }

        @Override
        public void run() {
            synchronized (sObject) {
                System.out.print(getName() + " is abount to wait \r\n");
                try {
                    sObject.wait();
                } catch (InterruptedException e) {
                    // TODO: handle exception
                }
                System.out.print("Thread" + getName() + " end wait \r\n");
            }
        }
    }
}

输出结果如下:

ThreadA is abount to wait
ThreadC is abount to wait
ThreadB is abount to wait
ThreadA end wait

如果将sObject.notify()换成sObject.notifyAll() 输出结果如下:

ThreadA is abount to wait
ThreadB is abount to wait
ThreadC is abount to wait
main thread is about to notify
ThreadC end wait
ThreadB end wait
ThreadA end wait

volatile

volatile作用如下

  1. Java提供了volatile关键字来保证可见性。当一个共享变量被volatile修饰时,它会保证修改的值会立即被更新到主存,当有其他线程需要读取时,它会去内存中读取新值。
  2. 在Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。
  3. 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
  4. 禁止进行指令重排序
  5. volatile不可保证原子性

使用volatile的场景如下

  1. 多线程下,保证对当前变量的写操作保证可见性
voaltile boolean isStop = true;

//Thread1
while (isStop){
    dosometing();
}

//Thread2 
isStop = true;

终止一个无限循环的线程可以通过标记变量的形式来做,如果不使用volatile关键字线程2的修改可能修改对线程1不可见,会导致线程1继续无限循环。

  1. 多线程下,禁止指令重排序
Context context;
volatile boolean isInitialized;
//Thread1
context = loadContext();
isInitialized = true;

//Thread2
if (isInitialized) {
    context.getRessources();
}

如果不适用volatile 线程1中的两行代码可能被重排序,从而导致线程2概率性出现空指针问题。

高级多线程工具类

ThreadLocal

ThreadLocal的使用方法

ThreadLocal类为对于为同一对象在每个线程产生一个副本,从而避免资源的竞争,简化同步代码。ThreadLocal 共有三个共有方法。

public static void main(String[] args) {
    final ThreadLocal<String> description = new ThreadLocal<String>() {
        @Override
        protected String initialValue() {
        return "Not initialized";
        }
    };
    Runnable r = new Runnable() {
        public void run() {
        String threadName = Thread.currentThread().getName();
        System.out.print("Thread:" + threadName + " before set description:" + description.get() + "\r\n");
        description.set(threadName);
        System.out.print("Thread:" + threadName + " after set description:" + description.get() + "\r\n");
        }
    };
    Thread t1 = new Thread(r, "A");
    Thread t2 = new Thread(r, "B");
    Thread t3 = new Thread(r, "C");
    t1.start();
    t2.start();
    t3.start();
}

输出结果如下:

Thread:A before set description:Not initialized
Thread:C before set description:Not initialized
Thread:A after set description:A
Thread:B before set description:Not initialized
Thread:C after set description:C
Thread:B after set description:B

从输出结果看三个线程,交替执行 description.get()的值都不受其他线程修改的影响。

ThreadLocal实现分析

  1. Thread类中有一个ThreadLocalMap成员,ThreadLocalMap是一个ThreadLocal和Object的Map
//Thread.java

ThreadLocal.ThreadLocalMap threadLocals = null;

//ThreadLocal.java
static class ThreadLocalMap {

    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
}
  1. ThreadLocal 的Set 和Get放到都会从当前Thread的threadLocals中查找。
public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

原子类

原子类AtomicInteger 中提供了一系列的方法,可以保证Integer的相关操作时原子性的,下面观察两段代码:

  1. volatile int变量自增
public volatile int inc = 0;

public void increase() {
    inc++;
}

public static void main(String[] args) {

    final TestJava test = new TestJava();
    for (int i = 0; i < 1000; i++) {
        new Thread() {
            public void run() {
                for (int j = 0; j < 1000; j++)
                    test.increase();
            };
        }.start();
    }

    while (Thread.activeCount() > 1)
        Thread.yield();
    System.out.println(test.inc);
}

1000 个线程中,每个线程对int变量做1000次自增,输出结果如下:

995441

如前面所述,volatile变量只能保证修改可见,自增操作可以分解为 读取,增加,写入三个操作,当线程A和B的执行顺序为,A读取,B读取,A增加,B增加,B写入,A写入时,就会出现结果不一致的问题。

如果修改为AtomicInteger,测试结果如下:

public AtomicInteger atomInt = new AtomicInteger(0);

public void increaseAtom() {
    atomInt.getAndIncrement();
}

public static void main(String[] args) {

    final TestJava test = new TestJava();
    for (int i = 0; i < 1000; i++) {
        new Thread() {
            public void run() {
                for (int j = 0; j < 1000; j++)
                    test.increaseAtom();
            };
        }.start();
    }

    while (Thread.activeCount() > 1)
        Thread.yield();
    System.out.println(test.atomInt.get());
}

1000000

从测试结果看,AtomicInteger的getAndIncrement方法保证了自增操作的原子性。

AtomicInteger的实现

AtomicIntegergetAndIncrement方法的实现如下:

//AtomicInteger.java
public final int getAndIncrement() {
    return U.getAndAddInt(this, VALUE, 1);
}

//Unsafe.java
public final int getAndAddInt(Object o, long offset, int delta) {
   int v;
    do {
       v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
}

会循环读取 当前值,然后再调用compareAndSwapInt方法设置新的值。这里要提到的一个概念是CAS, CAS是compare-and-swap的缩写,当要写入一个值时,首先从主存中读取当前值,然后和其他的当前值做比较,如果发现不同则认为其他线程修改了这个值,则写入失败。目前很多CPU都支持CAS指令因此为了效率 compareAndSwapInt 和 getIntVolatile都使用了natvie实现。

Lock类

JDK1.5 之后JAVA提供了Lock接口来实现和synchronized一样的功能,使用方法如下.

public static void main(String[] args) {
    final Lock lock = new ReentrantLock();
    Runnable r = new Runnable() {

        public void run() {
            try {
                System.out.print("Thread:" + Thread.currentThread().getName()
                        + " requect lock \r\n");
                lock.lock();
                long before = System.currentTimeMillis();
                long sleepTime = Math.abs(new Random().nextLong() & 2000);
                System.out.print("Thread:" + Thread.currentThread().getName()
                        + " getLock sleep:" + sleepTime + " \r\n");
                Thread.sleep(sleepTime);
                long after = System.currentTimeMillis();
                System.out.print("Thread:" + Thread.currentThread().getName()
                        + " End Time consumed:" + (after - before) + "\r\n");
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    };

    Thread threadA = new Thread(r, "A");
    Thread threadB = new Thread(r, "B");
    Thread threadC = new Thread(r, "C");
    threadA.start();
    threadB.start();
    threadC.start();
}

Thread:C request lock

Thread:B request lock

Thread:A request lock

Thread:C getLock sleep:1680

Thread:C End Time consumed:1681

Thread:A getLock sleep:896

Thread:A End Time consumed:896

Thread:B getLock sleep:1232

Thread:B End Time consumed:1233

从Log可以看出,三个线程在锁之间的代码必须等待其他线程释放锁之后才能执行。

如果将锁的声明修改为:

final Lock lock = new ReentrantLock(true);

可以看到如下log:

Thread:A request lock

Thread:C request lock

Thread:B request lock

Thread:A getLock sleep:336

Thread:A End Time consumed:337

Thread:C getLock sleep:1808

Thread:C End Time consumed:1808

Thread:B getLock sleep:320

Thread:B End Time consumed:321

从输出可以看出,获得锁的顺序和申请锁的一致,这种锁称为公平锁ReentrantLock类可以构造函数可以接受一个Boolean类型的标量用于定义锁是否是公平锁。默认工构造函数为非公平锁。

Condition类

Condition类实现了和Object类wait以及notify类似的功能,参考代码如下"

public static void main(String[] args) {
    final Lock lock = new ReentrantLock();

    ConditionRunable ra = new ConditionRunable(lock);
    ConditionRunable rb = new ConditionRunable(lock);
    ConditionRunable rc = new ConditionRunable(lock);
    Thread threadA = new Thread(ra, "A");
    Thread threadB = new Thread(rb, "B");
    Thread threadC = new Thread(rc, "C");
    threadA.start();
    threadB.start();
    threadC.start();

    try {
        Thread.sleep(1000);
        lock.lock();
        rb.mCondition.signal();
        rc.mCondition.signal();
        ra.mCondition.signal();

    } catch (InterruptedException e) {
    } finally {
        lock.unlock();
    }

}

static class ConditionRunable implements Runnable {
    public Condition mCondition;
    private Lock mLock;

    public ConditionRunable(Lock lock) {
        mLock = lock;
        mCondition = lock.newCondition();
    }

    public void run() {
        try {
            mLock.lock();
            System.out.print("Thread:" + Thread.currentThread().getName()
                    + " reeady to wait \r\n");
            mCondition.await();
            System.out.print("Thread:" + Thread.currentThread().getName() + " wait end \r\n");
        } catch (InterruptedException e) {
        } finally {
            mLock.unlock();
        }
    }
}

输出如下:

Thread:A reeady to wait

Thread:B reeady to wait

Thread:C reeady to wait

Thread:B wait end

Thread:C wait end

Thread:A wait end

Condition的await和signal 和 Object的wait和notify类似,必须包含在锁的lock和unlock方法之间。但是一个lock可以创建多个condition,从而在实际用用中,可以根据需求唤醒不同的线程。

ThreadPoolExecutor

构造函数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

构造函数比较复杂,说明如下:

  1. 当线程池中的线程小于corePoolSize时,当提交新的任务新建一个线程执行
  2. 当线程池中的线程达到corePoolSize,新提交的任务被放入workQueue等待执行
  3. 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交的任务会创建新线程执行
  4. 当提交的任务数超过maximumPoolSize时,执行RejectedExecutionHandler
  5. 当线程数量超过corePoolSize,且空闲时间达到keepAliveTime,则终止超出corePoolSize的空闲线程
  6. 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭

Android AsynTask中也用到了ThreadPoolExecutor,代码如下:

private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;

private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
        sPoolWorkQueue, sThreadFactory);

这里corePoolSize的值,如果CPU核心数大于4,则为4,如果CPU核心数小于等于2则为2。maximumPoolSize的大小为CPU核心数乘以2加1。wockQueue的大小为128。

使用方法

可以参考下面的例子:

static BlockingQueue<Runnable> sBlockingQueue = new LinkedBlockingDeque<Runnable>(4);
static ThreadPoolExecutor sExcutor = new ThreadPoolExecutor(2, 5, 30, TimeUnit.SECONDS,
        sBlockingQueue, new RejectedExecutionHandler() {

            public void rejectedExecution(Runnable arg0, ThreadPoolExecutor arg1) {
                System.out.print("Task was rejected +\r\n");
            }
        });

/**
 * @param args
 */
public static void main(String[] args) {
    Runnable r = new Runnable() {

        public void run() {
            System.out.print("A Task is runing \r\n");
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {

            }
            System.out.print("A Task is end \r\n");
        }
    };

    for (int i = 0; i < 2; i++) {
        sExcutor.execute(r);
    }
}

有兴趣的同学可以将循环的次数分别修改为2,6,9,10看下执行的结果。

ThreadPoolExecutor实现

深入理解Java线程池

参考文档 - 站在巨人的肩膀

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342