线程状态

线程状态

java创建运行线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//方式一
// 构造方法的参数是给线程指定名字,推荐
Thread t1 = new Thread("t1") {
@Override
// run 方法内实现了要执行的任务
public void run() {
//todo^
}
};
t1.start();
//方式二
Runnable runnable = new Runnable() {
public void run(){
// 要执行的任务
}
};
// 创建线程对象
Thread t = new Thread( runnable );
// 启动线程
t.start();

//方式三:FutureTask 能够接收 Callable 类型的参数,用来处理有返回结果的情况
// 创建任务对象
FutureTask<Integer> task3 = new FutureTask<>(() -> {
//todo
return 100;
});
// 参数1 是任务对象; 参数2 是线程名字
new Thread(task3, "t3").start();
// 主线程阻塞,同步等待 task 执行完毕的结果
Integer result = task3.get();

捕获线程异常的方法

  • 创建线程时设置
1
2
3
Thread t = new Thread(new RunnableImp());
t.setUncaughtExceptionHandler(new MyUnchecckedExceptionhandler());
t.start();
  • 直接给Thread设置默认处理
1
Thread.setDefaultUncaughtExceptionHandler(new MyUnchecckedExceptionhandler());
  • 使用Callable的Feature.get()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//1.创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//2.创建Callable,有返回值的,你也可以创建一个线程实现Callable接口。
// 如果你不需要返回值,这里也可以创建一个Thread即可,在第3步时submit这个thread。
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1/0;
}
};
//3.提交待执行的线程
Future<Integer> future = executorService.submit(callable);
try {
Integer result = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
//4.处理捕获的线程异常
}
  • 使用Executors或直接new ThreadPoolExecutor时参数ThreadFactory中指定
1
2
3
4
5
6
7
8
9
10
ExecutorService exec = Executors.newCachedThreadPool(new ThreadFactory(){
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setUncaughtExceptionHandler(new MyUnchecckedExceptionhandler());
return thread;
}
});
exec.execute(new ExceptionThread());
//注:以submit方式提交该方式失效,异常被Future.get封装在ExecutionException中重新抛出。
  • 重写ThreadPoolExecutor的afterExecute方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//1.创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (r instanceof Thread) {
if (t != null) {
//处理捕获的异常
}
} else if (r instanceof FutureTask) {
FutureTask futureTask = (FutureTask) r;
try {
futureTask.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
//处理捕获的异常
}
}

}
};
Thread t1 = new Thread(() -> {
int c = 1 / 0;
});
threadPoolExecutor.execute(t1);
Callable<Integer> callable = () -> 2 / 0;
threadPoolExecutor.submit(callable);

参考链接

sleep

调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞),不会释放锁

其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException 异常。

可使用 TimeUnit 的 sleep 代替 Thread 的 sleep 来获得更好的可读性。

yield

让出执行权,不会释放锁

调用 yield 会让当前线程从 Running 进入 Runnable 就绪状态,然后调度执行其它线程。

具体的实现依赖于操作系统的任务调度器。

join

等待线程执行完

1
2
3
4
5
6
7
8
9
10
Thread t1 = new Thread(() -> {
sleep(2);
r1 = 10;
});
long start = System.currentTimeMillis();
t1.start();
// 线程执行结束会导致 join 提前结束
t1.join(1500);//无参为阻塞等待
long end = System.currentTimeMillis();
log.debug("r1: {} r2: {} cost: {}", r1, r2, end - start);

wait&notify

obj.wait()指令就会释放锁,进入阻塞等待(WAITING)状态。等待其他线程调用锁对象obj.notify()/notifyAll()唤醒或者执行该线程对象的interrupt()方法中断他。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

@Slf4j
public class WaitAndNotify {
private static final Object lock = new Object();
private static boolean hasWater = false;
private static boolean hasFruits = false;

public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(()->{
synchronized(lock) {
while(!hasWater) {
log.info("没水等待");
try {
//让出锁
lock.wait(); //lock.wait(time); 等待time时间,0或空参为无限等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("成功拿到水");
}
},"thread1");
thread1.start();

Thread thread2 = new Thread(()->{
synchronized(lock) {
while(!hasFruits) {
log.info("没水果等待");
try {
//让出锁
lock.wait(); //lock.wait(time); 等待time时间,0或空参为无限等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("成功拿到水果");
}
},"thread2");
thread2.start();

Thread.sleep(2000);
giveWater();
Thread.sleep(2000);
giveFriuits();
}

private static void giveWater() {
synchronized(lock) {
log.info("水来了");
hasWater = true;
lock.notifyAll();//唤醒所有释放lock锁的等待线程
// lock.notify();//随机唤醒一个
}
}
private static void giveFriuits() {
synchronized(lock) {
log.info("水果来了");
hasFruits = true;
lock.notifyAll();
}
}
}

//结果
[thread1] [INFO ] com.czm.concurrent.WaitAndNotify:19 - -- 没水等待
[thread2] [INFO ] com.czm.concurrent.WaitAndNotify:35 - -- 没水果等待
[main] [INFO ] com.czm.concurrent.WaitAndNotify:57 - -- 水来了
[thread2] [INFO ] com.czm.concurrent.WaitAndNotify:35 - -- 没水果等待 //等待水果的也被唤醒,再次等待
[thread1] [INFO ] com.czm.concurrent.WaitAndNotify:27 - -- 成功拿到水
[main] [INFO ] com.czm.concurrent.WaitAndNotify:65 - -- 水果来了
[thread2] [INFO ] com.czm.concurrent.WaitAndNotify:43 - -- 成功拿到水果

park&unpark

暂停当前线程 LockSupport.park(); 不会释放此前获取到锁

恢复某个线程的运行 LockSupport.unpark(暂停线程对象)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.czm.concurrent;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.LockSupport;
/**
* @author CZM
* @create 2020/12/24
*/
@Slf4j
public class ParkAndUnpark {

public static void main(String[] args) {
Thread thread1 = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("parking...");
LockSupport.park();
log.info("继续运行parking2");
LockSupport.park();
log.info("结束");

},"thread1");
thread1.start();
log.info("unpark");
LockSupport.unpark(thread1);

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("unpark2");
LockSupport.unpark(thread1);
}

}

学习链接

锁粗化与锁消除

MESI协议(缓存一致性协议)

偏向锁、轻量级锁、重量级锁

学习链接

学习连接

volatile(jdk1.5及其以后版本生效)

它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存,保证该变量的可见性(一个线程对变量的修改对另一个线程可见)。

注:线程可能从自己工作内存中的高速缓存中读取一个共享变量,没法及时知道其他线程对该变量的修改。导致对该变量的不可见现象。

volatie保证可见性与有序性

写屏障(sfence)

保证在该屏障之前的,对共享变量的改动,都同步到主存当中(可见性),

会确保指令重排序(JVM的JIT优化实现指令级并行)时,不会将写屏障之前的代码排在写屏障之后(有序性)。

1
2
3
4
5
6
7
8
private static volatile boolean flag = false; //共享变量

public void test(){
//todo修改其他共享变量
flag = true;
// 写屏障,在这之前的所有共享变量被修改的将会被同步到内存中
}

读屏障(lfence)

保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据(可见性),

会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前(有序性)。

1
2
3
4
5
6
7
8
9
10
private static volatile boolean flag = false; //共享变量

public void test2(){
//todo
// 读屏障,在这之后的所有共享变量的读取都会加载内存中的值
if(flag){
num++;
}

}

Java内存模型 JMM(Java Memory Mode)

链接1

链接2

jdk提供的原子类型

链接

原子整数

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

原子引用

  • AtomicReference (无法解决ABA问题)
  • AtomicMarkableReference(带版本号的原子应用,解决ABA问题,可追踪修改)
  • AtomicStampedReference (带修改标记,只辨别是否存在ABA)

原子数组

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

字段更新器

  • AtomicReferenceFieldUpdater // 域 字段
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater

利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常 (java.lang.IllegalArgumentException: Must be volatile type)。

原子累加器

  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator
  • LongAdder

在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

链接

JDK提供的线程池

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量。目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值。

状态名 高三位 是否执行阻塞队列任务 是否接收新任务 说明
RUNNABLE 111
SHUTDOWN 000 不会接收新任务,
但会处理阻塞队列剩余任务
STOP 001 会中断正在执行的任务,
并抛弃阻塞队列任务
TIDYING 010 任务全执行完毕,
活动线程为 0 即将进入终结状态
TERMINATED 011 终结状态

image-20210125223403356

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize, //核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 急救线程生存时间
TimeUnit unit, //时间单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
ThreadFactory threadFactory, //创建线程的工厂
RejectedExecutionHandler handler //拒绝策略
)

Executors类中用工厂方法提供以下常用线程池

newFixedThreadPool

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

线程数固定,核心线程数 == 最大线程数(没有救急线程被创建),阻塞队列是无界的,可以放任意数量的任务.

newCachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,全部都是救急线程(空闲60s 后可以回收),队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)适合任务数比较密集,但每个任务执行时间较短的情况。

newSingleThreadExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

比自己创建一个线程的优势:

自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一

个线程,保证池的正常工作。

Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改。(Executors.newFixedThreadPool(1) 初始时为1 ,可以强转ThreadPoolExecutor 对象后调用 setCorePoolSize 等方法进行修改。)

FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因

此不能调用 ThreadPoolExecutor 中特有的方法。

newScheduledThreadPool

1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

执行定时/延时任务。

ThreadPoolExecutor原理分析

参考链接

任务提交执行流程

阻塞队列

LinkedBlockingQueue

底层是由内部类Node组成的单向链表,虚拟头节点。内部维护两个ReentrantLock,入队出队使用不同的锁,大大提高吞吐量。初始化时可指定其容量大小,没有默认Integer.MAX_VALUE(当入队速度大于出队速度时,有OOM风险)。

参考链接

LinkedBlockingDeque

底层是一个双向链表,内部只有一个ReentrantLock。容量规则和LinkedBlockingQueue一致,但比它多了更多出队入队方法。

SynchronousQueue

内部没有容器,一个生产线程,当它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。
参考链接

ArrayBlockingQueue

用数组实现的有界阻塞队列,初始化时必须指定容量。

DelayQueue

有优先级的阻塞队列,有个PriorityQueue优先队列的字段。添加元素必须实现Delayed接口。

LinkedTransferQueue

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。LinkedTransferQueue采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。我们称这种节点操作为“匹配”方式。和SynchronousQueue相比,LinkedTransferQueue多了一个可以存储的队列,与LinkedBlockingQueue相比,LinkedTransferQueue多了直接传递元素,少了用锁来同步。

参考链接

Fork/Join

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

链接1

死锁产生四个条件

  1. 互斥条件:一个资源每次只能被一个进程使用。
  2. 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。
  3. 不剥夺条件:进程已获得的资源,在末使用完之前,不能强行剥夺。
  4. 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。

其他重要知识链接

理解乐观锁与悲观锁

java并发

jdk线程使用