多线程(一)

[TOC]

1. Java并发编程基础

1.1 什么是线程?

现代操作系统调度的最小单元;

一个进程可以创建多个线程,每个线程拥有自己的计数器、堆栈、局部变量等属性,同时可以访问共享的内存变量;

CPU在线程之前高速切换,使之有同时执行的感觉。

1.2 为什么使用多线程?

  • 更多的处理器核心;
  • 更快的响应时间
  • 更好地编程模型

1.3 线程的优先级

1-10个级别,默认是5;

注意:程序的正确性不能依赖线程的优先级高低。

1.4 线程的状态

  • NEW;
  • RUNNABLE;
  • BLOCKED;
  • WAITING;
  • TIME_WAITING;
  • TERMINATED;

1.5 Daemon线程

一种支持型线程。用于程序中后台调度及支持性工作。

注意:当一个Java虚拟机中不存在非Daemon线程时,Java虚拟机将会退出。

可以通过Thread.setDaemon(true)将线程设置为Daemon线程

注意:在构建Daemon线程时,不能依靠finall块中的内容来确保执行关闭或清理资源的逻辑

1.6 启动和终止线程

  • 构造线程:需要提供:线程组、优先级、是否是Daemon线程等信息
  • 启动线程:start();其含义是:当前线程(即parent线程)同步告知Java虚拟机,只要线程规划器空闲,立即启动该线程
  • 理解中断:线程的一个标志位属性,它表示一个运行中的线程是否被其他线程进行了中断。(见Interrupted.java类)
  • 如何安全的终止线程: 见Shutdown.java
/**
 * Shutdown.java
 *
 * 创建了一个线程CountThread,它不断地进行变量累加,而主线程尝试对其进行中断操作和停止操作。
 */
public class Shutdown {

    public static void main(String[] args) throws InterruptedException {
        Runner one= new Runner();
        Thread countThread = new Thread(one,"countThread");
        countThread.start();
        //睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知中断而结束
        TimeUnit.SECONDS.sleep(1);
        countThread.interrupt();
        Runner two= new Runner();
        countThread = new Thread(two,"countThread");
        countThread.start();
        //睡眠1秒,main线程对two进行取消,使CountThread能够感知on为false而结束
        TimeUnit.SECONDS.sleep(1);
        two.cancel();
    }

    private static class Runner implements Runnable{
        private long i;
        private volatile boolean on = true;

        @Override
        public void run() {
            while (on && !Thread.currentThread().isInterrupted()){
                i ++ ;
            }
            System.out.println("Count i="+ i);
        }
        public void cancel(){
            on = false;
        }
    }
}

1.7 线程间通信

volatile关键字:告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,以保证可见性。

注意:过多的使用它会降低程序的效率。

synchronized关键字:确保多个线程在同一时刻,只能有一个线程处于方法或者同步块中,保证了线程对变量访问的可见性和排他性。

对象、对象的监视器、同步队列、执行线程之间的关系:

线程想要对Object(Object由Synchronized保护)进行访问

-> 首先需要获得Object的监视器

-> 获取监视器成功,则可访问

-> 获取监视器失败,则该线程进入同步队列,状态变为阻塞,直到拥有锁的线程释放了锁,会唤醒同步队列中的线程,再次尝试进行对监视器的获取操作

1.8 等待/通知机制

  • notify()
  • notifyAll()
  • wait()
  • wait(long)
  • wait(long,int)

见WaitNotify.java

/**
 * 两个线程wait线程由notify线程唤醒
 */
public class WaitNotify {

    static boolean flag = true;
    static Object lock = new Object();


    public static void main(String[] args) throws InterruptedException {
        Thread waitThread = new Thread(new Wait(),"waitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(),"notifyThread");
        notifyThread.start();
    }


        static class Wait implements  Runnable{
        @Override
        public void run() {
            synchronized(lock){
                while(flag){
                    try{
                        System.out.println(Thread.currentThread() +"flag is true. wait@ "+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread() +"flag is false. running@ "+ new SimpleDateFormat("HH:mm:ss").format(new Date()));

            }
        }
    }

    static class Notify implements Runnable{

        @Override
        public void run() {
            synchronized(lock){
                System.out.println(Thread.currentThread() +"hold lock. notify@ "+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
                lock.notifyAll();
                flag = false;
                SleepUtils.second(5);
            }
            synchronized(lock){
                System.out.println(Thread.currentThread() +"hold lock again. sleep@ "+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
                SleepUtils.second(5);
            }
        }
    }

}

输出:
Connected to the target VM, address: '127.0.0.1:57763', transport: 'socket'
Thread[waitThread,5,main]flag is true. wait@ 09:24:43
Thread[notifyThread,5,main]hold lock. notify@ 09:24:44
Thread[notifyThread,5,main]hold lock again. sleep@ 09:24:49
Disconnected from the target VM, address: '127.0.0.1:57763', transport: 'socket'
Thread[waitThread,5,main]flag is false. running@ 09:24:54

注意点:

  • 先对调用对象枷锁,再调用notify()、notifyAll()、wait()
  • wait()方法使线程状态由Running变为Waiting,同时线程被放置到等待队列
  • notify()、notifyAll()被调用后,等待线程不会立即从wait()返回,需要有锁的那个线程先释放锁以后,才有机会从wait()返回
  • notify()、notifyAll()的操作是将等待队列中的线程放置到同步队列中,同时被移动的线程状态由Waiting转变为Blocked(不同的是,前者只放一个,后者放置所有线程)。
  • wait()能够返回,前提是获得了锁。

1.9 经典范式:生产者/消费者模式

等待方遵循如下原则:

  • 获取对象锁
  • 如果条件不满足,进行wait()操作,被通知后仍要检查条件。
  • 条件满足则执行对应的条件
    通知方遵循如下原则:
  • 获取对象锁
  • 改变条件
  • 通知所有等待在对象上的线程

1.10 管道输入/输出流

主要用于线程之间的数据传输,而传输的媒介为内存

4种具体实现:

  • PipedOutputStream
  • PipedInputStream
  • PipedReader
  • PipedWriter

见Piped.java

/**
 * PipedWriter和PipedReader相连接,主线程读入控制台输入的字符,传给Print线程,打印到控制台
 */
public class Piped {

    public static void main(String[] args)   {
        PipedWriter out = new PipedWriter();
        PipedReader in = new PipedReader();
        try {
            out.connect(in);
        } catch (IOException e) {
            e.printStackTrace();
        }
        Thread printThread = new Thread(new Print(in),"PrintThread");
        printThread.start();
        int receive = 0;
        try {
            while((receive = System.in.read())!= 1){
                out.write(receive);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally{
            try {
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
    static class Print implements  Runnable{
        private PipedReader in;
        public Print(PipedReader in){
            this.in= in;
        }
        @Override
        public void run() {
            int receive = 0;
            try {
                while((receive = in.read()) != -1 ){
                    System.out.println((char) receive);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

1.11 Thread.join()的使用

当线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才从thread.join()返回。

见Join.java

/**
 * Join.java
 * 每个线程调用前一个线程的join()方法,意味着:从主线程结束->线程1结束-> ... -> 线程10结束
 */
public class Join {
    public static void main(String[] args) throws InterruptedException {
        Thread previous = Thread.currentThread();
        for(int i = 0; i < 10; i ++){
            Thread thread = new Thread(new Domino(previous),String.valueOf(i));
            thread.start();
            previous = thread;
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(Thread.currentThread().getName() + " terminate.");
    }

    static class Domino implements Runnable{
        private Thread thread;
        public Domino(Thread thread){
            this.thread = thread;
        }
        @Override
        public void run() {
            try {
                this.thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " terminate.");
        }
    }

}

输出:

main terminate.
0 terminate.
1 terminate.
2 terminate.
3 terminate.
4 terminate.
5 terminate.
6 terminate.
7 terminate.
8 terminate.
9 terminate.

join()方法的逻辑结构和等待/通知经典范式一致,即加锁、循环和处理逻辑3个步骤

1.12 ThreadLocal的使用

ThreadLocal,即线程变量。键值存储结构。

一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。
见Profiler.java

/**
 * Profiler.java
 *
 *相当于每个线程自己会有自己的本地变量,虽然共享了TIME_THREADLOCAL变量,但在get的时候只会获取自己线程的本地变量值
 *通过匿名内部类来构建一个ThreadLocal子类,重写方法initialValue,以便在get和set方法第一次调用时,进行初始化
 */
public class Profiler {
    private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>(){
        protected  Long initialValue(){
            return System.currentTimeMillis();
        }
    };
    public static final void begin(){
        TIME_THREADLOCAL.set(System.currentTimeMillis());
    }

    public static final long end(){
        return System.currentTimeMillis() - TIME_THREADLOCAL.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Profiler.begin();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Cost:" + Profiler.end() + " mills");
    }


}

1.13 线程实例

等待超时模式:

  • 使用场景:调用一个方法时等待一段时间,能在时间内返回,则立即返回,超时,返回默认结果。
  • 基本点:等待持续时间 REMAINING = T 、超时时间 FUTURE = now + T
    见TimeoutPattern.java
/**
 * 一个经典的等待超时模式
 */
public class TimeoutPattern {

    public synchronized Object get(long mills) throws InterruptedException {
        Object result = null;
        long future = System.currentTimeMillis() + mills;
        long remaining = mills;
        while((result == null)&& remaining > 0 ){
            wait(remaining);
            remaining = future - System.currentTimeMillis();
        }
        return result;
    }
}

一个简单的数据库连接池实例

重点是:使用等待超时模式,在获取连接的过程,如果有连接则直接返回;如果没有,则wait(mills),在其他线程释放连接时被唤醒,如果超时未被唤醒,返回null。

public class ConnectionPool {

    private LinkedList<Connection> pool = new LinkedList<Connection>();

    public ConnectionPool(int initialSize){
        if(initialSize > 0 ){
            for(int i = 0; i < initialSize ; i ++){
                pool.add(ConnectionDriver.createConnection()); //使用动态代理创建一个连接
            }
        }
    }
    public void releaseConnection(Connection connection){
        if(connection != null){
            synchronized (pool){
                pool.add(connection);
                pool.notifyAll();
            }

        }
    }
    public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized (pool){
            //完全超时
            if(mills < 0 ){
                while (pool.isEmpty()){
                    pool.wait();
                }
                return pool.removeFirst();
            }else {
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while(pool.isEmpty() && remaining > 0 ){
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }
                Connection result = null;
                if(!pool.isEmpty()){
                    result = pool.removeFirst();
                }
                return result;
            }
        }
    }
}
public class ConnectionDriver {
    static class ConnectionHandler implements InvocationHandler{
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if(method.getName().equals("commit")){
                TimeUnit.MILLISECONDS.sleep(100);
            }
            return null;
        }
    }
    public static  final Connection createConnection(){
        return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),new Class<?>[]{Connection.class},new ConnectionHandler());

    }
}
public class ConnectionPoolTest {

    static ConnectionPool pool = new ConnectionPool(10);
    static CountDownLatch start = new CountDownLatch(1);
    static CountDownLatch end;

    public static void main(String[] args) throws InterruptedException {
        int threadCount = 1000;
        end = new CountDownLatch(threadCount);
        int count = 1000;
        AtomicInteger got = new AtomicInteger();
        AtomicInteger notGot = new AtomicInteger();
        for(int i = 0; i < threadCount; i ++){
            Thread thread = new Thread(new ConnectionRunner(count,got,notGot),"ConnectionRunnerThread");
            thread.start();
        }
        start.countDown();
        end.await();
        System.out.println("total invoke: " + (threadCount * count));
        System.out.println("got connection:  " + got);
        System.out.println("not got connectio " + notGot);
    }
    static class ConnectionRunner implements  Runnable{
        int count;
        AtomicInteger got;
        AtomicInteger notgot;

        public ConnectionRunner(int count,AtomicInteger got,AtomicInteger notgot){
            this.count = count;
            this.got = got;
            this.notgot = notgot;
        }

        @Override
        public void run() {
            try {
                start.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            while (count > 0 ){
                try {
                    Connection connection = pool.fetchConnection(1000);
                    if(connection!=null){
                        try {
                            connection.createStatement();
                            connection.commit();
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    }else{
                        notgot.incrementAndGet();
                    }
                } catch (InterruptedException e) {

                }finally {
                    count --;
                }
            }
            end.countDown();
        }
    }
}

线程池技术

本质:一个线程安全的任务队列,它连接了工作者线程和客户端线程。工作者线程中,不断地在任务队列中获取任务,没有任务就wait,直到在任务队列中新增一个任务后notify唤醒。

public interface ThreadPool<Job extends Runnable> {

    void execute(Job job);

    void shutdown();

    void addWorkers(int num);

    void removeWorker(int num);

    int getJobSize();
}
public class DefaultThreadPool<Job extends  Runnable> implements ThreadPool<Job> {

    private static final int MAX_WORKER_NUMBERS = 10;

    private static final int DEFAULT_WORKER_NUMBERS = 5;

    private static final int MIN_WORKER_NUMBERS = 1;
    //工作列表,将会向里面插入工作
    private final LinkedList<Job> jobs = new LinkedList<Job>();
    //工作者列表
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    //工作者线程数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    //线程编号
    private AtomicLong threadNum = new AtomicLong();

    public DefaultThreadPool(){
        initializeWorker(DEFAULT_WORKER_NUMBERS);
    }
    public DefaultThreadPool(int num){
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num <  MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initializeWorker(workerNum);
    }
    //初始化线程工作者
    private void initializeWorker(int num){
        for(int i = 0 ; i < num; i ++ ){
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-"+ threadNum.incrementAndGet());
            thread.start();
        }
    }


    @Override
    public void execute(Job job) {
        if(job != null){
            synchronized (jobs){
                jobs.addLast(job);
                jobs.notifyAll();
            }
        }
    }

    @Override
    public void shutdown() {
        for(Worker worker : workers){
            worker.shutdown();
        }
    }

    @Override
    public void addWorkers(int num) {
        synchronized (jobs){
            if(num + this.workerNum > MAX_WORKER_NUMBERS){
                num = MAX_WORKER_NUMBERS;
            }
            initializeWorker(num);
            this.workerNum += num;
        }
    }

    @Override
    public void removeWorker(int num) {
        synchronized (jobs){
            if(num >= this.workerNum){
                throw  new IllegalArgumentException("beyond worknum");
            }
            //按照给定的数量停止Worker
            int count = 0;
            while(count < num){
                Worker worker = workers.get(count);
                if(workers.remove(worker)){
                    worker.shutdown();
                    count ++;
                }
            }
            this.workerNum -= count;
        }
    }

    @Override
    public int getJobSize() {
        return jobs.size();
    }



    class Worker implements  Runnable{
        //是否工作
        private volatile boolean running = true;
        @Override
        public void run() {
            while (running){
                Job job = null;
                synchronized (jobs){
                    //如果工作者列表是空的,那么久wait
                    while(jobs.isEmpty()){
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            //感知到外部对WorkerThread的中断操作,返回
                            e.printStackTrace();
                            return ;
                        }

                    }
                    job = jobs.removeFirst();
                }
                if(job != null){
                    try{
                        job.run();
                    }catch (Exception e){
                        //此处暂时忽略Job执行中的Exception
                    }
                }
            }
        }
        public void shutdown(){
            running =false;
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,524评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,869评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,813评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,210评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,085评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,117评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,533评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,219评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,487评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,582评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,362评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,218评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,589评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,899评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,176评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,503评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,707评论 2 335

推荐阅读更多精彩内容

  • 一、基本概念:程序 - 进程 - 线程 程序(program):是为完成特定任务、用某种语言编写的一组指令的集合。...
    c5fc16271aee阅读 452评论 0 2
  • 线程概述 线程与进程 进程  每个运行中的任务(通常是程序)就是一个进程。当一个程序进入内存运行时,即变成了一个进...
    闽越布衣阅读 990评论 1 7
  • 本文出自 Eddy Wiki ,转载请注明出处:http://eddy.wiki/interview-java.h...
    eddy_wiki阅读 2,007评论 0 14
  • 前几天和大学的室友去了一趟武功山 时间极端两天 欣赏到了美景亦净化了自己的心灵 留几张图作为纪念
    蓝莲滴露阅读 218评论 0 2
  • 好长时间没有写东西,今天上午终于能静下心来写写最近1个多月的生活 学习 工作。最近的半个月基本就是春困状态,完全没...
    peimin阅读 213评论 0 0