多线程并发编程基础

基础知识

Java是支持多线程的开发语言,意思就是可以在多CPU核心的机子上同时处理不同的任务,优化资源的使用率,提升程序的效率。

1) 并发编程三要素:

原子性:一个或者多个操作要么全部执行成功要么全部执行失败。

有序性:程序执行顺序按照代码顺序先后执行,但是CPU可能会对指令进行重排序。

可见性:当多个线程访问同一个变量时,如果一个线程修改了变量,其他线程立即获取最新的值。

2) 线程的五大状态:

创建状态:当用new操作符创建一个线程的时候。

就绪状态:调用start方法,处于就绪状态的线程并不一定马上就执行run方法,还需要等待CPU的调度。

运行状态:CPU开始调度线程,并开始执行Run方法。

阻塞状态:线程的执行过程中可能因为一些原因进入阻塞状态,比如调用sleep方法,获取尝试得到一个锁等等。、

死亡状态:Run方法执行完或者执行中遇到异常。

3) 悲观锁和乐观锁

悲观锁:认为一定会有其他线程来改变他,所以每次操作就会加锁,会造成线程阻塞。

乐观锁:认为不会有线程来改变他,每次操作不会加锁,但是如果因为其他线程来改变了值,造成了冲突,会因为冲突而操作失败,但是他又会继续重试,直到成功为止,不会造成线程阻塞。

4) 线程之间的协作

线程之间的协作有:wait、notify、notifyAll等

5) Synchronized关键字(同步锁)

修饰一个代码块:被修饰的代码块称为同步语句块,作用范围是大括号{}括起来的代码,作用的对象是调用这个代码块的对象。

修饰一个方法:被修饰的方法称为同步方法,作用范围是整个方法,作用的对象是调用这个方法的对象。

修饰一个静态的方法:作用范围是整个静态方法,所用的对象是这个类中的所有对象。

修饰一个类:作用范围是synchronized后面括起来的部分,作用的对象是这个类中的所有对象

6) CAS

比较替换,是实现并发应用的一种技术,主要用于不想使线程进入阻塞的一种方式,操作包含三个操作数-内存位置V、预期原值A、新值B,如果内存位置和预期原值相匹配,处理器自动将该位置的值改为新值,否则就不做处理。

CAS三大问题:ABA问题、循环时间长开销大、只能保证一个共享变量的原子操作,即一个CAS只能处理一个。

7) 线程池

如果使用一个线程就创建一个,存在如果并发的线程量大,且每个线程执行一个任务就结束了,这样频繁的创建线程就会给系统带来很大的开销,导致系统效率大大降低,因为频繁创建线程和销毁线程是需要时间的,而线程池可以对线程进行复用,大大减少线程创建和销毁所带来的性能消耗。

Thread和Runnable概念

创建执行线程的两种方法:扩展Thread类以及实现Runnable接口重写Run方法

线程的特征和状态:

1) 不管是否并发,都有一个主线程的Thread对象,执行该程序时,java虚拟机会创建一个新的Thread并在该线程中执行main方法,这是非并发应用程序的唯一线程,也是并发应用程序中的第一个线程。

2) Java线程共享应用程序中所有资源,包括内存和打开的文件,快速简单共享信息,但是需要使用同步,也就是加锁来避免数据竞争。

3) Java线程的优先级,介于Thread.MAX_PRIORITY(10)和Thread.MIN_PRIORITY(1)之间,默认是5,通常较高优先级的线程会先于较低优先级的线程之前执行,但不是绝对,CPU可能会进行指令重排。

4) Java的两种线程:守护线程和非守护线程,我们常用的多线程都是非守护线程。

Java程序结束执行的两种情形:

第一种,通过执行Runtime类的exit()方法,用户具有可主动调用权力。

第二种,所有非守护线程执行完毕,JVM进程会自动退出,不会考虑守护线程运行情况。

守护线程主要用于垃圾收集器或缓存管理器中,在start方法执行前通过isDaemon方法检查线程是否为守护线程,也可以通过setDaemon方法将某个线程指定为守护线程。

5) Thread.States枚举类中定义的线程状态

New: Thread对象已经创建还未执行

Runnable: Thread对象已经在java虚拟机中运行

Blocked:Thread对象正在等待锁定

Waiting: Thread对象正在等待另一个线程的动作

Time_Waiting: Thread对象正在等待另一个线程的动作,但是有时间限制

Terminated: Thread对象已经完成了执行

getState方法可以获取Thread对象的状态,并且修改,但是在给定时间内一个线程只能有一个状态。

Thread和Runnable应用

Runnable接口只定义了一种可实现的方法,即run方法,和继承Thread类是不一样的,当执行start方法启动新线程就会调用run方法。

Thread类常用方法:

1) 获取和设置Thread对象信息的方法:

getId:返回Thread对象的标识符,他是线程创建分配的一个正整数,整个生命周期唯一且不能修改。

getName/setName:获取或设置Thread对象的名称。

getPriority/setPriority:获取或设置Thread对象的优先级。

isDaemon/setDeamon:获取或者建立Threa对象的守护条件。

getState:返回Thread对象的状态。

2) Interrupt:中断目标线程,比如该线程处于sleep休眠中,使用它就可以直接唤醒,并给线程打上中断标记

3) Interrupted:判断目标线程是否被中断,并清除该线程中断标记。

4) Isinterrupted:判断目标线程是否被中断,但是不会清楚线程中断标记。

5) Sleep(long ms):线程执行暂停、休眠的时间。

6) Join:暂停线程执行,直到调用该方法的线程执行结束为止,就是A B两个线程,B线程通过join方法切入到A线程中,那么A线程需要等待B线程执行完毕,他才能继续执行。

7) setUncaughtExceptionHandler:用于建立未校验的异常处理器。

8) currentThread:Thread类静态方法,返回实际执行该代码的Thread对象。

Callable接口

与Runnable接口相似,主要用在filter模式,比如mq消息的处理,但是:

1) Callable接口声明call方法,具有参数和返回值,并且入参和返回值是相对应的。

2) 声明了call方法,必须返回声明中指定对象的返回值

3) Call方法可以抛出任何一种校验异常,可以实现自己的处理器重载afterExecute方法处理异常

img

img

Synchronized关键字

Synchronized一般用在同步代码块里面给某个对象加锁,就是多线程想要干某件事时判断你是否获取了这个对象锁,类似于门票机制。加锁的方式有多种,可以给方法或者类或者创建的一个obj对象,但是本质都是面向对象加锁,需要操作这个对象的时候你就要获取到对象的锁才能进行下一步。

img

对象锁、方法锁、类锁理解:

静态方法只能加类锁 加在方法上也是类锁 和其他非静态方法锁是不互斥的 可以同步执行 只会和相同类锁互斥

分三种情况 对象锁 非静态方法锁 和静态类锁

创建一个类对象多线程共用:

方法锁之间互斥

类锁之间互斥

对象锁互斥

而类锁 和方法锁 和对象锁之间都不互斥

创建多个类对象多线程独用:

方法锁之间不互斥

类锁之间互斥

对象锁之间不互斥

而类锁 和方法锁 和对象锁之间都不互斥

锁是一个对象:

这个对象内部有一个标志位,记录自己有没有被某个线程占用,以0和1标记。

如果这个对象被某个线程占用,记录这个线程的thread ID

这个对象维护一个thread id list,记录其他所有阻塞的、等待获取这个锁的线程。当前线程释放这个锁之后从thread id list里面取一个线程唤醒。

锁的实现原理:

在对象里面,有一块数据叫mark word,以64位windows系统为准,里面有两个重要字段锁标志位和占用该锁的thread id,不同系统结构不同。

Wait方法和Notify方法

常见多线程编程模型之生产者消费者模型

img

一个内存队列,生产者往里面放,消费者从里面取。

  1. 内存队列加锁实现线程安全

  2. 阻塞,当内存队列满了生产者放不进去东西阻塞,内存队列空了消费者取不到东西也会阻塞。

  3. 双向通知,当内存队列满了,生产者阻塞,通知消费者取,当内存队列空了,消费者阻塞,通知生产者生产。

阻塞的实现

1) 生产者、消费者线程调用自己的wait和notify方法,自己阻塞自己

2) 用一个阻塞队列,当取不到数据或者放不进数据时,表明已被阻塞

双向通知的实现

1) wait和notify机制

2) condition机制

wait方法和notify方法为什么必须要和synchronized关键字一起使用?

因为对于同一个对象,一个线程调用了该对象的wait进行等待,另一个线程调用了对象的notify进行唤醒,两个线程之间要通信,为了达到他们之间的协作,就必须要使用synchronized关键字同步给对象,也就是给对象加锁,等唤醒之后两个线程在争抢对象锁。任何对象都可以被synchronized修饰成为锁,所以notify和wait只能放在Object类中。

为什么wait的时候必须释放锁?

假设A线程B线程,如果A线程wait却不释放锁,那么B线程就永远没有机会进入同步块中来唤醒A线程,就发生了死锁,所以wait的机制就是先释放锁,等其他线程争抢,自己进入休眠状态中,等待其他线程唤醒重新获取锁

wait和notify存在的问题:

生产者在通知消费者的同时也通知了其他生产者,消费者在通知生产者的同时也通知了其他消费者,因为wait、notify以及synchronized所作用的对象都是同一个,一个对象是没办法区分队列空还是队列满的两个条件,于是要引入condition来解决此问题。

Interrupt方法

Interrupt异常:

必须要是声明了会抛出InterruptedExeception的函数才会抛出异常,异常抛出取消状态,也就是sleep、join和wait

轻量级阻塞和重量级阻塞:

轻量级阻塞,能被中断的阻塞,通过wait进行阻塞等待,可以被interrupt唤醒;而不能被中断的阻塞被称为重量级阻塞,比如synchronized修饰的同步块,状态为blocked。

img

初始线程处于new状态,调用start执行,进入runnable和ready状态,如果没有调用任何的阻塞函数,线程就会在这两种状态之间切换,也就是系统的时间片调度,两种状态的切换是操作系统完成的,除非手动调用yield函数,放弃CPU的占用。

一旦调用图中的任何阻塞函数,线程就会进入waiting和waiting_time状态,前者无限期阻塞,后者根据设定时间阻塞,如果使用了synchronized关键字或者块,则会进入blocked状态。因此thread.interrupted中断线程的精确含义是唤醒的是轻量级阻塞线程。

Thread.Interrupted和Thread.isinterrupted的区别:

这两个方法都是用来判断自己是否收到过中断信号,前者是实例方法后者是静态方法,前者只是读取中断状态,不做任何操作,而后者不仅读取还要重置中断标志位。

线程优雅的关闭

在java中有stop、destory等方法强制杀死线程,但是这样操作也会带来线程中的资源如网络连接、文件描述符等无法正常关闭,不推荐使用,因此一个线程运行应该让他执行完毕释放所有资源再退出,不断循环的多线程可以通过线程之间的通信机制,让主线程通知退出。

守护线程

img

当一个jvm进程里面有多个线程,这些线程被分为两类:守护线程和非守护线程,默认都是非守护线程,当所有非守护线程退出后,整个jvm进程都会退出,,守护线程不影响jvm进程的退出,比如垃圾回收线程就是守护线程,他们默默在后台工作,当开发者的所有前台线程即非守护线程退出之后,整个JVM进程也退出了。

设置标志位来优雅关闭线程

也就是设置一个布尔变量,初始值是true,在线程run方法中通过while循环执行,当线程运行到一定地步,我把这个变量重置为false,那run方法执行不下去了就自动退出。

img

如果在while中被阻塞了,就会无法退出循环,这时候可以通过interruptedExecption异常和interrupt函数来处理

并发核心概念

并发:系统同时运行多个不同的任务,比如单核CPU,他同时只能干一件事,但是他可以通过时间分片来切换同时让多个线程运行,这就是并发。

并行:系统同时运行两个以上线程,线程之间互相不争抢资源,如多核CPU,每个核心可以自己干自己的事,互相不受影响。

比如nginx,他启动之后有一个master和多个worker,master负责监听,worker负责处理请求,worker的数量和系统的cpu核心数量是一样的,这样每个核心就只运行一个worker核心,不需要上下文切换,效率就会有很高提升。所以worker就是并行运行

并发同步问题

并发同步是为协调两个或者更多任务获取预期结果的机制。

实现的两种方式:

1) 控制同步:A任务依赖于B任务的结束,A任务不能在B任务还没结束之前就开始,串行处理,按顺序来。

2) 数据访问同步:当两个或者更多任务访问共享变量时,任意时间只有一个任务可以访问该变量,比如锁机制

临界段:他是一段代码,用来访问共享变量,互斥是保证这一要求的机制,在任意时间只能被一个任务执行。

并发系统实现同步机制

信号量:一种用于控制对一个或者多个单位资源进行访问的机制,他有一个用于存放可用资源数量的变量。互斥就是一种特殊的信号量,他只有两种状态忙和空闲,当空闲就可以被争抢,当忙就只能被持有线程释放,通过保护临界段避免条件竞争。

监视器:一种在共享资源上实现互斥的机制,他有一个互斥、一个条件变量、两种操作即等待条件和通报条件,一旦你通报了该条件,在等待他的任务中只有一个会继续执行。

如果共享数据受同步机制保护,那么代码就是线程安全的。

不可变对象

初始化之后就不可以修改,如果想要修改就必须创建一个新的对象,所以他是线程安全的,在并发编程中使用不会出现任何问题,比如我们常用的String类,当给他赋值的时候,他其实是重新创建了一个新的对象,他底层是final修饰的。

原子操作和原子变量

原子操作:我这个操作要么成功要么失败,通过临界段来实现原子操作,以便于对整个操作实现同步机制。

原子变量:通过原子操作来设置或获取的变量,通过同步机制或者CAS来实现,CAS是乐观锁的实现方式之一,先拿到值,不加锁,最后修改的时候再获取当前值与之前值比较来判断是否能同步。

共享内存和消息传递

并发多线程之间的两种通信方式

共享内存:同一台计算机运行多个任务,在相同的内存区域读或写,对该内存区域访问采用同步机制的保护临界段

消息传递,比如生产者消费者队列,在一个内存区域共享。

消息传递:不同计算机运行多个任务,多任务之间执行消息传递,需要遵循预定义的协议,而且分两种情况同步和异步,发送方发送消息后阻塞并等待响应就是同步,发送方发送消息后继续执行自己流程就是异步。比如分布式集群,在不同机器部署,又需要互相之间消息通讯。

并发编程的问题

数据竞争

多个任务在临界段之外对共享变量进行写入,且没有任何同步机制就会存在数据竞争。多个任务执行同一个方法,因为执行顺序不同最终结果也会不同。

死锁

多个任务正在等待必须由另一线程释放的某个共享变量才能继续运行,比如A线程、B线程、C线程形成A等B、B等C、C等A,造成闭环死锁。

形成死锁必须满足的四种条件,也称为Coffman条件:

1) 互斥:死锁中涉及的资源必须是不可共享的,一次只能有一个任务可以使用该资源。

2) 占有并等待条件:一个任务在占有某一互斥资源时又请求另一互斥资源,且他在等待时不会释放任何资源。

3) 不可剥夺:资源只能被持有他的任务释放

4) 循环等待:任务1等待任务2所占用资源。。。任务n等待任务1所占用资源形成循环等待。

避免死锁的方式:

1) 忽略:发生死锁重新执行

2) 检测:检测程序是否有死锁,如jconsole

3) 预防:根据Coffman条件进行预防,让他不能达到四个满足条件

4) 规避:任务执行前,对空闲资源和任务需要的资源进行分析,是否会形成死锁来判断任务是否可以执行。

活锁

任务1和任务2两个并发线程执行需要两个资源,任务1执行先对资源1进行了加锁,任务2执行先对资源2进行了加锁,当任务1需要用资源2,因为被任务2持有,没法获取,于是他就释放了资源1,任务2也是一样,互换人质,然后下一步执行又发现任务1获取不到资源1咯,一直循环下去就形成了活锁。活锁占CPU且占内存,因为他们是一直运行下去的,而且存在资源交换,而死锁只占内存,不占用CPU,因为他们已经没法继续执行下去了。

资源不足

某个任务在系统无法获取维持其继续执行下去的资源,就会发生资源不足问题。通过公平原则可以解决此问题,通过算法实现类似于排队原则,避免大家争抢出现资源都没抢够资源不足无法继续运行的问题,但是公平原则会导致效率降低。

优先权反转

低优先权任务持有了高优先权任务所需的资源,就会发生优先权反转,低优先任务会先执行,然后释放资源,高优先权任务才能继续执行。

JMM内存模型

JMM内存模型为了解决一些问题而诞生的。

内存可见性问题

比如一个4核的CPU,分三级缓存,一级二级分数每个核心,三级横跨所有核心,负责和内存条同步,基于缓存一致性协议,多核缓存之间进行同步的,诞生缓存一致性协议对性能消耗过大,于是在单核里面加入了两个读loadBuffer和写的storeBuffer,先写到buffer中,再进行同步,把同步变成异步。但是这样就会出现缓存不一致问题即内存可见性问题。在Java中就是多线程之间线程缓存与共享内存不一致问题。

img

重排序和内存可见性关系

比如两个线程通过Store buffer延迟写入到共享内存实现缓存同步,但是如果读线程先于写线程执行,就会出现读取不到数据情况。

重排序类型:

1) 编译器重排序。没有依赖关系的语句,编译器可以调整顺序。

2) CPU指令重排序。指令级别执行,两条指令依赖关系就可以不按顺序执行。

3) CPU内存重排序。指令执行顺序与写入执行顺序不一致,比如通过异步调用,造成缓存不一致就是内存重排序

内存屏障

为了禁止编译器重排序和CPU指令重排序,在编译器和CPU层面都有相应指令防止重排序,也就是内存屏障。他是JMM和happen-before规则的底层实现原理。

编译器的内存屏障是告诉编译器在编译过程中不要进行指令重排,而CPU的内存屏障是指令,给开发者调用,比如volatile关键字。

JDK8提供了一个Unsafe类提供三个内存屏障函数

img

CPU内存屏障分为四种:

1) LoadLoad:禁止读和读的重排序

2) StoreStore:禁止写和写的重排序

3) LoadStore:禁止读和写的重排序

4) StoreLoad:禁止写和读的重排序

Unsale类的三个内存屏障函数范围:

1)loadFence:LoadLoad+ LoadStore

2)storeFence:StoreStore+ LoadStore

3)fullFence:loadFence+ storeFence+ StoreLoad

重排序的原则即as-if-serial

单线程程序重排序规则

单线程只要操作之间没有数据依赖,就可以进行CPU指令重排序,因为执行的结果是不受影响的。

多线程程序重排序规则

多线程之间数据依赖太复杂,就会出现内存可见性问题,编译器和CPU没有办法根据依赖关系作出优化,他只能保证单线程遵循重排序原则,而上层要告知编译器和CPU在多线程情况下什么时候可以重排序什么时候不可以。

Happen-before

他是JMM内存模型的规范,从字面理解就得到结果了先行发生,用于描述两个操作之间的内存可见性。

主要干两个事,

第一:编译器和CPU可以灵活重排序

第二:开发者能知道的那些重排序和不应该知道的重排序,比如开发者知道可以用volitile和synchronized等线程同步机制来禁止重排序。

比如A happen-before B,那么A执行结果必须对B可见,保证跨线程之间的内存可见性。

Happen-before的传递性

除了基本规则之外,它还具有传递性,即A happen-before B、B happen-before C。。。

比如在同一个线程中两个方法且有一个公共变量a初始值是0,set方法把值设置为了5,set方法在get方法之前执行,调用get就获取到的值就是5,就算公共变量没有用volitile修饰,因为set方法和get方法是遵循了happen-before的传递性的。

Volatile关键字

三个作用:64位写入的原子性、内存可见性、禁止指令重排序

基于上面那个例子,如果是多线程情况并发执行下,不加volitile关键字,那么get方法得到的值就不一定是5了。

重排序:DCL问题即双重校验锁

img

如果不加volatile关键字,这个单例就会存在问题,他的执行顺序如下:

  1. 分配一块内存

  2. 在内存上初始化成员变量

  3. 把instance引用指向内存

犹豫2和3没有依赖关系,所以他们就可能发生指令重排序,即先把instance引用指向内存再初始化成员变量就会造成构造方法溢出的问题。

Volatile的实现原理

1) 在volatile写操作前插入一个storestore(禁止写和写)内存屏障保证volatile写操作和之前写操作不会重排序

2) 在volatile写操作后插入一个storeload(禁止写和读)内存屏障保证volatile写操作和之后读操作不会重排序

3) 在volatile读操作后插入一个loadloadl(禁止写和写)屏障和loadstore(禁止写和读)屏障,保证volatile读操作不会和之后的读操作、写操作重排序。

JSR-133对volatile的增强

JDK5开始,只允许把一个64位的long/double类型变量写操作拆分成两个32位写操作来执行,而读不允许了,任意读操作都必须具有原子性,不可拆分。

Final关键字

Final关键字也可以解决构造方法溢出问题,因为他有遵循happen-before原则,通过final关键字修饰,可以保证值在构造方法之前完成,不会出现另一个线程取值的时候还未完成对象变量初始化。

Happen-before规则总结,也就是JMM的规范承诺:

1) 单线程的每个操作,happen-before在该线程中任意后续操作。

2) 对volatile变量的写,happen-before在后续进行这个变量的读。

3) 对synchronized的解锁,happen-before在后续对这个锁加锁。

4) 对final变量的写,happen-before在后续对finale变量的读。

img

二、JUC之并发容器

BlockingQueue

在所有并发容器中最常用的一种,他是一个阻塞队列,当入队列时,如果队列已满,则阻塞调用者,当出队列时,如果队列为空,则阻塞调用者。

他定义的是一个接口,有多个不同的实现类以不同方式来实现这个阻塞队列。

img

img

只需要关注里面重要的方法,如add、offer、put方法都是添加值,只是返回类型不同且put会阻塞并抛出异常,而add和offer是非阻塞的,remove移除、take获取、poll获取。不过remove是非阻塞式的,而take和poll是阻塞式的。

ArrayBlockingQueue

数组实现的环形队列,在自己的构造函数中传入数组的容量。

他的核心结构就是两个队头队尾指针以及核心的一个锁加上对应的两个condition条件,这里就用到了前面说的wait、notify以及synchronized所作用的对象都是同一个,没办法判断队列是空还是满的情况,增加condition来解决此问题。

img

各种阻塞队列的实现方式都大同小异,以ArrayBlockingQueue源码解读实现原理来讲:

Put方法:先获取锁然后锁上,接着判断当前队列的元素个数是否已经等于数组长度相等,说明队列已满,就await阻塞等待,否则进入放数据的处理方法,获取数据并放入,并且判断队列长度是否已经达到了数组长度,达到了就把队列长度值初始化为0,进行重新下一轮塞值循环,接着把数据放入队列之后就可以通知非空条件。通过他来告知消费者里面已经有数据了,可以来获取。

img

img

Take方法:跟get方法一样,也是先获取锁,然后加锁,判断队列是否是空,如果为空获取不到数据,进入阻塞状态,如果count不等于0了证明有值就进入获取值方法,在里面直接拿到数组对象,通过下标拿到对应的值,拿到值之后这个值就设置为null,代表值已经获取了,同时判断队列获取数据的下标长度是否已经达到了数组的长度,即已经拿到了最后一个值,那么就初始化下标长度变量,进入下一轮取值循环,等拿到值之后就可以通过非满条件,也就是队列里面的值不是满的来通知生产者生产数据放入队列中,最后返回拿到的值并释放锁。

img

img

LinkedBlockingQueue

基于单向链表的阻塞队列,因为他的队头和队尾是两个指针双向操作,因此用了两把锁(分别是写入锁和读取锁)和两个条件(非空条件对应写入锁,非满条件对应读取锁)及一个原子变量记录count数量,可以进行一边追加一边消费,先入后出原则,效率更高。

img

LinkedBlockingQueue和ArrayBlockingQueue的差异

1)LinkedBlockingQueue为了提高并发读,用了两把锁分别控制队头队尾,代表put和put、take和take之间是互斥的,但是put和take之间不互斥,而ArrayBlockingQueue只有一把锁,put和take之间是互斥的,不能同时又读又写。但是他们也有相同点,对于count变量,都需要操作,必须是原子类型。

2)LinkedBlockingQueue因为各自拿了一把锁,所以当需要调用对方的condition和signal,还必须的加上对方的锁,不然就会成我们前面说到的活锁情况发生。

PriorityBlockingQueue

队列通常是先进先出,而priorityBlockingQueue按照元素优先级从小到大出队列,也称为优先级队列,通过compareto比较大小,元素自己有带有比较功能,他与前面的队列最大差异就是内部有一个数组实现的二叉堆,以及一个锁和一个非空条件构成,没有非满条件,因为他这个堆是可扩容的,默认值是11,超出11之后就会自动扩容,所以永远没有阻塞。因为是二叉堆,所以出的第一个元素永远是堆顶那个,取出堆顶元素之后又重新构建,和ArrayBlockingQueue相似,但是没有非满条件。

img

DelayQueue

DelayQueue延迟队列,按延迟时间从小到大出队的PriorityQueue,所谓延迟时间,就是未来将要执行的时间减去当前时间,他是一个接口,继承comparable,重写getDelay方法,如果getDelay返回值小于等于0,说明该元素到期,需要从队列中拿出来执行,他通过getDelay返回值来比较大小。他由一把锁一个非空条件以及优先级队列构成,优先级队列是由二叉堆构成。

img

注意:

关于取元素:他不同于一般的阻塞队列,只有队列为空时才阻塞,如果堆顶元素延迟时间没到也会阻塞,无法从堆顶取出。通过Thread leader变量可以记录等待堆顶元素的第一个线程,因为这样可以通过getDelay方法获取到期时间,不用无限期等待,通过condition条件设定到期时间,时间到了就获取出来了。

关于放元素:不是每一个元素进入都需要通知等待的线程,因为线程取的是堆顶元素,除非堆顶元素的等待时间大于当前放入元素的等待时间,即当前放入的元素需要更先被取出来,才需要通知等待的线程。

SynchronousQueue

他是一种特殊的阻塞队列,本身是没有容量的,他是一个链表,可以挂多个元素,但是不存储,调用put挂值就会阻塞,等另一个线程调用了take获取,两个线程同时解锁,他就相当于中介,处理一手交钱一手交货的那个中间人。而且他支持多线程同时放取操作。他也有公平和非公平模式,公平是先进先出,不公平就是先进后出,即后到线程先配对。通过栈来实现,下图中的三目表达式就根据是否公平来决定是队列结构还是栈结构。这两种结构内部其实还是一个单向的链表,只是执行顺序不一样而已。

img

BlockingDeque

可以理解为double and queue,阻塞的双端队列接口,他的有一个且只有一个实现类LinkedBlockingDeque,是一个双向链表。他只有一把锁,所以只能干一件事,要么生产要么消费。其优点是查找定位速度快,可以从两边开始。

img

CopyOnWrite

从名字就可以得到结果了,复制写,就是写的时候不操作源数据,而且拷贝一份来修改,在通过悲观锁或者乐观锁进行回写,这样操作的目的就是在读的时候就不用加锁了,不加锁就可以提高读取效率,而且我根本就没有操作源数据,加锁无意义。他是以空间换时间的典型策略。

重要的集合:CopyOnWriteArrayList集合、CopyOnWriteArraySet集合

ConcurrentLinkedQueue/Deque

他们的原理是相同的,AQS阻塞队列基于双向链表,通过对head/tail进行CAS操作,实现入队和出队。ConcurrentLinkedQueue是一个单向链表,与AQS类似,同样基于CAS,同样通过head/tail指针记录队列头部和尾部,他通过CAS来实现阻塞,而不是wait直接阻塞。Head、tail在初始的时候都是指向null,在AQS阻塞队列中,每次入队,tail一定后移一个位置,每次出队,head一定后移一个位置,保证head指向队列头部,tail指向队列尾部。但是在ConcurrentLinkedQueue中head/tail的更新可能就落后节点的入队、出队,因为他不是直接对head/tail指针进行的CAS操作,而是对节点node的数组item操作的。

入队列的时候P的next指针只要进行了CAS操作就算入队成功,因为next已经把关系建立,不需要同步移动tail指针,所以他追加两次才移动一次指针,一次移动两个位置,和AQS同步移动完全不同。

出队列的时候并非根据tail指针进行判断,而是依赖于head指针后续节点是否为空,只要对节点的数据执行CAS成功,置为NULL,则出队成功,head指针没有移动可以由下一个线程来操作。

img

ConcurrentHashMap

HashMap的实现方式是数组加链表,被称为拉链法,而ConcurrentHashMap是在hashMap的基础上应对多线程的各种优化,首先所有数据都放在一个大hashmap中,其次引入了红黑树,注意JDK8之后hashMap也引入了红黑树进行优化。

img

由原理图可分析得到,他是竖向数组加横向链表或者树构成,如果他是一个普通的节点,那么就是一个链表结构,如果是一个树结构节点,那么他就是一个红黑树结构,链表和红黑树之间是可以转换的,初始都是链表,当链表数据元素达到某个阀值时,就会转化成红黑树,优化数据结构。如果小于这个阀值,就会转化成链表。

设计的目的:

1) 元素越多,使用红黑树查询更新效率更高,链表结构元素越多越容易产生hash冲突,而红黑树能够解决冲突问题。

2) 加锁不是对整个concurrentHashMap加锁,而是对数组每个头节点加锁,随着数组的长度的增加,并发度就会越来越高,初始长度是16。

3) 支持并发扩容,也就是超过了初始长度16了。

img

整个结构里面最重要的数据就是cap,他就是node数组的长度,为2的整数次方,通过传入初始容量计算得到,并且控制sizeCtl在初始化或并发扩容时候的线程数初始值也是他,。

初始化

多线程的竞争是通过对sizeCtl进行CAS操作实现的,如果某个线程成功把sizeCtl设置为-1,他就拥有了初始化的权利,等到初始化完成,再把sizeCtl设置回去,而且由于初始化工作量小,其他线程在没有争抢到初始化资格的时候并不是放弃了CPU,而是自旋等待。

Put方法

可分为四个分支,也可以称为四步,因为他是一个循环的过程,整个循环结束,也就是完成四个分支总元素加1:

1) 整个数组初始化

2) 每个元素的初始化,也就是所在槽为空,得到该元素为该槽第一个元素,新建一个头结点并返回

3) 扩容,链表转红黑树的规则:链表元素阀值为8,并且整个数组的长度超过64。当数组长度没有达到64,那么链表只会进行普通扩容,并不会转化成红黑树。

4) 放入元素,在加锁后往链表或者红黑树添加元素,通过头节点类型判断是node还是treenode

扩容原理

新建一个hashMap,其数组长度是旧数组长度两倍,然后把旧数组元素逐个迁移,多线程情况下为了避免产生条件竞争,ConcurrentHashMap把整个数组扩容进度分片,也就是根据有多少个CPU核心得到最大并发处理的线程数,然后分工,每个线程干一点,最后完成整个数组扩容,他的内部有一个变量记录扩容进度,通过CAS进行操作,所以不仅提高扩容效率,还能保证并发下线程安全。

1)扩容过程中的数据访问处理?

如果在扩容中有线程访问node1节点数据,但是node1节点已经到新ConcurrentHashMap中了,就数组node1已经没有值,这个时候会有一个转发节点,记录新ConcurrentHashMap的引用,通过转发节点获取迁移的数据并返回

2)初始值为16,扩容设计成2倍,扩容阀值0.75的原则?

img

sizeCtl不同值的含义

-1:正在初始化

其他负数:多线程正在做并发扩容

Cap:未初始化之前的初始容量

扩容成功之后sizeCtl存储的是下一次扩容的阀值,也就是0.75

ConcurrentSkipListMap/Set

ConcurrentHashMap是一种key无序的hashmap,ConcurrentSkipListMap则是有序的。

使用场景

在util包中,非线程安全的Treemap也是key有序的,基于红黑树实现。

在concurrent包中,key有序的hashmap就是ConcurrentSkipListMap,不过他不是基于红黑树,他是基于SkipList跳表实现的。因为目前还没有能够高效对树结构实现无锁,删除添加节点的办法,因此跳表就出来了,他可以满足无锁的进行高效删除、添加节点。

无锁链表的问题

前面blockqueue讲了很多无锁队列,其实现也是链表,但是在队头队尾通过CAS实现操作,如果在中间插入就会出现问题。因为多线程情况下中间操作数据,删除和插入,即使通过CAS操作,你不能确定当前拿到的元素是否还是有效元素,比如有1、10、30三个节点,现在两个线程同时操作,A线程要在增加20节点,B线程要删除10节点,A线程执行的时候,通过CAS先把20节点指向30,然后把10节点指向当前20,这是原子性的,接着呢B线程要删除10节点,于是他就拿到了1节点,并指向30,两个CAS操作分开来看是没问题的,但是合并在一起就出问题了,因为实际操作的时候,都是拿的前驱节点,即1节点,而A线程操作的时候并不知道10节点已经被删除了,最后遍历重组的时候就会出现1节点指向了30,而20就被忽略了,连同10节点一并被干掉了。

解决办法:

通过新增一个marker节点,让当前被操作元素next指向他,这样在当前操作元素后面添加元素就可以判断当前操作元素的next节点是否指向了marker节点,以上面例子,A线程插入20节点,10节点指向marker,B线程如果删除了10节点,那么10节点的marker指向就被破坏了,而A线程在插入的时候就会发现问题,因为他拿到不是10这个节点了,就算10在哪里也和他没有关系,他CAS的是10节点指向的marker。

跳表

上面无锁链表的问题被解决了,对于跳表而言,就可以进行高效的删除、添加操作了,因为跳表是多层链表叠加起来的,也就可以通过指定下一个marker节点来实现CAS操作。

img

比如上面的跳表,我要查找元素19,或者是要在19节点后面插入一个20节点,通过头元素跳跃比较,将会直接过滤掉很多不需要查询的节点,找到所在位置,性能提升非常高,跳表的增删查效率和红黑树不相上下,他是以空间换取时间的一种设计,应用非常广泛,比如Redis。

img

ConcurrentSkipListMap底层通过跳表来实现,因此他就只需要记录头部元素head节点即可。查找、删除、添加就先从头部节点开始找,因为他最底层下面肯定是有序排列的,只需要通过头部节点一层层往下找,快速定位到所在节点位置。

三、JUC之同步工具类

Semaphore

Semaphore也就是信号量,提供了资源数量的并发访问控制。其实就是对共享资源争抢方式的控制,N个线程来竞争限制多少线程可以拿到,是否公平竞争。当初始的资源个数为1的时候,Semaphore退化为排他锁。正因为如此,Semaphone的实现原理和锁十分类似,是基于AQS,有公平和非公平之分。

举个例子,一个只有两个2座位,然后有5个同学要写作业,怎么办呢,抢座位啊,通过代码如下:

img

img

CountDownLatch

使用频率较高,主要作用是通过CountDownLatch来实现让主线程来等待其他worker线程,也就是子线程执行完才能退出。

img

img

CountDownLatch原理和Semaphore原理类似,同样是基于AQS,不过没有公平和非公平之分。

阻塞实现:通过state状态来判断,只要不为0调用await()方法的线程就会被放入阻塞队列。

唤醒实现:基于AQS阻塞队列来实现的,所以可以让多个线程都阻塞在state=0条件上,通过countDown()一直减state,减到0后一次性唤醒所有线程。

img

CyclicBarrier

应用场景:用于协调多个线程同步执行操作的场合。比如面试分笔试和面试,笔试需要等人齐了一起开始,而面试就单独进行,而来面试的人就可以看做一个个单独的线程,需要协调一起做笔试题。

img

img

实现原理:CyclicBarrier基于ReentrantLock(重入锁)+Condition(条件)实现。

注意:

1) CyclicBarrier是可以被重用的

2) CyclicBarrier 会响应中断。再等待过程中收到中断信号,所有阻塞线程都会被唤醒,然后重新开始。

3) 回调方法只会被执行一次,而不是每个线程都执行一次。

Exchanger

Exchanger用于线程之间交换数据。多线程之间并发调用exchange方法,会两两互相交换数据,如果是单线程的话就会进入阻塞状态,分即时等待和永久等待。

img

实现原理:Exchanger的核心机制和Lock一样,也是CAS+park/unpark。每个线程在调用exchange(…)方法交换数据的时候,会先创建一个Node对象。这个Node对象就是对该线程的包装,里面包含了3个重要字段:第一个是该线程要交互的数据,第二个是对方线程交换来的数据,最后一个是该线程自身。如果多个线程就通过内部的多个node节点。

Phaser

JDK7开始,新增了一个同步工具类Phaser,他可以替代CyclicBarrier和CountDownLatch,并且功能更丰富。

Phaser替代CountDownLatch

CountDownLatch时讲到它可以实现让主线程等待所有子线程执行完毕再执行,涉及到两个方法await和countDown,在Phaser中相对应的方法是awaitAdance和arrive。

img

Phaser替代CyclicBarrier

CyclicBarrier时讲到它可以协调多个线程一起开始干同一件事,涉及到一个方法await,在Phaser中向对应的方法是arriveAndAwaitAdvance),他就是 arrive()与 awaitAdvance()的组合。表示我已经到达同步点,等待所有人到达再继续执行。

img

Phaser新特性

1)动态调整线程个数:CyclicBarrier 所要同步的线程个数是在构造方法中指定的,之后不能更改,而 Phaser 可以在运行 期间动态地调整要同步的线程个数,比如笔记过程中插入了一个新面试者进来和大家一起笔试。

2)层次Phaser:多个Phaser可以组成一个树状结构,可以通过在构造方法中传入父Phaser来实现。

Phaser没有基于AQS来实现,但是具备AQS的核心特性:state变量、CAS操作、阻塞队列。

arrive()方法:根据设置的总数,与到达的线程数相减,到0之后唤醒队列线程并重置state,同时phase累加与线程总数一致。

awaitAdvance()方法:通过while循环的4个分支进行阻塞判断,当4个分支都走完进行唤醒,主线程执行。

四、JUC之Atomic类(原子变量类)

AtomicInteger和AtomicLong

两者原理相同,假设一个类中有个int类型变量count,类里面有两个方法,A方法++;B方法–;多线程情况下为了保证数据安全有效,我们的常规办法就是在A、B方法上加synchronized关键字进行加锁,其实还可以通过把这个count变量声明成AtomicInteger类,这样不需要加锁在并发情况下也是原子性操作,线程安全的。他的内部是通过自旋进行CAS操作实现。

悲观锁和乐观锁

悲观锁:认为数据发生并发冲突的概率很大,读操作之前就上锁。synchronized关键字, ReentrantLock都是悲观锁的典型。

乐观锁(CAS):认为数据发生并发冲突的概率比较小,读操作之前不上锁。等到写操作的时候,再判断数据在此期间是否被其他线程修改了。如果被其他线程修改了,就把数据重新读出来,重复该过程;如果没有被修改,就写回去。判断数据是否被修改,同时写回新值,这两个操作要合成一个原子操作,也就是CAS。

CAS的实现

所有调用CAS的地方,比如前面Unsafe类以及现在的AtomicInteger类,都会先通过这个方法把成员变量转换成一个Offset。而这个Offset不是原本value了,而是他的偏移量,所以CAS操作的是偏移量。

自旋和阻塞

阻塞:当一个线程拿不到锁放弃CPU,进入阻塞状态,等待后续被唤醒,再重新被操作系统调度。

自旋:当一个线程拿不到锁不放弃CPU,空转,不断重试,也就是所谓的自旋。

注意:单核CPU只能阻塞,多核CPU才能自旋,因为单核CPU自旋,其他线程就卡死没办法运行了。两种可以结合使用,比如设定一定自旋时间等待,时间到了还没获取锁就切换成阻塞,放弃CPU。Synchronized关键字就是这种实现策略。

AtomicBoolean和AtomicReference

AtomicBoolean

比如一个布尔变量,在逻辑代码中,如果为false就取反,设置为true,但是只限于单线程,并发情况下就要用AtomicBoolean,结合他的乐观锁机制,在多线程并发情况下保证线程安全。

AtomicReference

原子引用类,其实就是一个对象两个线程共用,比如一个线程负责读另一个线程负责更新,如果不保证线程安全的情况下,那数据将会出现问题,如读到过期数据,类似于Mysql中的脏读,使用AtomicReference类就可以避免这个问题。

如何支持boolean和double类型

在Unsafe类中,只提供了三种类型的CAS操作:int、long、Object(也就是引用类型)即compareAndSetInt、compareAndSetLong、compareAndSetObject,在jdk的实现中,这三种CAS操作都是由底层实现的,其他类型的CAS操作都要转换为这三种之一进行操作。Int类型根据0和1就可以转化成boolean类型,double的话则是依赖于和Long类型互转。

AtomicStampedReference和AtomicMarkableReference

ABA问题与解决办法

ABA问题:比如A、B、C三个线程,A线程根据约定在等待B线程释放共享资源给他然后继续运行,但是B线程释放共享资源这个事C线程也知道,并且也在约定的多线程之内,当B线程释放的资源了,放入了对象中,C线程跑去拿走并消耗了,A线程回来发现对象中还是没有这个资源,又跑去找B线程要。

解决办法:CAS增强,对这个对象增加版本号,不仅比较值,还比较版本号,虽然前后都是为空,但是他中间有段时间是有值的,可以证明B线程确实释放了资源给他。

因此在AtomicStampedReference中,他的compareAndSet方法,即CAS操作就不是只有新值,旧值两个参数,还有版本号的新值和旧值,这四个对象都是装在一个叫Pair的内部类里面。Integer型或者Long型的CAS没有办法同时比较两个变量,他就是一个值,没有引用。

AtomicMarkableReference与AtomicStampedReference原理相似,不过他的Pair内部类版本号是一个布尔变量,没办法完全避免ABA问题,只能降低发生概率。

AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater

AtomicIntegerFieldUpdater:原子integer类型属性更新类

AtomicLongFieldUpdater:原子Long类型属性更新类

AtomicReferenceFieldUpdater:原子引用类型属性更新类

他们都是一个基于反射的工具类,能对指定类的指定的volatile字段进行原子更新。

但是使用具有一定约束条件:

1) 对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater。

2) 字段必须是volatile类型的,在线程之间共享变量时保证立即可见。

3) 只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和volatile是有冲突的,这两个关键字不能同时存在。

AtomicIntergerArray、AtomicInLongArray和AtomicReferenceArray

三个数组元素的原子操作。这里并不是说对整个数组的操作是原子的,而是针对数组中一个元素的原子操作。

其实他向比较与第一个AtomicInterger类,就多了一个下标而已,因为他是数组类型的,底层也是CAS操作实现,通过下标结合操作元素。

Striped64和LongAdder

JDK 8,针对Long型的原子操作,java新增了LongAdder、LongAccumulator类;针对Double型的原子操作,新增了DoubleAdder、DoubleAccumulator类,他们都是属于Striped64的继承类。

img

LongAdder原理

AtomicLong内部是一个volatile long型变量,由多个线程对这个变量进行CAS操作,在高并发情况下就不够高效了,因此LongAdder创造了另外一种设计,把一个Long变量拆分成一个base和多个cell,都是对Long型变量做的包装,类似于ConcurrentHashMap的分段锁,在多线程并发下,如果并发低,就使用主base,如果并发高,就分摊到节点cell上,最后取值的时候累加,类似于负载均衡原理。

img

基础类Striped64,也可以称为分片64位的类,无论是long,还是double,都是64位的。但因为没有double型的CAS操作,所以是通过把double型转化成long型来实现的。

最终一致性

在sum求和方法中,并没有对cells[]数组加锁。也就是说,一边有线程对其执行求和操作,一边还有线程修改数组里的值,也就是最终一致性,而不是强一致性。这也类似于ConcurrentHashMap 中的clear()方法,一边执行清空操作,一边还有线程放入数据,clear()方法调用完毕后再读取,hash map里面可能还有元素。因此,在LongAdder适合高并发的统计场景,而不适合要对某个 Long 型变量进行严格同步的场景。

@jdk.internal.vm.annotation.Contended注解

JDK8之后新增,他的作用是做优化,称之为伪共享与缓存行填充。

存在问题:如果在以前,主内存中有A\B\C三个Long型变量,他被CPU1和CPU2分别读入了自己的缓存中,放在了同一行cache中,这时候CPU1要修改A的值,那么他就要失效整行cache,并通知CPU2也要整行cache失效,这时候B\C就遭了无妄之灾,他们俩也被失效咯,本来B\C不应该被失效,应该是可以继续读取共享的,现在不行了,所以被称为伪共享问题。

img

解决办法:声明一个@jdk.internal.vm.annotation.Contended即可实现缓存行的填充。之所以这个地方要用缓存行填充,是为了不让Cell[]数组中相邻的元素落到同一个缓存行里。

img

LongAccumulator类是LongAdder类的增强,他们的原理是相同的,LongAdder只能进行累加操作,并且初始值默认为0;LongAccumulator可以自己定义一个二元操作符,并且可以传入一个初始值。而double的那两个类只不过是Long类的转换罢了。

五、JUC之Lock与Condition

互斥锁(ReentrantLock)

可重入锁:指当一个线程调用 object.lock()获取到锁,进入临界区后,再次调用object.lock(),仍然可以获取到该锁。显然,通常的锁都要设计成可重入的,否则就会发生死锁,比如synchronized关键字。

img

他继承Lock接口,Lock中常用的方法就是lock()获取锁\unlock()释放锁,继承Lock的这两个方法但是本身没有逻辑,他的实现都在内部类sync中。

img

公平锁和非公平锁

Sync是一个抽象类,它有两个子类FairSync与NonfairSync,分别对应公平锁和非公平锁,通过传入一个布尔变量来确定,公平锁按顺序排队,非公平锁无论先后都可以抢,为了提高效率,大部分锁类默认都是非公平锁。

锁的基本实现原理

锁必须具备的四个核心要素:

1)需要一个state变量,标记该锁的状态。state变量至少有两个值:0、1。对state变量的操作,使用CAS保证线程安全。

2)需要记录当前是哪个线程持有锁。

3)需要底层支持对一个线程进行阻塞或唤醒操作。

4.)需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要使用CAS。

Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器(AQS),他就是锁功能实现的关键。他的父类AOS记录持有锁的线程,他自身具有state变量记录锁的状态,并且state是累加的,0无锁、1持有锁、>1重入锁,满足1,2要素;接着通过LockSupport工具类对Unsafe类的park方法阻塞和unpark方法唤醒做了封装,引入到AQS类中,满足3要素。最后在AQS类里面基于双向链表和CAS实现了一个阻塞队列。head指向双向链表头部,tail指向双向链表尾部。入队就是把新的Node加到tail后面,然后对tail进行CAS操作;出队就是对head进行CAS操作,把head向后移一个位置,如果head和tail相等,也就是都为null说明队列为空,满足4要素,达到具备一个锁的功能。

AQS阻塞队列与唤醒机制

通过addWaiter(…)方法生成一个node节点把Thread对象加入阻塞队列,线程一旦进入acquireQueued(…)就会被无限期阻塞,即使有其他线程调用interrupt()方法也不能将其唤醒,除非有其他线程释放了锁,并且该线程拿到了锁,才会从accquireQueued(…)返回,返回值是一个布尔变量。在该方法返回的一刻,就是拿到锁的那一刻,进行唤醒,唤醒也分两种情况,是否是中断唤醒。如果是中断唤醒acquireQueued会死循环直到拿到锁为止。

Lock类常用方法解析

lock():获取锁,阻塞。

unlock():释放锁,唤醒。

lockInterruptibly():Lock类中的lock方法调用后不能被中断,但是lockInterruptibly方法可以被中断,当有人给他发送中断信号它就抛出InterruptedException异常跳出循环,不再阻塞。

tryLock():基于调用非公平锁的tryAcquire(…),对state进行CAS操作,如果操作成功就拿到锁;如果操作不成功则直接返回false,不阻塞。

读写锁(ReentrantReadWriteLock)

读写锁就是读线程和读线程之间不互斥,它分为读锁和写锁,也称为共享锁和排它锁。

这个类实现了ReadWriteLock接口,顶级接口仍然还是Lock,ReentrantReadWriteLock通过实现ReadWriteLock的writeLock方法和readLock方法进行读写操作,也就是说,当使用 ReadWriteLock 的时候,并不是直接使用,而是获得其内部的读锁和写锁,然后分别调用lock/unlock。

img

读写锁实现的基本原理

ReadLock和WriteLock是两把锁,实际上它只是同一把锁的两个视图而已,线程分成两类:读线程和写线程。读线程和写线程之间不互斥(可以同时拿到这把锁),读线程之间不互斥,写线程之间互斥。同互斥锁一样,读写锁也是用state变量来表示锁状态的。只是state变量在这里的含义和互斥锁完全不同。在内部类Sync中,对state变量进行了重新定义,因为在读写锁里面,他有两种状态,读锁状态和写锁状态,而无法用一次CAS 同时操作两个int变量,因此这个state就被拆成了两半,用了一个int型的高16位和低16位分别表示读锁和写锁的状态。当state=0时,说明没有线程持有读锁,也没有线程持有写锁;当state != 0时,要么有线程持有读锁,要么有线程持有写锁,两者不能同时成立,因为读和写互斥。这时再进一步通过sharedCount(state)和exclusiveCount(state)判断到底是读线程还是写线程持有了该锁,所以读写锁是读读不互斥,读写互斥,写写互斥。

AQS的两对模板方法

acquire/release、acquireShared/releaseShared 是AQS里面的两对模板方法。互斥锁和读写锁的写锁都是基于acquire/release模板方法来实现的。读写锁的读锁是基于acquireShared/releaseShared这对模板方法来实现的。

读写锁也有公平和分公平之分

公平,不论是读锁,还是写锁,只要队列中有其他线程在排队(排队等读锁,或者排队等写锁),就不能直接去抢锁,要排在队列尾部。

非公平(读锁):读锁是共享锁,多个线程会同时持有读锁,所以读线程就不是直接抢了,有一个约束,如果队列的第1个元素是写线程的时候,读线程也要阻塞,不能直接去抢。即偏向写线程。因为如果当前读锁被一个读线程持有,其他读线程又在抢,那么可能写锁永远都没机会拿到锁了,无法写入。

非公平(写锁):写锁是排他锁,写线程能抢锁,前提是state=0,只有在没有其他线程持有读锁或写锁的情况下,它才有机会去抢锁。或者state != 0,但那个持有写锁的线程是它自己,再次重入。

Condition

常用于同步锁,Condition本身是一个接口,其功能和wait/notify类似。wait()/notify()必须和synchronized一起使用,Condition也必须和Lock一起使用。

img

img

Condition应用场景及实现原理

比如前面讲到了ArrayBlockingQueue阻塞队列,他有一把锁和两个condition条件,即非空条件和非满条件,而这个condition条件就是通过构造器先创建一把锁,再通过这个锁创造出来两个条件。

img

Condition避免了wait/notify的生产者通知生产者、消费者通知消费者的问题。

读写锁中的 ReadLock 是不支持 Condition 的,读写锁的写锁和互斥锁都支持Condition。

await()实现分析

每一个Condition对象上面,都阻塞了多个线程。因此,在ConditionObject内部也有一个双向链表

组成的队列,通过这个Condition调用await方法先把线程加到Condition的等待队列中,在调用await方法时已经是拿到锁的线程,所以在把线程加到Condition等待队列时不需要CAS操作,接着释放掉锁,进入阻塞状态,如果不释放锁会造成死锁,并且他是可以响应中断的,响应中断抛出异常,中断阻塞,如果被中断唤醒就必须要重新拿锁。awaitUninterruptibly方法不会响应中断。

notify()实现分析

持有锁的线程调用signal方法才有资格进行唤醒,没有锁的线程直接抛异常,通过通知唤醒队列中第一个线程。

读写锁(StampedLock)

JDK8中新增了StampedLock,他也是一个读写锁,只不过他的并发量更高,因为他在上一代的设计上做了改动,即读写不互斥。

img

ReentrantReadWriteLock采用的是“悲观读”的策略,就算有非公平约束,也可能导致读线程一直持有锁,写线程拿不到锁无法写入的情况。而StampedLock引入了“乐观读”策略,读的时候不加读锁,读出来发现数据被修改了,再升级为“悲观读”,相当于降低了“读”的地位,把抢锁的天平往“写”的一方倾斜了一下,避免写线程被饿死。

官方使用实例剖析:

有一个Point类,多个线程调用move()方法,修改坐标;还有多个线程调用distanceFromOrigin()方法,求距离。首先,执行move操作的时候,要加写锁,因为写操作和写操作是互斥的,接着调用distenceFromOrigin方法,这个方法就是读的逻辑,他首先默认使用乐观读,不加锁,然后读之前对数据做了一个快照,拷贝存储内存中,读完之后会再比较一下版本号,发现如果有其他线程在读期间修改了数据,就把读的数据废弃,重新读,但是这次读就不是乐观读了,他升级成了悲观读,进行加锁,重新读取数据。

乐观读实现原理

StampedLock是一个读写锁,和前面的读写锁一样,需要一个state表示读锁和写锁,不仅这样,他还要有一个版本号需要记录,因此对于这个32位的int类型state划分又不一样了,第8位表示写锁,写锁只需要一位,因为写锁是不可重入锁,前7位表示读锁,然后里面还有一个初始值state,且不是为0的,通过state&WBIT != 0来确定是否被修改过,一旦线程被写锁持有,或者释放了,state都不会是0了,也就是版本号的变动记录,因此只通过一个变量,既实现了读锁、写锁的状态记录,还实现了数据的版本号的记录。

悲观读实现原理

同ReadWriteLock一样,StampedLock也要进行悲观的读锁和写锁操作。不过,它不是基于AQS实现的,而是内部重新实现了一个阻塞队列。在AQS里面,当一个线程CAS state失败之后,会立即加入阻塞队列,并且进入阻塞状态。但在StampedLock中,CAS state失败之后,会不断自旋,自旋足够多的次数之后,如果还拿不到锁,才进入阻塞状态。根据CPU的核数,定义了自旋次数的常量值。如果是单核的CPU,肯定不能自旋,在多核情况下,才采用自旋策略。

六、线程池

线程池的实现原理及继承体系

线程池就是调用方不断地向线程池中提交任务;线程池中有一组线程,不断地从队列中取任务。

线程池实现的基本问题:

1) 队列长度,无限度导致内存耗尽,有限满了之后解决方案。

2) 线程池线程个数,固定还是动态。

3) 新提交任务如何处理,新开线程还是放入队列。

4) 没有任务休眠还是阻塞,阻塞就需要有唤醒机制。

如果是休眠一段时间,就需要通过轮询反复检查是否有新任务到来,而通过使用阻塞队列即可解决问题4也可以解决问题1,所以ThreadPoolExector/ScheduledThreadPoolExecutor都是基于阻塞队列实现的。

img

Executer类是一个顶级接口,他有两个核心实现类: ThreadPoolExector 和 ScheduledThreadPoolExecutor,这两个类就是实现线程池的,他们都具有执行某个任务的功能,并且ScheduledThreadPoolExecutor还可以周期性执行。

img

ThreadPoolExector

核心数据结构

记录线程池的状态和线程池里线程的个数的原子变量类AtomicInteger

存放任务的阻塞队列BlockingQueue

线程池内部各种变量进行互斥访问控制的重入锁ReentrantLock

放worker(也就是封装线程的一个内部类,一个线程对应一个worker)的hashset集合,而且Worker继承于AQS,也就是说Worker本身就是一把锁。

img

核心配置参数详解

img

1)corePoolSize:在线程池中始终维护的线程个数。

2)maxPoolSize:在corePooSize已满、队列也满的情况下,扩充线程至此值。

3)keepAliveTime/TimeUnit:maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize。

4)blockingQueue:线程池所用的队列类型。

5)threadFactory:线程创建工厂,可以自定义,有默认值Executors.defaultThreadFactory() 。

6)RejectedExecutionHandler:corePoolSize已满,队列已满,maxPoolSize 已满,最后的拒绝策略。

配置参数在任务中的提交过程:

步骤一:判断当前线程数是否大于或等于corePoolSize。如果小于,则新建线程执行;如果大于,则进入步骤二。

步骤二:判断队列是否已满。如未满,则放入;如已满,则进入步骤三。

步骤三:判断当前线程数是否大于或等于maxPoolSize。如果小于,则新建线程执行;如果大于,则进入步骤四。

步骤四:根据拒绝策略,拒绝任务。

总结:首先判断corePoolSize,其次判断blockingQueue是否已满,接着判断maxPoolSize,最后使用拒绝策略。

线程池的优雅关闭

线程池的关闭也线程不同,因为一个线程池里面有多个线程在运行,因此进行关闭就需要一个策略,进行安全关闭线程池,也就是线程池的生命周期。

JDK7中有一个ctl变量,他组合了线程数量(workerCount)和线程池状态(runState),最高的3位存储线程池状态,其余29位存储线程个数,在之前是分开储存的。

ctl变量被拆成两半,最高的3位用来表示线程池的状态,低的29位表示线程的个数。

img

由上图源码得知线程池的状态有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED,这五种状态是一个迁移的过程,从小到大迁移,-1,0,1,2,3,只会从小的状态值往大的状态值迁移,不会逆向迁移。

通过shutdown()和shutdownNow()进行关闭,但是他不是真正的关闭,只是切换了状态,只有最后调用了钩子方法terminated(),进入TERMINATED状态,线程池才真正关闭。

img

线程池关闭的实现

先调用shutdown()或shutdownNow()方法,接着设定一个轮询,隔一段时间进行检查,通过调用awaitTermination()方法返回布尔变量来确定线程池是否关闭,而awaitTermination的内部实现其实就是不断循环判断线程池是否到达了最终状态TERMINATED,未达到就阻塞一段时间又来,直到获取自己的预期结果为止。

img

shutdown()不会清空任务队列,会等所有任务执行完成,shutdownNow()清空任务队列,也就是活没干完也不干了。

shutdown()只会中断空闲的线程,shutdownNow()会中断所有线程,到了时间直接强制中断,但是要注意,中断只能中断轻量级锁,也就是阻塞等待,执行代码逻辑中并不能中断。shutdown() 和shutdownNow()都调用了tryTerminate()方法,tryTerminate()不会强行终止线程池,只是做了一下检测:当workerCount为0,workerQueue为空时,先把状态切换到TIDYING,然后调用钩子方法terminated()。当钩子方法执行完成时,把状态从TIDYING 改为 TERMINATED,接着调用termination.sinaglAll(),通知前面阻塞在awaitTermination的所有调用者线程。所以,TIDYING和TREMINATED的区别是在二者之间执行了一个钩子方法terminated()。

任务的提交过程

如果当前线程数小于corePoolSize,则启动新线程,添加worker,调用开启新线程方法addworker。如果大于或等于corePoolSize,则调用workQueue.offer放入队列,如果发现线程池正在停止,启动拒绝策略,不再接受这个任务否则就放入队列,放入后没有线程执行,就调用开启新线程方法addworker。如果线程数大于maxPoolSize,并且队列已满,调用拒绝策略。

任务的执行过程

上面的任务提交可能会开启一个worker,也就是调用addworker方法。而且他封装了线程,这个方法不单是只执行一个任务,而是源源不断地从队列中取任务执行,这是一个不断循环的过程,它内部有一个runworker方法,先拿到当前线程,再拿到第一个任务,接着中断封装的线程,也就是唤醒,线程就可以执行任务咯,执行完毕释放锁,当前worker就可以继续干其他事了。

shutdown()与任务执行过程综合分析

  1. 当调用shutdown()的时候,所有线程都处于空闲状态

这意味着任务队列一定是空的。此时,所有线程都会阻塞在 getTask()方法的地方。然后,所有线程都会收到interruptIdleWorkers()发来的中断信号,getTask()返回null,所有Worker都 会退出while循环,之后执行processWorkerExit。

  1. 当调用shutdown()的时候,所有线程都处于忙碌状态

此时,队列可能是空的,也可能是非空的。interruptIdleWorkers()内部的tryLock调用失败,什么都不会做,所有线程会继续执行自己当前的任务。之后所有线程会执行完队列中的任务,直到队列为空,getTask()才会返回null。之后,就和场景1一样了,退出while循环。

  1. 当调用shutdown()的时候,部分线程忙碌,部分线程空闲

有部分线程空闲,说明队列一定是空的,这些线程肯定阻塞在 getTask()方法的地方。空闲的 这些线程会和场景1一样处理,不空闲的线程会和场景2一样处理。

shutdownNow() 与任务执行过程综合分析

和上面的 shutdown()类似,只是多了一个环节,即清空任务队列。如果一个线程正在执行某个业务代码,即使向它发送中断信号,也没有用,只能等它把代码执行完成。因此,中断空闲线程和中断所有线程的区别并不是很大,除非线程当前刚好阻塞在某个地方。

线程池的4种拒绝策略

RejectedExecutionHandler 是一个接口,定义了四种实现,分别对应四种不同的拒绝策略,默认是AbortPolicy。

策略1(CallerRunsPolicy):调用者直接在自己的线程里执行,线程池不处理,比如到医院打点滴,医院没地方了,到你家自己操作吧。

策略2(AbortPolicy):线程池抛异常。

策略3(DiscardPolicy):线程池直接丢掉任务,神不知鬼不觉。

策略4(DiscardOldestPolicy):删除队列中最早的任务,将当前任务入队列。

Executors工具类

concurrent包提供了Executors工具类,利用它可以创建各种不同类型的线程池。

分五种:单线程的线程池、固定数目线程的线程池、每接收一个请求,就创建一个线程来执行、单线程具有周期调度功能的线程池、多线程,有调度功能的线程池。

《阿里巴巴Java开发手册》中,明确禁止使用Executors创建线程池,以规避因使用不当而造成资源耗尽的风险。

ScheduledThreadPoolExecutor

他实现了按时间调度来执行任务,分延迟执行任务和周期执行任务

延迟执行任务

img

周期执行任务

img

AtFixedRate:按固定频率执行,与任务本身执行时间无关。但有个前提条件,任务执行时间必须小于间隔时间,例如间隔时间是5s,每5s执行一次任务,任务的执行时间必须小于5s。

WithFixedDelay:按固定间隔执行,与任务本身执行时间有关。例如,任务本身执行时间是10s,间隔2s,则下一次开始执行的时间就是12s。

延迟执行和周期性执行的原理

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,内部结构基本一致,内部实现了一个特定的DelayQueue,称之为DelayWorkQueue来实现延迟执行任务。而周期性执行任务是执行完一个任务之后,再把该任务扔回到任务队列中,如此就可以对一个任务反复执行。

延迟执行

就是把提交的Runnable任务加上delay时间,转换成ScheduledFutureTask对象,放入DelayedWorkerQueue中,通过实现Delayed接口完成延迟执行。

周期性执行

包装一个ScheduledFutureTask对象,只是在延迟时间参数之外多了一个周期参数,然后放入DelayedWorkerQueue。不过他有两个方法,一个传入负数一个传入正数在内部实现是按间隔还是按频率的一种算法,也就是上面说的AtFixedRate和WithFixedDelay。

img

------ 本文结束感谢您的阅读 ------
请我一杯咖啡吧!
itingyu 微信打赏 微信打赏