发布时间:2023-02-21 文章分类:编程知识 投稿人:李佳 字号: 默认 | | 超大 打印

自定义线程池

package com.appletree24;
import java.util.ArrayDeque;  
import java.util.Deque;  
import java.util.HashSet;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.ReentrantLock;  
class Main {  
    public static void main(String[] args) throws ExecutionException, InterruptedException {  
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MICROSECONDS, 5, (queue, task) -> {  
            //带超时等待  
//            queue.offer(task,500,TimeUnit.MILLISECONDS);  
        });  
        for (int i = 0; i < 10; i++) {  
            int j = i;  
            threadPool.execute(() -> {  
                try {  
                    Thread.sleep(1000L);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
                System.out.println(j);  
            });  
        }  
    }  
}  
//策略模式接口 此处使用策略模式是因为在实现拒绝策略时,有许多种拒绝的方式,这些方式如果不使用恰当的模式,就需要大量的if..else来编写  
//且方式数量大于4个,会造成类膨胀的问题,推荐使用混合模式  
//https://www.runoob.com/design-pattern/strategy-pattern.html  
@FunctionalInterface  
interface RejectPolicy<T> {  
    void reject(BlockingQueue<T> queue, T task);  
}  
class ThreadPool {  
    //任务队列  
    private BlockingQueue<Runnable> taskQueue;  
    //线程集合  
    private HashSet<Worker> workers = new HashSet<>();  
    //线程数  
    private int coreSize;  
    //超时时间  
    private long timeout;  
    private TimeUnit timeUnit;  
    private RejectPolicy<Runnable> rejectPolicy;  
    //执行任务  
    public void execute(Runnable task) {  
        //当任务数未超过核心线程数时,直接交给Worker对象执行  
        //如果超过,则加入阻塞任务队列,暂存起来  
        synchronized (workers) {  
            if (workers.size() < coreSize) {  
                Worker worker = new Worker(task);  
                workers.add(worker);  
                worker.start();  
            } else {  
                //第一种选择死等  
//                taskQueue.put(task);  
                //第二种为超时等待  
                //第三种为消费者放弃任务执行  
                //第四种为主线程抛出异常  
                //第五种让调用者自行执行任务  
                taskQueue.tryPut(rejectPolicy, task);  
            }  
        }  
    }  
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {  
        this.coreSize = coreSize;  
        this.timeout = timeout;  
        this.timeUnit = timeUnit;  
        this.taskQueue = new BlockingQueue<>(queueCapcity);  
        this.rejectPolicy = rejectPolicy;  
    }  
    class Worker extends Thread {  
        private Runnable task;  
        public Worker(Runnable task) {  
            this.task = task;  
        }  
        @Override  
        public void run() {  
            //执行任务  
            //1.当传递过来的task不为空,执行任务  
            //2.当task执行完毕,再接着取下一个任务并执行  
            while (task != null || (task = taskQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) {  
                try {  
                    task.run();  
                } catch (Exception e) {  
                    e.printStackTrace();  
                } finally {  
                    task = null;  
                }  
            }  
            synchronized (workers) {  
                workers.remove(this);  
            }  
        }  
    }  
}  
class BlockingQueue<T> {  
    //1. 任务队列  
    private final Deque<T> queue = new ArrayDeque<>();  
    //2. 锁  
    private final ReentrantLock lock = new ReentrantLock();  
    //3. 生产者条件变量  
    private final Condition fullWaitSet = lock.newCondition();  
    //4. 消费者条件变量  
    private final Condition emptyWaitSet = lock.newCondition();  
    //5. 容量上限  
    private int capcity;  
    public BlockingQueue(int capcity) {  
        this.capcity = capcity;  
    }  
    //带超时的等待获取  
    public T poll(long timeout, TimeUnit unit) {  
        lock.lock();  
        long nanos = unit.toNanos(timeout);  
        try {  
            while (queue.isEmpty()) {  
                try {  
                    if (nanos <= 0) {  
                        return null;  
                    }  
                    nanos = emptyWaitSet.awaitNanos(nanos);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            T t = queue.removeFirst();  
            fullWaitSet.signal();  
            return t;  
        } finally {  
            lock.unlock();  
        }  
    }  
    //消费者拿取任务的方法  
    public T take() {  
        lock.lock();  
        try {  
            while (queue.isEmpty()) {  
                try {  
                    emptyWaitSet.await();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            T t = queue.removeFirst();  
            fullWaitSet.signal();  
            return t;  
        } finally {  
            lock.unlock();  
        }  
    }  
    //阻塞添加  
    public void put(T task) {  
        lock.lock();  
        try {  
            while (queue.size() == capcity) {  
                try {  
                    fullWaitSet.await();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            queue.offerLast(task);  
            //添加完后唤醒消费者等待  
            emptyWaitSet.signal();  
        } finally {  
            lock.unlock();  
        }  
    }  
    //带超时时间的阻塞添加  
    public boolean offer(T task, long timeout, TimeUnit unit) {  
        lock.lock();  
        try {  
            long nanos = unit.toNanos(timeout);  
            while (queue.size() == capcity) {  
                try {  
                    if (nanos <= 0) return false;  
                    nanos = fullWaitSet.awaitNanos(nanos);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            queue.offerLast(task);  
            //添加完后唤醒消费者等待  
            emptyWaitSet.signal();  
            return true;  
        } finally {  
            lock.unlock();  
        }  
    }  
    //获取当前阻塞队列大小  
    public int size() {  
        lock.lock();  
        try {  
            return queue.size();  
        } finally {  
            lock.unlock();  
        }  
    }  
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {  
        lock.lock();  
        try {  
            //判断队列是否已满  
            if (queue.size() == capcity) {  
                rejectPolicy.reject(this, task);  
            } else {  
                queue.addLast(task);  
                emptyWaitSet.signal();  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
}

上述的自定义线程池虽然能够执行完毕主线程给予的任务,但任务全部执行结束后,开辟的线程池内核心线程仍然在运行,并没有结束,这是因为目前线程池中的take方法仍然为不会有超时等待的take方法,造成了死等,需要为其加入超时停止的功能。也就是替代take()的poll()

JDK自带线程池

介绍

JUC学习-线程池部分
ThreadPoolExecutor使用int的高三位表示线程池状态,低29位表示线程数量
JUC学习-线程池部分
JUC学习-线程池部分
在ThreadPoolExecutor中,同样也存在拒绝策略。其图结构如下:
JUC学习-线程池部分
其中接口就对应着在自定义线程池中实现的策略模式接口,下面的四个实现类就对应着四种不同的拒绝方式:
JUC学习-线程池部分

利用工具类创建固定大小线程池

JUC学习-线程池部分

利用工具类创建带缓冲的线程池

JUC学习-线程池部分
从源码可以看出,带缓冲的线程池中缓冲队列的使用的是一个名为SynchronousQueue的队列,这个队列的特点如下:队列不具有容量,当没有线程来取时,是无法对其内部放入数据的,例如队列内部已有一个数字1,但此时没有线程取走,则线此队列目前并不能继续存入数据,直到1被取走

利用工具类创建单线程线程池

JUC学习-线程池部分
从源码可以看出,单线程线程池中核心线程数与最大线程数相等,即不存在应急线程。只能解决一个任务
那么这个线程池和我自己创建一个线程的线程池有什么区别呢?区别如下:
JUC学习-线程池部分

ThreadPoolExecutor-submit method

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    Future<String> result = pool.submit(() -> {  
        System.out.println("running");  
        Thread.sleep(1000);  
        return "ok";  
    });  
    System.out.println(result.get());  
}

submit方法可以传入Runnable和Callable类型的参数,并且将线程内部所执行任务的结果返回,用Future包装

ThreadPoolExecutor-invokeAll

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    results.forEach(f -> {  
        try {  
            System.out.printf(f.get());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (ExecutionException e) {  
            e.printStackTrace();  
        }  
    });  
}

invokeAll方法可以传入任务的集合,同样的任务的返回值也会以列表形式返回

ThreadPoolExecutor-invokeAny

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    String result = pool.invokeAny(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    pool.awaitTermination(1000, TimeUnit.MILLISECONDS);  
    System.out.println(result);  
}

invokeAny方法同样可以传入任务的集合,只不过最后返回的结果并不是任务的结果集合,而是最早完成的那个任务的结果。

ThreadPoolExecutor-shutdown

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    pool.shutdown();  
    results.forEach(f -> {  
        try {  
            System.out.println(f.get());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (ExecutionException e) {  
            e.printStackTrace();  
        }  
    });  
}

shutdown方法会将线程池的状态变为SHUTDOWN

ThreadPoolExecutor-shutdownNow

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    List<Runnable> runnables = pool.shutdownNow();  
    results.forEach(f -> {  
        try {  
            System.out.println(f.get());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (ExecutionException e) {  
            e.printStackTrace();  
        }  
    });  
}

shutdownNow方法会将线程池状态变为STOP