线程状态

java创建运行线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
Thread t1 = new Thread("t1") { @Override public void run() { } }; t1.start();
Runnable runnable = new Runnable() { public void run(){ } };
Thread t = new Thread( runnable );
t.start();
FutureTask<Integer> task3 = new FutureTask<>(() -> { return 100; });
new Thread(task3, "t3").start();
Integer result = task3.get();
|
捕获线程异常的方法
1 2 3
| Thread t = new Thread(new RunnableImp()); t.setUncaughtExceptionHandler(new MyUnchecckedExceptionhandler()); t.start();
|
1
| Thread.setDefaultUncaughtExceptionHandler(new MyUnchecckedExceptionhandler());
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| ExecutorService executorService = Executors.newFixedThreadPool(10);
Callable<Integer> callable = new Callable<Integer>() { @Override public Integer call() throws Exception { return 1/0; } };
Future<Integer> future = executorService.submit(callable); try { Integer result = future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { }
|
- 使用Executors或直接new ThreadPoolExecutor时参数ThreadFactory中指定
1 2 3 4 5 6 7 8 9 10
| ExecutorService exec = Executors.newCachedThreadPool(new ThreadFactory(){ @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setUncaughtExceptionHandler(new MyUnchecckedExceptionhandler()); return thread; } }); exec.execute(new ExceptionThread());
|
- 重写ThreadPoolExecutor的afterExecute方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) { @Override protected void afterExecute(Runnable r, Throwable t) { if (r instanceof Thread) { if (t != null) { } } else if (r instanceof FutureTask) { FutureTask futureTask = (FutureTask) r; try { futureTask.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { } }
} }; Thread t1 = new Thread(() -> { int c = 1 / 0; }); threadPoolExecutor.execute(t1); Callable<Integer> callable = () -> 2 / 0; threadPoolExecutor.submit(callable);
|
参考链接
sleep
调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞),不会释放锁 。
其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException 异常。
可使用 TimeUnit 的 sleep 代替 Thread 的 sleep 来获得更好的可读性。
yield
让出执行权,不会释放锁。
调用 yield 会让当前线程从 Running 进入 Runnable 就绪状态,然后调度执行其它线程。
具体的实现依赖于操作系统的任务调度器。
join
等待线程执行完
1 2 3 4 5 6 7 8 9 10
| Thread t1 = new Thread(() -> { sleep(2); r1 = 10; }); long start = System.currentTimeMillis(); t1.start(); t1.join(1500); long end = System.currentTimeMillis(); log.debug("r1: {} r2: {} cost: {}", r1, r2, end - start);
|
wait¬ify
obj.wait()指令就会释放锁,进入阻塞等待(WAITING)状态。等待其他线程调用锁对象obj.notify()/notifyAll()唤醒或者执行该线程对象的interrupt()方法中断他。
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| @Slf4j public class WaitAndNotify { private static final Object lock = new Object(); private static boolean hasWater = false; private static boolean hasFruits = false;
public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(()->{ synchronized(lock) { while(!hasWater) { log.info("没水等待"); try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("成功拿到水"); } },"thread1"); thread1.start();
Thread thread2 = new Thread(()->{ synchronized(lock) { while(!hasFruits) { log.info("没水果等待"); try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("成功拿到水果"); } },"thread2"); thread2.start();
Thread.sleep(2000); giveWater(); Thread.sleep(2000); giveFriuits(); }
private static void giveWater() { synchronized(lock) { log.info("水来了"); hasWater = true; lock.notifyAll();
} } private static void giveFriuits() { synchronized(lock) { log.info("水果来了"); hasFruits = true; lock.notifyAll(); } } }
[thread1] [INFO ] com.czm.concurrent.WaitAndNotify:19 - -- 没水等待 [thread2] [INFO ] com.czm.concurrent.WaitAndNotify:35 - -- 没水果等待 [main] [INFO ] com.czm.concurrent.WaitAndNotify:57 - -- 水来了 [thread2] [INFO ] com.czm.concurrent.WaitAndNotify:35 - -- 没水果等待 [thread1] [INFO ] com.czm.concurrent.WaitAndNotify:27 - -- 成功拿到水 [main] [INFO ] com.czm.concurrent.WaitAndNotify:65 - -- 水果来了 [thread2] [INFO ] com.czm.concurrent.WaitAndNotify:43 - -- 成功拿到水果
|
park&unpark
暂停当前线程 LockSupport.park(); 不会释放此前获取到锁。
恢复某个线程的运行 LockSupport.unpark(暂停线程对象)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package com.czm.concurrent;
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.locks.LockSupport;
@Slf4j public class ParkAndUnpark {
public static void main(String[] args) { Thread thread1 = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("parking..."); LockSupport.park(); log.info("继续运行parking2"); LockSupport.park(); log.info("结束");
},"thread1"); thread1.start(); log.info("unpark"); LockSupport.unpark(thread1);
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("unpark2"); LockSupport.unpark(thread1); }
}
|
学习链接
偏向锁、轻量级锁、重量级锁
学习链接
学习连接
volatile(jdk1.5及其以后版本生效)
它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存,保证该变量的可见性(一个线程对变量的修改对另一个线程可见)。
注:线程可能从自己工作内存中的高速缓存中读取一个共享变量,没法及时知道其他线程对该变量的修改。导致对该变量的不可见现象。
volatie保证可见性与有序性
写屏障(sfence)
保证在该屏障之前的,对共享变量的改动,都同步到主存当中(可见性),
会确保指令重排序(JVM的JIT优化实现指令级并行)时,不会将写屏障之前的代码排在写屏障之后(有序性)。
1 2 3 4 5 6 7 8
| private static volatile boolean flag = false;
public void test(){ flag = true; }
|
读屏障(lfence)
保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据(可见性),
会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前(有序性)。
1 2 3 4 5 6 7 8 9 10
| private static volatile boolean flag = false;
public void test2(){ if(flag){ num++; } }
|
Java内存模型 JMM(Java Memory Mode)
链接1
链接2
jdk提供的原子类型
链接
原子整数
- AtomicBoolean
- AtomicInteger
- AtomicLong
原子引用
- AtomicReference (无法解决ABA问题)
- AtomicMarkableReference(带版本号的原子应用,解决ABA问题,可追踪修改)
- AtomicStampedReference (带修改标记,只辨别是否存在ABA)
原子数组
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
字段更新器
- AtomicReferenceFieldUpdater // 域 字段
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常 (java.lang.IllegalArgumentException: Must be volatile type)。
原子累加器
- DoubleAccumulator
- DoubleAdder
- LongAccumulator
- LongAdder
在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
链接
JDK提供的线程池
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量。目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值。
| 状态名 |
高三位 |
是否执行阻塞队列任务 |
是否接收新任务 |
说明 |
| RUNNABLE |
111 |
是 |
是 |
|
| SHUTDOWN |
000 |
是 |
否 |
不会接收新任务, 但会处理阻塞队列剩余任务 |
| STOP |
001 |
否 |
否 |
会中断正在执行的任务, 并抛弃阻塞队列任务 |
| TIDYING |
010 |
否 |
否 |
任务全执行完毕, 活动线程为 0 即将进入终结状态 |
| TERMINATED |
011 |
否 |
否 |
终结状态 |

1 2 3 4 5 6 7 8
| public ThreadPoolExecutor(int corePoolSize, //核心线程数 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 急救线程生存时间 TimeUnit unit, //时间单位 BlockingQueue<Runnable> workQueue, // 阻塞队列 ThreadFactory threadFactory, //创建线程的工厂 RejectedExecutionHandler handler //拒绝策略 )
|
Executors类中用工厂方法提供以下常用线程池
newFixedThreadPool
1 2 3 4 5
| public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
|
线程数固定,核心线程数 == 最大线程数(没有救急线程被创建),阻塞队列是无界的,可以放任意数量的任务.
newCachedThreadPool
1 2 3 4 5
| public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
|
核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,全部都是救急线程(空闲60s 后可以回收),队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)适合任务数比较密集,但每个任务执行时间较短的情况。
newSingleThreadExecutor
1 2 3 4 5 6
| public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
|
线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
比自己创建一个线程的优势:
自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一
个线程,保证池的正常工作。
Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改。(Executors.newFixedThreadPool(1) 初始时为1 ,可以强转ThreadPoolExecutor 对象后调用 setCorePoolSize 等方法进行修改。)
FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因
此不能调用 ThreadPoolExecutor 中特有的方法。
newScheduledThreadPool
1 2 3
| public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
|
执行定时/延时任务。
ThreadPoolExecutor原理分析
参考链接
任务提交执行流程
阻塞队列
LinkedBlockingQueue
底层是由内部类Node组成的单向链表,虚拟头节点。内部维护两个ReentrantLock,入队出队使用不同的锁,大大提高吞吐量。初始化时可指定其容量大小,没有默认Integer.MAX_VALUE(当入队速度大于出队速度时,有OOM风险)。
参考链接
LinkedBlockingDeque
底层是一个双向链表,内部只有一个ReentrantLock。容量规则和LinkedBlockingQueue一致,但比它多了更多出队入队方法。
SynchronousQueue
内部没有容器,一个生产线程,当它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。
参考链接
ArrayBlockingQueue
用数组实现的有界阻塞队列,初始化时必须指定容量。
DelayQueue
有优先级的阻塞队列,有个PriorityQueue优先队列的字段。添加元素必须实现Delayed接口。
LinkedTransferQueue
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。LinkedTransferQueue采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。我们称这种节点操作为“匹配”方式。和SynchronousQueue相比,LinkedTransferQueue多了一个可以存储的队列,与LinkedBlockingQueue相比,LinkedTransferQueue多了直接传递元素,少了用锁来同步。
参考链接
Fork/Join
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
链接1
死锁产生四个条件
- 互斥条件:一个资源每次只能被一个进程使用。
- 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。
- 不剥夺条件:进程已获得的资源,在末使用完之前,不能强行剥夺。
- 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。
其他重要知识链接
理解乐观锁与悲观锁
java并发
jdk线程使用