ThreadPoolExecutor优先级队列PriorityBlockingQueue

前两天重构代码,调试的时候,发现有个使用到线程池的地方抛出java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to异常
这个代码是线上在跑的一个逻辑,不该出现问题才对,最后还是翻了下源码确定原因

原因:之前向线程池提交任务用的是execute方法,复制的时候错用成了submit方法,改回execute方法即可

既然遇到了,顺便记录下

自定义提交到线程池的任务
@Data
@AllArgsConstructor
class TestRunnable implements Runnable {

    private Integer i;

    @Override
    public void run() {
        try {
            Thread.sleep(1000 * i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(i);
    }
}
  • 线程池配合优先级队列的使用execute
    public static void main(String[] args) {
        // 创建优先级队列,指定队列初始大小 指定队列中的任务比较器
        // 优先级队列是无界的 指定的只是初始大小
        // 可以使用lambda简化
        PriorityBlockingQueue queue = new PriorityBlockingQueue(100, new Comparator<TestRunnable>() {

            @Override
            public int compare(TestRunnable o1, TestRunnable o2) {
                return o1.getI() - o2.getI();
            }
        });
        // 创建线程池 传入优先级队列
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, queue);

        threadPoolExecutor.execute(new TestRunnable(5));

        threadPoolExecutor.execute(new TestRunnable(2));

        threadPoolExecutor.execute(new TestRunnable(1));

        threadPoolExecutor.execute(new TestRunnable(3));
    }

执行结果

5
1
2
3

根据执行结果可以看到任务已经被排过序

源码笔记
ThreadPoolExecutor execute
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 线程池有正在执行的任务,其余任务会在指定的队列上排队
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

这里指定的是PriorityBlockingQueue,可以看下该队列增加任务的方法

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            // cmp是创建队列时指定的比较器
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

siftUpUsingComparator方法

    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            // 可以看到调用了比较器的compare方法,以确定任务的优先级
            // x e是指定的任务
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }
  • 线程池配合优先级队列的使用submit
    public static void main(String[] args) {
        // 创建优先级队列,指定队列初始大小 指定队列中的任务比较器
        // 优先级队列是无界的 指定的只是初始大小
        // 可以使用lambda简化
        PriorityBlockingQueue queue = new PriorityBlockingQueue(4, new Comparator<TestRunnable>() {

            @Override
            public int compare(TestRunnable o1, TestRunnable o2) {
                return o1.getI() - o2.getI();
            }
        });
        // 创建线程池 传入优先级队列
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, queue);

        threadPoolExecutor.submit(new TestRunnable(5));

        threadPoolExecutor.submit(new TestRunnable(2));

        threadPoolExecutor.submit(new TestRunnable(1));

        threadPoolExecutor.submit(new TestRunnable(3));
    }

执行结果

Exception in thread "main" java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to com.tianwen.jdk.TestRunnable
    at com.tianwen.jdk.DemoApplication$1.compare(DemoApplication.java:17)
    at java.util.concurrent.PriorityBlockingQueue.siftUpUsingComparator(PriorityBlockingQueue.java:375)
    at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:492)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.tianwen.jdk.DemoApplication.main(DemoApplication.java:31)
5
2

可以看到,除了一个正在被执行的任务和一个排在队列头部的任务,其余的任务添加时,在调用compare方法都会抛出异常

源码笔记
ThreadPoolExecutor submit
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 封装成一个Future
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        // 同样的在执行ThreadPoolExecutor的execute方法
        // 但是传入的已经不是最初的任务,而是一个封装过的Future
        execute(ftask);
        return ftask;
    }

实际是在父类AbstractExecutorService中,可以看到submit方法内先将提交给线程池的任务封装成一个Future,再同样执行ThreadPoolExecutorexecute方法。但其实这里的任务已经不是最初指定的任务,而是一个Future,所以在最后尝试将任务放进优先级队列时,调用比较器的compare方法时自然会抛出
java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342