线程池参数原理及应用

/ java / 没有评论 / 100浏览

线程池原理 Java创建一个线程很方便,只需new Thread()就可以, 但是当有多个任务需要进行进行处理时,频繁的进行创建和启用线程同样需要系统开销,也不利于管理,于是同mysql的连接池一样,自然有对线程的管理池即线程池。

做个比喻,线程池好比一个公司,那么线程本身就是一个个的员工,来对线程的创建和销毁进行管理,最大化的进行资源的合理调度。

Java的线程池创建也很简单,concurrent这个并发包下有Executors可以很方便的进行四种常用线程的创建:

newFixedThreadPool:创建固定数量的线程的线程池,可以控制最大并发数,常用于知道具体任务的数量,需要进行多线程的操作,如批量插入数据库任务,需要进行10万条数据分页,每1万条数据一页,配置一个线程处理,一共配置10个线程,进行并行批量插入,就可以使用这个线程池来进行,大大减少响应时间
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
newCachedThreadPool: 创建可一段时间内重复利用的线程池,常用于不知道具体的任务数量,但是还需要进行并行处理的情况,如springboot @Aysnc就可以指定使用这个线程池,来进行一些埋点等的各种业务的异步处理
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    newSingleThreadExecutor: 创建单个线程的线程池,这个线程池可以在线程死后(或发生异常时)重新启动一个线程来替代原来的线程继续执行下去!

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    newScheduledThreadPool: 创建一个可以定时和重复执行的线程池,常用于定时任务和延时任务的执行线程池

public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
当然线程池还可以自定义,Java只是提供了几种常用的静态线程池的创建方法,以上也已经将4种线程池的创建源码显示出来了,可以发现线程池的创建都是通过new ThreadPoolExecutor()来实现的,现在主要介绍下几个重要的参数和接口:

首先ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,AbstractExecutorService又实现了ExecutorService接口,ExecutorService接口继承了只有一个方法execute的Executor。

  下面解释下一下构造器中各个参数的含义:

ArrayBlockingQueue; 有界阻塞队列,由数组实现,需要指定数组大小
LinkedBlockingQueue; 无界阻塞队列,由链表实现,最大值是Integer的最大值 
 SynchronousQueue; 这个队列不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
其中注意这几个参数都是volatile修饰的,用来保证多线程下的可见性,我们也可以根据这些参数的不同配置,来产生我们需要的线程池。

请输入图片描述

有了线程池后,我们需要关注几个线程池的状态:

请输入图片描述

下图表明几个状态之间的转化关系:

请输入图片描述

接下来就是举个栗子来表明如何使用:
    ExecutorService executorService = Executors.newFixedThreadPool(15);
在执行完上述代码后,我们其实就创建了一个有15个核心线程数量,最大也是15个线程数量,空闲线程保存时间为1分钟,采用无限阻塞队列,任务拒绝采用AbortPolicy:丢弃任务并抛出RejectedExecutionException异常的线程池。在创建后,并没有进行活跃的线程工人产生,可用线程数为0,比如接下来有10个任务进来,就会创建10个线程工人来进行工作,并且工作完不会销毁,之后又来了10个任务,之前的10个线程还没有处理完他们自己的任务,这个时候就又会创建5个线程工人来进行任务的处理,有小伙伴有疑问了,那剩下的5个任务怎么办呢,对了,还有阻塞队列,这些没有工人处理的任务会进入待办事项般的阻塞队列,先进先出,待15个工人将手头的活办完之后进行依次处理,因为阻塞队列是无界阻塞队列,因此,任务会不断的丢到这个队列中,所以,并不会创建因为队列太小,而不得已创建几个个临时工来处理,这个几个数量即在最大线程和核心线程之间的差值数量,这些临时线程的有效时间只有keepAliveTime的时间,此外在来了多个任务之后,如果队列是有界的,且任务数超过了最大能够创建的线程数,即工人不能再招了,待办事项列表也满了,这个时候公司旧不干了,抛出异常,任务拒绝策略。

接下了是实战,结合CompletableFuture进行展示:

简单介绍下CompletableFuture:CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法,结合线程池可以达到并发编程的目的    
package cn.chinotan;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.*;

/**
 * @program: test
 * @description: 多线程测试
 * @author: xingcheng
 * @create: 2019-03-23 17:27
 **/
@Slf4j
public class ExecutorTest {

    @Test
    public void test() {
        ExecutorService executorService = Executors.newFixedThreadPool(15);

        CompletableFuture[] completableFutures = new CompletableFuture[15];
        List<Integer> integers = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 15; i++) {
            int finalI = i;
            CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> costMethod(finalI), executorService)
                    .whenComplete((r, e) -> {
                        if (null != e) {
                           e.printStackTrace(); 
                        } else {
                            integers.add(r);
                        }
                    });

            completableFutures[i] = integerCompletableFuture;
        }

        CompletableFuture.allOf(completableFutures).join();
        long count = integers.stream().count();
        log.info("一共处理成功:{}", count);
    }

    /**
     * 耗时的操作
     *
     * @param i
     * @return
     */
    public int costMethod(int i) {
        try {
            TimeUnit.SECONDS.sleep(5);
            log.info("耗时的操作 {}", i);
            return 1;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return 0;
        }
    }
}
运行结果:

请输入图片描述

可以看到15个耗时的操作很快就并行执行完成,并且还能返回执行的成功结果数

以上就是我对线程池的理解和应用,欢迎大家关注和浏览提问,谢谢大家

本文来源:https://my.oschina.net/u/3266761/blog/3026786