前言
池化思想不仅仅是用于缓冲时,减少数据传递次数
也可以复用类似于线程池、连接池一样,用于复用连接,减少创建销毁开销
不仅仅是创建一个结构体这么简单,由于Java线程跟OS线程1对1,还需要切换内核态,开销更是大了
我们先一步步搭建,从大体框架,到慢慢加入其他机制
搭建基本框架
这里成员变量没有写拒绝策略,以及线程工厂
拒绝策略后续再加,目前使用简单的 throw 或者return拒绝即可
package TreadPool_demo;
import java.util.ArrayList; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit;
public class MyThreadPool {
private int coreSize;
private int maxSize;
private BlockingQueue<Runnable> taskQueue;
private TimeUnit timeUnit;
private long timeout;
public MyThreadPool(int coreSize,int maxSize,TimeUnit timeUnit,long timeout,BlockingQueue<Runnable>taskQueue){ this.coreSize = coreSize; this.taskQueue = taskQueue; this.maxSize = maxSize; this.timeout = timeout; this.timeUnit = timeUnit; }
private ArrayList<Thread> coreThreadList = new ArrayList<>(); private ArrayList<Thread> supportThreadList = new ArrayList<>();
public void execute(Runnable task){
} }
|
编写核心线程逻辑
先梳理一下核心线程的特点
- 核心线程生命周期 随着 线程池创建和销毁
- 线程池提供核心线程数的set方法,核心线程数执行完任务也会去做判断,然后相应移除超出线程(本文章应该不会实现)
- 线程复用——》不销毁——》通过循环+阻塞方法等待任务不让自己结束(使用阻塞队列 take方法完成
class CoreThread extends Thread{ private Runnable firstTask;
public CoreThread(Runnable firstTask){ this.firstTask = firstTask; } @Override public void run(){ firstTask.run(); while(true){ try { Runnable command = taskQueue.take(); command.run(); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }
|
编写最大线程逻辑(额外线程)
额外线程的特点就是,当没有任务执行时,等待空闲时间后,就会销毁自己
恰好可以使用 阻塞队列的 poll方法完成
class SupportThread extends Thread{ private Runnable firstTask; private TimeUnit timeUnit; private long timeout;
public SupportThread(Runnable firstTask,TimeUnit timeUnit,long timeout){ this.firstTask = firstTask; this.timeout = timeout; this.timeUnit = timeUnit; } @Override public void run(){ firstTask.run(); while(true){ try { Runnable command = taskQueue.poll(timeout, timeUnit); if(command==null){ break; } command.run(); } catch (InterruptedException e) { throw new RuntimeException(e); } } System.out.println(Thread.currentThread().getName() + "is close"); supportThreadList.remove(Thread.currentThread()); } }
|
回写线程池的Execute
线程的逻辑写完了, 只需要按照流程补充完Execute即可
public void execute(Runnable task) { if (coreThreadList.size() < coreSize) { CoreThread coreThread = new CoreThread(task); coreThreadList.add(coreThread); coreThread.start(); return; } if (taskQueue.offer(task)) { return; }
if (supportThreadList.size() + coreThreadList.size() < maxSize) { SupportThread supportThread = new SupportThread(task, timeUnit, timeout); supportThreadList.add(supportThread); supportThread.start(); return; } if (!taskQueue.offer(task)) { System.out.println("线程池已满,拒绝任务"); } }
|
补充逻辑
加入拒绝策略
这里可以使用接口,然后自己写实现类就好,后续再补充
也可以使用 枚举策略模式
接口+实现类
public class ThrowRejectHandle implements RejectHandle { @Override public void reject(Runnable rejectCommand, MyThreadPool threadPool) { throw new RuntimeException("阻塞队列满了!"); } }
|
枚举策略模式
可以使用Spring Bean注入进行优化,也可以使用 abstract 的 match方法;
public enum RejectStrategy {
ABORT( 1, "abort", (rejectCommand) -> { throw new RuntimeException("线程池已满!"); });
public final int code; public final String desc; public final Function<Runnable, Object> function; RejectStrategy(int code, String desc, Function<Runnable, Object> function) { this.desc = desc; this.code = code; this.function = function; } public static RejectStrategy math(int code) { for (RejectStrategy strategy : RejectStrategy.values()) { if (strategy.code == code) { return strategy; } } return null; }
public Object apply(Runnable rejectCommand) throws Exception{ return function.apply(rejectCommand); } }
|