多线程
线程实现
demo
/**
 * Creates a thread by extending {@link Thread}.
 *
 * <p>The child thread and the main thread each print twenty numbered lines;
 * how the two outputs interleave is decided by the scheduler.
 */
public class TestThread1 extends Thread {

    /** Body of the child thread: prints "子线程0" .. "子线程19". */
    @Override
    public void run() {
        printTwenty("子线程");
    }

    /** Starts the child thread, then prints "主线程0" .. "主线程19" on the main thread. */
    public static void main(String[] args) {
        Thread child = new TestThread1();
        child.start();
        printTwenty("主线程");
    }

    /** Prints {@code label + n} for n = 0..19, one per line. */
    private static void printTwenty(String label) {
        int n = 0;
        while (n < 20) {
            System.out.println(label + n);
            n++;
        }
    }
}
|
继承Thread类
实现Runnable接口
package Thread_learn.demo1;
/**
 * Tortoise-and-hare race: two threads share one {@code Race} task and each counts
 * from 0 to 100 steps; the first thread to reach 100 steps is recorded as the winner.
 */
public class Race implements Runnable {

    /** Name of the winning thread; {@code null} while nobody has finished. Guarded by {@code Race.class}. */
    private static String winner;

    /** Runs up to 100 steps, stopping as soon as any thread has won. */
    @Override
    public void run() {
        for (int i = 0; i <= 100; i++) {
            if (gameOver(i)) {
                break;
            }
            System.out.println(Thread.currentThread().getName() + "跑了" + i + "米");
        }
    }

    /**
     * Decides whether the race is finished for the calling thread.
     *
     * <p>The check of {@code winner} and the assignment are done under one lock so that
     * only a single thread can ever be announced as the winner; without it both runners
     * could see {@code winner == null} and both print "winner is ...".
     *
     * @param steps number of steps the calling thread has completed
     * @return {@code true} if a winner already exists or the caller just won
     */
    private boolean gameOver(int steps) {
        synchronized (Race.class) {
            if (winner != null) {
                return true;
            }
            if (steps >= 100) {
                winner = Thread.currentThread().getName();
                System.out.println("winner is " + winner);
                return true;
            }
            return false;
        }
    }

    /** Starts the two runners ("兔子" and "乌龟") on the same task instance. */
    public static void main(String[] args) {
        Race race = new Race();
        new Thread(race, "兔子").start();
        new Thread(race, "乌龟").start();
    }
}
|
callable
/**
 * Creates a task by implementing {@link Callable} and runs it on a thread pool,
 * reading the result back through a {@link Future}.
 */
public class TestCallable implements Callable<Integer> {

    /**
     * The task body.
     *
     * @return always {@link Integer#MAX_VALUE}
     */
    @Override
    public Integer call() throws Exception {
        return Integer.MAX_VALUE;
    }

    /**
     * Submits one task to a three-thread pool and prints its result.
     *
     * @throws Exception if waiting for the result fails
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        try {
            Future<Integer> result = executorService.submit(new TestCallable());
            System.out.println(result.get());
        } finally {
            // Always stop the pool: its worker threads are non-daemon and would
            // otherwise keep the JVM alive if get() throws.
            executorService.shutdownNow();
        }
    }
}
|
线程状态
package Thread_learn.demo3;
/**
 * Stops a thread cooperatively with a flag instead of the deprecated
 * {@code Thread.stop()}: the worker loops while the flag is set and the main
 * thread clears it.
 */
public class TestStop implements Runnable {

    /**
     * Loop condition of the worker. It is written by the main thread and read by the
     * worker, so it must be {@code volatile}; otherwise the worker is not guaranteed
     * to ever observe the update.
     */
    private volatile boolean flag = true;

    /** Prints a running counter until {@link #stop()} has been called. */
    @Override
    public void run() {
        int i = 0;
        while (flag) {
            System.out.println("run...Thread" + i++);
        }
    }

    /** Asks the worker loop to finish after its current iteration. */
    public void stop() {
        this.flag = false;
    }

    /** Starts the worker and stops it when the main loop reaches iteration 900. */
    public static void main(String[] args) {
        TestStop testStop = new TestStop();
        new Thread(testStop).start();

        for (int i = 0; i < 1000; i++) {
            System.out.println("main" + i);
            if (i == 900) {
                testStop.stop();
                System.out.println("线程停止了");
            }
        }
    }
}
|
线程同步
多个线程操作同一个资源
出现死锁的必要条件
线程通信
线程协作
生产者消费者模式
管程法
package Thread_learn.PC_model;
/**
 * Producer/consumer demo using a monitor-style bounded buffer: one producer and one
 * consumer thread share a single {@code ChickenBuffer}.
 */
public class TestPC {

    /** Wires the producer and the consumer to the same buffer and starts both. */
    public static void main(String[] args) throws InterruptedException {
        ChickenBuffer sharedBuffer = new ChickenBuffer();

        Thread producer = new Producer(sharedBuffer);
        producer.start();

        Thread consumer = new Consumer(sharedBuffer);
        consumer.start();
    }
}
/** Producer thread: pushes chickens numbered 1..100 into the shared buffer. */
class Producer extends Thread {

    /** Buffer shared with the consumer. */
    ChickenBuffer chickenBuffer;

    public Producer(ChickenBuffer chickenBuffer) {
        this.chickenBuffer = chickenBuffer;
    }

    /** Produces 100 chickens, logging each one after it has been pushed. */
    @Override
    public void run() {
        for (int id = 1; id <= 100; id++) {
            Chicken fresh = new Chicken(id);
            chickenBuffer.push(fresh);
            System.out.println("生产了第" + id + "只鸡");
        }
    }
}
/** Consumer thread: pops 100 chickens from the shared buffer. */
class Consumer extends Thread {

    /** Buffer shared with the producer. */
    ChickenBuffer chickenBuffer;

    public Consumer(ChickenBuffer chickenBuffer) {
        this.chickenBuffer = chickenBuffer;
    }

    /** Takes 100 chickens, logging the id of each one taken. */
    @Override
    public void run() {
        int remaining = 100;
        while (remaining-- > 0) {
            Chicken taken = chickenBuffer.pop();
            System.out.println("消费了第" + taken.id + "只鸡");
        }
    }
}
/** The product exchanged between producer and consumer; identified only by a number. */
class Chicken {

    /** Sequence number assigned by the producer. */
    int id;

    /**
     * @param id sequence number of this chicken
     */
    public Chicken(int id) {
        this.id = id;
    }
}
/**
 * Bounded buffer with ten slots, used as a stack. Both operations are synchronized on
 * the buffer itself: {@code push} blocks while it is full, {@code pop} blocks while it
 * is empty.
 */
class ChickenBuffer {

    /** Storage; only indexes {@code 0 .. count-1} hold live elements. */
    Chicken[] chickens = new Chicken[10];

    /** Number of chickens currently stored. */
    int count = 0;

    /**
     * Stores a chicken, waiting while the buffer is full, then wakes all waiters.
     *
     * @param chicken the element to store
     */
    public synchronized void push(Chicken chicken) {
        // Re-check the condition after every wake-up.
        while (count == chickens.length) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        chickens[count++] = chicken;
        this.notifyAll();
    }

    /**
     * Removes the most recently stored chicken, waiting while the buffer is empty,
     * then wakes all waiters.
     *
     * @return the removed element
     */
    public synchronized Chicken pop() {
        while (count == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        Chicken top = chickens[--count];
        this.notifyAll();
        return top;
    }
}
|
信号灯法
package Thread_learn.PC_model;
/**
 * Producer/consumer demo using a boolean "signal light": a player and an audience
 * thread take turns on one shared {@code TV}.
 */
public class TestPC2 {

    /** Starts the player thread and the audience thread on the same TV. */
    public static void main(String[] args) {
        TV sharedTv = new TV();

        Thread player = new Player(sharedTv);
        player.start();

        Thread audience = new Audience(sharedTv);
        audience.start();
    }
}
/** Producer side: performs twenty times, alternating between a show and an advertisement. */
class Player extends Thread {

    /** TV shared with the audience. */
    TV tv;

    public Player(TV tv) {
        this.tv = tv;
    }

    /** Even rounds play the show, odd rounds play the advertisement. */
    @Override
    public void run() {
        for (int round = 0; round < 20; round++) {
            boolean evenRound = (round % 2 == 0);
            String programme = evenRound ? "奇葩说" : "太污了,喝瓶立白洗洗嘴";
            this.tv.play(programme);
        }
    }
}
/** Consumer side: watches the TV twenty times. */
class Audience extends Thread {

    /** TV shared with the player. */
    TV tv;

    public Audience(TV tv) {
        this.tv = tv;
    }

    /** Calls {@code watch()} twenty times. */
    @Override
    public void run() {
        int remaining = 20;
        while (remaining-- > 0) {
            tv.watch();
        }
    }
}
/**
 * Shared state for the signal-light producer/consumer demo.
 *
 * <p>{@code flag == true} means it is the player's turn (the audience waits);
 * {@code flag == false} means it is the audience's turn (the player waits).
 * Both methods synchronize on this object.
 */
class TV {

    /** The programme most recently performed. */
    String voice;

    /** Turn indicator: {@code true} = player performs next, {@code false} = audience watches next. */
    boolean flag = true;

    /**
     * Performs a programme once it is the player's turn, then hands the turn to the audience.
     *
     * @param voice the programme being performed
     */
    public synchronized void play(String voice) {
        // The guard must be a loop: wait() may return spuriously, and notifyAll()
        // wakes every waiter, so the condition has to be re-checked after waking.
        while (!flag) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("表演了:" + voice);
        this.voice = voice;
        this.flag = !this.flag;
        this.notifyAll();
    }

    /** Watches the last programme once it is the audience's turn, then hands the turn back. */
    public synchronized void watch() {
        // Same reasoning as in play(): re-check the condition after every wake-up.
        while (flag) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("观看了:" + voice);
        this.flag = !this.flag;
        this.notifyAll();
    }
}
|
LOCK & Condition
package com.moying.LockDemo;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
/**
 * Lock/Condition demo: three threads named "A", "B" and "C" each call their own
 * {@code Member} print method ten times; {@code Member} decides the order in which
 * the calls are allowed to proceed.
 */
@Slf4j(topic = "PipeLine")
public class PipeLine {

    /** Starts the three worker threads on one shared {@code Member}. */
    public static void main(String[] args) {
        Member member = new Member();

        new Thread(() -> tenTimes(member::PrintA), "A").start();
        new Thread(() -> tenTimes(member::PrintB), "B").start();
        new Thread(() -> tenTimes(member::PrintC), "C").start();
    }

    /** Runs the given action ten times in a row on the calling thread. */
    private static void tenTimes(Runnable action) {
        int done = 0;
        while (done < 10) {
            action.run();
            done++;
        }
    }
}
/**
 * Coordinates three printers with one lock and three conditions.
 *
 * <p>{@code ant} holds whose turn it is. As written, the hand-off order is
 * A (ant == 1) -> C (ant == 2) -> B (ant == 3) -> A, i.e. the output cycles A, C, B.
 */
@Slf4j
class Member {

    Lock lock = new ReentrantLock();

    /** Turn marker: 1 lets PrintA run, 2 lets PrintC run, 3 lets PrintB run. */
    int ant = 1;

    private Condition conditionA = lock.newCondition();
    private Condition conditionB = lock.newCondition();
    private Condition conditionC = lock.newCondition();

    /** Waits for turn 1, logs the A line, then passes the turn (2) to PrintC. */
    public void PrintA() {
        lock.lock();
        try {
            awaitTurn(1, conditionA);
            log.info("AAAAAAAAAAAAAAAA");
            ant = 2;
            conditionC.signal();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    /** Waits for turn 3, logs the B line, then passes the turn (1) back to PrintA. */
    public void PrintB() {
        lock.lock();
        try {
            awaitTurn(3, conditionB);
            log.info("BBBBBBBBBBBBBBBBBBB");
            ant = 1;
            conditionA.signal();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    /** Waits for turn 2, logs the C line, then passes the turn (3) to PrintB. */
    public void PrintC() {
        lock.lock();
        try {
            awaitTurn(2, conditionC);
            log.info("CCCCCCCCCCCCCCCCCCCCC");
            ant = 3;
            conditionB.signal();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Blocks on {@code waitOn} until {@code ant} equals {@code turn}. Must be called with
     * {@code lock} held. An interrupt is printed and the wait is resumed.
     */
    private void awaitTurn(int turn, Condition waitOn) {
        while (ant != turn) {
            try {
                waitOn.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
|
线程池
ExecutorService & Executors.new****
八锁问题
所谓的“线程八锁”
其实就是看 synchronized 锁住的是哪个对象
情况1:12 或 21都是有可能的,就看cpu先调度哪个线程
// Case 1: a() and b() are both instance-level synchronized methods, and both threads call them on
// the same object n1, so they lock the same monitor (n1) and run one after the other.
@Slf4j(topic = "c.Number") class Number{ public synchronized void a() { log.debug("1"); } public synchronized void b() { log.debug("2"); } }
public static void main(String[] args) { Number n1 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n1.b(); }).start(); }
|
情况2:1s后12,或 2 1s后 1 ,还是看cpu先调度哪个线程
// Case 2: same locking as case 1 (both methods lock n1), but a() calls sleep(1) while holding the lock.
// NOTE(review): sleep(..) is a helper not shown here; per the heading it presumably pauses for one second.
@Slf4j(topic = "c.Number") class Number{ public synchronized void a() { sleep(1); log.debug("1"); } public synchronized void b() { log.debug("2"); } }
public static void main(String[] args) { Number n1 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n1.b(); }).start(); }
|
情况3:3 1s后 12 、 23 1s后 1 、 32 1s后 1,3肯定是最开始的打印的,就看1或2谁先打印
// Case 3: a() and b() lock n1, while c() is not synchronized and therefore never waits for the lock.
// NOTE(review): sleep(..) is a helper not shown here; per the heading it presumably pauses for one second.
@Slf4j(topic = "c.Number") class Number{ public synchronized void a() { sleep(1); log.debug("1"); } public synchronized void b() { log.debug("2"); } public void c() { log.debug("3"); } }
public static void main(String[] args) { Number n1 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n1.b(); }).start(); new Thread(()->{ n1.c(); }).start(); }
|
情况4:2 1s 后 1,没有互斥,同时运行,2先打印,sleep 1秒后打印1
// Case 4: a() is called on n1 and b() on n2; each instance-synchronized method locks its own object,
// so the two threads use different monitors and do not exclude each other.
// NOTE(review): sleep(..) is a helper not shown here; per the heading it presumably pauses for one second.
@Slf4j(topic = "c.Number") class Number{ public synchronized void a() { sleep(1); log.debug("1"); } public synchronized void b() { log.debug("2"); } }
public static void main(String[] args) { Number n1 = new Number(); Number n2 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n2.b(); }).start(); }
|
情况5:2 1s 后 1,锁住的对象不同,所以和题4一样,不存在互斥。
// Case 5: a() is static synchronized (locks Number.class) while b() is instance synchronized (locks n1),
// so the two threads use different monitors even though both calls go through n1.
// NOTE(review): sleep(..) is a helper not shown here; per the heading it presumably pauses for one second.
@Slf4j(topic = "c.Number") class Number{ public static synchronized void a() { sleep(1); log.debug("1"); } public synchronized void b() { log.debug("2"); } }
public static void main(String[] args) { Number n1 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n1.b(); }).start(); }
|
情况6:1s 后12, 或 2 1s后 1,还是看cpu先调度哪个线程
// Case 6: a() and b() are both static synchronized, so both lock Number.class and exclude each other.
// NOTE(review): sleep(..) is a helper not shown here; per the heading it presumably pauses for one second.
@Slf4j(topic = "c.Number") class Number{ public static synchronized void a() { sleep(1); log.debug("1"); } public static synchronized void b() { log.debug("2"); } }
public static void main(String[] args) { Number n1 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n1.b(); }).start(); }
|
情况7:2 1s 后 1,锁住的对象不同,所以和题4一样,不存在互斥。
// Case 7: static synchronized a() locks Number.class, instance synchronized b() locks n2;
// different monitors, so there is no mutual exclusion.
// NOTE(review): sleep(..) is a helper not shown here; per the heading it presumably pauses for one second.
@Slf4j(topic = "c.Number") class Number{ public static synchronized void a() { sleep(1); log.debug("1"); } public synchronized void b() { log.debug("2"); } }
public static void main(String[] args) { Number n1 = new Number(); Number n2 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n2.b(); }).start(); }
|
情况8:1s 后12, 或 2 1s后 1,锁着的同一个对象,还是看cpu先调度哪个线程
// Case 8: both methods are static synchronized, so both lock Number.class; calling them through
// two different instances (n1, n2) does not change the lock, and the threads exclude each other.
// NOTE(review): sleep(..) is a helper not shown here; per the heading it presumably pauses for one second.
@Slf4j(topic = "c.Number") class Number{ public static synchronized void a() { sleep(1); log.debug("1"); } public static synchronized void b() { log.debug("2"); } }
public static void main(String[] args) { Number n1 = new Number(); Number n2 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n2.b(); }).start(); }
|
高级主题
JUC
Callable
package com.moying.callable_demo;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable; import java.util.concurrent.FutureTask;
/**
 * Runs a {@code Callable} on plain threads by wrapping it in a {@link FutureTask}.
 *
 * <p>The same {@code FutureTask} is handed to two threads ("A" and "B"); the result is
 * then read once with {@code get()} and logged.
 */
@Slf4j(topic = "TestCallable")
public class TestCallable {

    /** Starts threads "A" and "B" on one shared task and logs the task's result. */
    public static void main(String[] args) {
        FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());

        for (String threadName : new String[] {"A", "B"}) {
            new Thread(futureTask, threadName).start();
        }

        try {
            // Blocks until the task has produced its value.
            Integer integer = futureTask.get();
            log.info("返回值为:" + integer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/** A slow task: announces itself, sleeps for three seconds and returns 1024. */
class MyCallable implements Callable<Integer> {

    /**
     * @return always 1024
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public Integer call() throws Exception {
        System.out.println("call()方法执行了");
        // Simulate a time-consuming computation.
        Thread.sleep(3_000L);
        final Integer answer = 1024;
        return answer;
    }
}
|
常用辅助类
CountDownLatch
减法器
CyclicBarrier
加法器
Semaphore
类比停车位(或者对比取令牌)
阻塞队列
线程池
线程池:三大方式、七大参数、四种拒绝策略
池化技术
程序的运行,本质:占用系统的资源!我们需要去优化资源的使用 ===> 池化技术
线程池、JDBC的连接池、内存池、对象池 等等。。。。
资源的创建、销毁十分消耗资源
池化技术:事先准备好一些资源,如果有人要用,就来我这里拿,用完之后还给我,以此来提高效率。
1)线程池的好处:
1、降低资源的消耗;
2、提高响应的速度;
3、方便管理;
线程复用、可以控制最大并发数、管理线程;
2)线程池:三大方法
ExecutorService threadPool = Executors.newSingleThreadExecutor();// a pool with a single worker thread
ExecutorService threadPool2 = Executors.newFixedThreadPool(5); // a pool with a fixed size (5 threads here)
ExecutorService threadPool3 = Executors.newCachedThreadPool(); // an elastic pool that grows and shrinks
工具类 Executors 三大方法;
/**
 * Shows the three {@link Executors} factory methods and runs 100 small tasks on the
 * single-thread pool.
 */
public class Demo01 {

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newSingleThreadExecutor(); // a single worker thread
        ExecutorService threadPool2 = Executors.newFixedThreadPool(5);    // fixed size: 5 threads
        ExecutorService threadPool3 = Executors.newCachedThreadPool();    // elastic size

        try {
            for (int i = 1; i <= 100; i++) {
                // Let the pool run the task instead of creating a thread by hand.
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Every pool that was created must be shut down once it is no longer needed,
            // including the two that are only shown for illustration.
            threadPool.shutdown();
            threadPool2.shutdown();
            threadPool3.shutdown();
        }
    }
}
3)七大参数
/**
 * The seven-parameter constructor of {@code ThreadPoolExecutor} (excerpt quoted for study).
 *
 * @param corePoolSize    core pool size
 * @param maximumPoolSize maximum pool size
 * @param keepAliveTime   idle time after which a thread that nobody uses is released
 * @param unit            time unit of {@code keepAliveTime}
 * @param workQueue       blocking queue that holds waiting tasks
 * @param threadFactory   factory that creates the threads; usually left at its default
 * @param handler         rejection policy
 * @throws IllegalArgumentException if {@code corePoolSize < 0}, {@code maximumPoolSize <= 0},
 *                                  {@code maximumPoolSize < corePoolSize} or {@code keepAliveTime < 0}
 * @throws NullPointerException     if {@code workQueue}, {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    // The keep-alive time is stored internally in nanoseconds.
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
|
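《阿里巴巴Java开发手册》中明确说明:线程池不允许使用Executors去创建。FixedThreadPool和SingleThreadPool允许的请求队列长度为Integer.MAX_VALUE,CachedThreadPool允许创建的线程数量为Integer.MAX_VALUE,都可能堆积大量请求或线程而导致OOM,所以一般情况下我们要使用底层的ThreadPoolExecutor来创建线程池。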
/**
 * Builds a thread pool directly with {@link ThreadPoolExecutor} and submits ten tasks.
 *
 * <p>The pool can hold at most {@code maximumPoolSize + 3} tasks at once (running plus
 * queued); with {@code AbortPolicy} any further submission throws, which is caught and
 * printed below.
 */
public class PollDemo {

    public static void main(String[] args) {
        // Use the CPU count as the upper bound, but never less than the core size of 2:
        // on a single-CPU host maximumPoolSize < corePoolSize would make the constructor
        // throw IllegalArgumentException.
        int max = Math.max(2, Runtime.getRuntime().availableProcessors());
        ExecutorService service = new ThreadPoolExecutor(
                2,                                    // core pool size
                max,                                  // maximum pool size
                3,                                    // keep-alive time of idle extra threads
                TimeUnit.SECONDS,                     // unit of the keep-alive time
                new LinkedBlockingDeque<>(3),         // bounded work queue
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()  // reject by throwing
        );
        try {
            for (int i = 1; i <= 10; i++) {
                service.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            service.shutdown();
        }
    }
}
|
4)拒绝策略
- new ThreadPoolExecutor.AbortPolicy(): //该拒绝策略为:银行满了,还有人进来,不处理这个人的,并抛出异常
超出最大承载,就会抛出异常:队列容量大小+maxPoolSize
new ThreadPoolExecutor.CallerRunsPolicy(): //该拒绝策略为:哪来的去哪里 main线程进行处理
new ThreadPoolExecutor.DiscardPolicy(): //该拒绝策略为:队列满了,直接丢掉新提交的任务,不会抛出异常。
new ThreadPoolExecutor.DiscardOldestPolicy(): //该拒绝策略为:队列满了,丢弃队列中等待最久(最早入队)的任务,然后重新尝试提交当前任务,不会抛出异常
5)如何设置线程池的大小
1、CPU密集型:电脑的核数是几核就选择几;以此作为maximumPoolSize的大小
// CPU-bound sizing: use the number of available processors as the maximum pool size.
// Clamp to the core size (2) so that a single-CPU host does not make the constructor
// throw IllegalArgumentException (maximumPoolSize < corePoolSize).
int max = Math.max(2, Runtime.getRuntime().availableProcessors());
ExecutorService service = new ThreadPoolExecutor(
        2,                                    // core pool size
        max,                                  // maximum pool size
        3,                                    // keep-alive time of idle extra threads
        TimeUnit.SECONDS,                     // unit of the keep-alive time
        new LinkedBlockingDeque<>(3),         // bounded work queue
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy()  // reject by throwing
);
|
2、I/O密集型:
在程序中有15个大型任务,io十分占用资源;I/O密集型就是判断我们程序中十分耗I/O的线程数量,大约是最大I/O数的一倍到两倍之间。
ForkJoin
什么是forkJoin
并行执行任务,加快效率
大任务拆分为多个小任务
package com.moying.forkJoin;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; import java.util.stream.LongStream;
/**
 * Sums the integers in {@code [start, end]} with the fork/join framework and compares
 * three approaches: a plain loop, a {@link RecursiveTask}, and a parallel stream.
 */
public class ForkJoinDemo extends RecursiveTask<Long> {

    /** First number of the range (inclusive). */
    private final long start;

    /** Last number of the range (inclusive). */
    private final long end;

    /** Ranges longer than this are split in two; shorter ones are summed directly. */
    private final long temp = 100000000L;

    /**
     * @param start first number of the range (inclusive)
     * @param end   last number of the range (inclusive)
     */
    public ForkJoinDemo(Long start, Long end) {
        // Stored as primitives so the summing loop does not box on every iteration.
        this.start = start;
        this.end = end;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        test1();
        test2();
        test3();
    }

    /** Baseline: single-threaded loop. */
    public static void test1() {
        long start = System.currentTimeMillis();
        long sum = 0;
        for (int i = 0; i <= 10_0000_0000; i++) {
            sum += i;
        }
        System.out.println(sum);
        long end = System.currentTimeMillis();
        System.out.println("test1耗费时间为:" + (end - start));
    }

    /** Fork/join: submits this task to a {@link ForkJoinPool}. */
    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L);
            ForkJoinTask<Long> submit = forkJoinPool.submit(task);
            System.out.println(submit.get());
        } finally {
            // Release the pool's worker threads once the result is available.
            forkJoinPool.shutdown();
        }

        long end = System.currentTimeMillis();
        System.out.println("test2耗费时间为:" + (end - start));
    }

    /** Parallel stream over the same closed range. */
    public static void test3() {
        long start = System.currentTimeMillis();
        long reduce = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
        System.out.println(reduce);
        long end = System.currentTimeMillis();
        System.out.println("test3耗费时间为:" + (end - start));
    }

    /**
     * Sums the range, splitting it in half while it is longer than the threshold.
     *
     * @return the sum of all numbers in {@code [start, end]}; 0 for an empty range
     */
    @Override
    protected Long compute() {
        if ((end - start) > temp) {
            long middle = start + (end - start) / 2;
            ForkJoinDemo left = new ForkJoinDemo(start, middle);
            ForkJoinDemo right = new ForkJoinDemo(middle + 1, end);
            // Fork one half and compute the other on the current thread, so this
            // worker does useful work instead of only waiting for two sub-tasks.
            left.fork();
            long rightSum = right.compute();
            return left.join() + rightSum;
        }
        long sum = 0L;
        for (long i = start; i <= end; i++) {
            sum += i;
        }
        return sum;
    }
}
|
异步回调
Future
/**
 * Adds a batch of questions to a question bank.
 *
 * <p>Steps: validate the arguments; keep only question ids that exist; check that the bank
 * exists; drop questions that are already in the bank; then insert the remaining ones in
 * batches of 1000, each batch running asynchronously on a dedicated thread pool.
 *
 * <p>A failing batch is logged and does not fail the whole call.
 *
 * @param questionBankId id of the target question bank
 * @param questionIdList ids of the questions to add
 * @param loginUser      the user performing the operation
 */
@Override
public void batchAddQuestionBankQuestion(Long questionBankId, List<Long> questionIdList, User loginUser) {
    // Argument validation
    ThrowUtils.throwIf(CollUtil.isEmpty(questionIdList), ErrorCode.PARAMS_ERROR, "题目列表为空");
    ThrowUtils.throwIf(questionBankId == null || questionBankId <= 0, ErrorCode.PARAMS_ERROR, "题库非法");
    ThrowUtils.throwIf(loginUser == null, ErrorCode.NOT_LOGIN_ERROR);

    // Keep only the question ids that actually exist (only the id column is selected)
    LambdaQueryWrapper<Question> questionLambdaQueryWrapper = Wrappers.lambdaQuery(Question.class)
            .select(Question::getId)
            .in(Question::getId, questionIdList);
    List<Long> validQuestionIdList = questionService.listObjs(questionLambdaQueryWrapper, obj -> (Long) obj);
    ThrowUtils.throwIf(CollUtil.isEmpty(validQuestionIdList), ErrorCode.PARAMS_ERROR, "合法的题目列表为空");

    // The question bank must exist
    QuestionBank questionBank = questionBankService.getById(questionBankId);
    ThrowUtils.throwIf(questionBank == null, ErrorCode.NOT_FOUND_ERROR, "题库不存在");

    // Find the questions that are already linked to this bank
    LambdaQueryWrapper<QuestionBankQuestion> lambdaQueryWrapper = Wrappers.lambdaQuery(QuestionBankQuestion.class)
            .eq(QuestionBankQuestion::getQuestionBankId, questionBankId)
            .in(QuestionBankQuestion::getQuestionId, validQuestionIdList);
    List<QuestionBankQuestion> existQuestionList = this.list(lambdaQueryWrapper);
    // Collect the QUESTION ids of the existing links (not the ids of the link rows),
    // because the set is compared against question ids below.
    Set<Long> existQuestionIdSet = existQuestionList.stream()
            .map(QuestionBankQuestion::getQuestionId)
            .collect(Collectors.toSet());
    validQuestionIdList = validQuestionIdList.stream()
            .filter(questionId -> !existQuestionIdSet.contains(questionId))
            .collect(Collectors.toList());
    ThrowUtils.throwIf(CollUtil.isEmpty(validQuestionIdList), ErrorCode.PARAMS_ERROR, "所有题目都已存在于题库中");

    // Resolve the proxy once, before any thread is created. Casting to the interface
    // works for both JDK and CGLIB proxies. The inner method is called through the
    // proxy rather than through `this`.
    QuestionBankQuestionService questionBankQuestionService =
            (QuestionBankQuestionService) AopContext.currentProxy();

    // Dedicated pool for this call; CallerRunsPolicy makes the submitting thread run the
    // batch itself when the pool and its queue are saturated.
    ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
            20,
            50,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10000),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
    try {
        List<CompletableFuture<Void>> futures = new ArrayList<>();

        int batchSize = 1000;
        int totalQuestionListSize = validQuestionIdList.size();
        for (int i = 0; i < totalQuestionListSize; i += batchSize) {
            List<Long> subList = validQuestionIdList.subList(i, Math.min(i + batchSize, totalQuestionListSize));
            List<QuestionBankQuestion> questionBankQuestions = subList.stream().map(questionId -> {
                QuestionBankQuestion questionBankQuestion = new QuestionBankQuestion();
                questionBankQuestion.setQuestionBankId(questionBankId);
                questionBankQuestion.setQuestionId(questionId);
                questionBankQuestion.setUserId(loginUser.getId());
                return questionBankQuestion;
            }).collect(Collectors.toList());

            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                questionBankQuestionService.batchAddQuestionsToBankInner(questionBankQuestions);
            }, customExecutor).exceptionally(ex -> {
                // Pass the throwable as the last argument so the stack trace is logged too.
                log.error("添加题目到题库时发生未知错误,错误信息: {}", ex.getMessage(), ex);
                return null;
            });
            futures.add(future);
        }

        // Wait for every batch to finish
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    } finally {
        // Release the pool even if building or waiting for the batches fails.
        customExecutor.shutdown();
    }
}
|
JMM
Volatile 是java jvm 提供的轻量级的同步机制
- 保证可见性
- 不保证原子性
- 禁止指令重排
什么是JMM
java内存模型,不存在的东西,是一个约定
- 线程解锁前 必须把共享变量
- 线程加锁前