前言

池化思想不仅仅是用于缓冲时,减少数据传递次数

也可以复用类似于线程池、连接池一样,用于复用连接,减少创建销毁开销

不仅仅是创建一个结构体这么简单,由于Java线程跟OS线程1对1,还需要切换内核态,开销更是大了

我们先一步步搭建,从大体框架,到慢慢加入其他机制

搭建基本框架

这里成员变量没有写拒绝策略,以及线程工厂

拒绝策略后续再加,目前使用简单的 throw 或者return拒绝即可

package TreadPool_demo;

import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* @Author: moying
* @CreateTime: 2025-07-24
* @Description: 简单线程池
*/

public class MyThreadPool {

/**
* 核心线程数
*/
private int coreSize;
/**
* 最大线程数
*/
private int maxSize;
/**
* 阻塞队列,用于存储任务(take方法阻塞,poll方法定时阻塞)
*/
private BlockingQueue<Runnable> taskQueue;

/**
* 时间单位
*/
private TimeUnit timeUnit;
/**
* 空闲时间,控制额外线程的销毁
*/
private long timeout;


public MyThreadPool(int coreSize,int maxSize,TimeUnit timeUnit,long timeout,BlockingQueue<Runnable>taskQueue){
this.coreSize = coreSize;
this.taskQueue = taskQueue;
this.maxSize = maxSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
}

private ArrayList<Thread> coreThreadList = new ArrayList<>();
private ArrayList<Thread> supportThreadList = new ArrayList<>();

public void execute(Runnable task){
// 1. 如果没有超过核心线程数,则创建线程

// 2. 如果没有超过阻塞队列大小,加入队列

// 3. 如果没有超过最大线程数,创建线程

// 4. 执行拒绝策略
}
}

编写核心线程逻辑

先梳理一下核心线程的特点

  • 核心线程生命周期 随着 线程池创建和销毁
  • 线程池提供核心线程数的set方法,核心线程数执行完任务也会去做判断,然后相应移除超出线程(本文章应该不会实现)
  • 线程复用——》不销毁——》通过循环+阻塞方法等待任务不让自己结束(使用阻塞队列 take方法完成
class CoreThread extends Thread{
private Runnable firstTask;

public CoreThread(Runnable firstTask){
this.firstTask = firstTask;
}
@Override
public void run(){
// 先执行自己拿到的第一个任务
firstTask.run();
while(true){
// 循环拿取阻塞队列任务
try {
// take 会阻塞等待获取数据
Runnable command = taskQueue.take();
command.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}

编写最大线程逻辑(额外线程)

额外线程的特点就是,当没有任务执行时,等待空闲时间后,就会销毁自己

恰好可以使用 阻塞队列的 poll方法完成

class SupportThread extends Thread{
private Runnable firstTask;
private TimeUnit timeUnit;
private long timeout;

public SupportThread(Runnable firstTask,TimeUnit timeUnit,long timeout){
this.firstTask = firstTask;
this.timeout = timeout;
this.timeUnit = timeUnit;
}
@Override
public void run(){
// 先执行自己拿到的第一个任务
firstTask.run();
while(true){
// 循环拿取阻塞队列任务
try {
// poll 定时等待获取数据
Runnable command = taskQueue.poll(timeout, timeUnit);
if(command==null){
break;
}
command.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 关闭时给点信息方便后续测试
System.out.println(Thread.currentThread().getName() + "is close");
supportThreadList.remove(Thread.currentThread());
}
}

回写线程池的Execute

线程的逻辑写完了, 只需要按照流程补充完Execute即可

public void execute(Runnable task) {
// 1. 如果没有超过核心线程数,则创建线程
if (coreThreadList.size() < coreSize) {
CoreThread coreThread = new CoreThread(task);
coreThreadList.add(coreThread);
coreThread.start();
return;
}
// 2. 如果没有超过阻塞队列大小,加入队列
if (taskQueue.offer(task)) {
return;
}

// 3. 如果没有超过最大线程数,创建线程
if (supportThreadList.size() + coreThreadList.size() < maxSize) {
SupportThread supportThread = new SupportThread(task, timeUnit, timeout);
supportThreadList.add(supportThread);
supportThread.start();
return;
}
// 4. 执行拒绝策略
if (!taskQueue.offer(task)) {
System.out.println("线程池已满,拒绝任务");
}
}

补充逻辑

加入拒绝策略

这里可以使用接口,然后自己写实现类就好,后续再补充

也可以使用 枚举策略模式

接口+实现类

public class ThrowRejectHandle implements RejectHandle {
@Override
public void reject(Runnable rejectCommand, MyThreadPool threadPool) {
throw new RuntimeException("阻塞队列满了!");
}
}

枚举策略模式

可以使用Spring Bean注入进行优化,也可以使用 abstract 的 match方法;

public enum RejectStrategy {
/**
* 拒绝策略:直接抛出异常
*/
ABORT( 1, "abort", (rejectCommand) -> {
throw new RuntimeException("线程池已满!");
});

public final int code;
public final String desc;
public final Function<Runnable, Object> function;
RejectStrategy(int code, String desc, Function<Runnable, Object> function) {
this.desc = desc;
this.code = code;
this.function = function;
}

public static RejectStrategy math(int code) {
for (RejectStrategy strategy : RejectStrategy.values()) {
if (strategy.code == code) {
return strategy;
}
}
return null;
}

public Object apply(Runnable rejectCommand) throws Exception{
return function.apply(rejectCommand);
}
}