一.相关定义:
线程安全类:当多个线程访问某个类时,不管运行环境采用何种调度方式或者这些进程如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。
线程安全性主要体现在三个方面:原子性、可见性、有序性。
1.原子性(Atomicity)
原子性是指一个原子操作在cpu中不可以暂停然后再调度,既不被中断操作,要不执行完成,要不就不执行。原子操作保证了原子性问题。
x++(包含三个原子操作)a.将变量x 值取出放在寄存器中 b.将将寄存器中的值+1 c.将寄存器中的值赋值给x
由Java内存模型来直接保证的原子性变量操作包括read、load、use、assign、store和write六个,大致可以认为基础
数据类型的访问和读写是具备原子性的。如果应用场景需要一个更大范围的原子性保证,Java内存模型还提供了lock和
unlock操作来满足这种需求,尽管虚拟机未把lock与unlock操作直接开放给用户使用,但是却提供了更高层次的字节码指
令monitorenter和monitorexit来隐匿地使用这两个操作,这两个字节码指令反映到Java代码中就是同步块——synchronized关键字,因此在synchronized块之间的操作也具备原子性。
2、可见性(Visibility)
java 内存模型的主内存和工作内存,解决了可见性问题。
volatile赋予了变量可见——禁止编译器对成员变量进行优化,它修饰的成员变量在每次被线程访问时,都强迫从内存中重读该成员变量的值;而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存,这样在任何时刻两个不同线程总是看到某一成员变量的同一个值,这就是保证了可见性。
可见性就是指当一个线程修改了线程共享变量的值,其它线程能够立即得知这个修改。无论是普通变量还是volatile变
量都是如此,普通变量与volatile变量的区别是volatile的特殊规则保证了新值能立即同步到主内存,以及每使用前立即从
内存刷新。因为我们可以说volatile保证了线程操作时变量的可见性,而普通变量则不能保证这一点。
3、有序性(Ordering)
Java内存模型中的程序天然有序性可以总结为一句话:如果在本线程内观察,所有操作都是有序的;如果在一个线程中观察另一个线程,所有操作都是无序的。前半句是指“线程内表现为串行语义”,后半句是指“指令重排序”现象和“工作内存主主内存同步延迟”现象。
重排序:代码书写的顺序与实际执行的顺序不同,指令重排序是编译器或处理器为了提高程序性能而做的优化
1.编译器优化的重排序(编译器优化)
2.指令级并行重排序(处理器优化)
3.内存系统的重排序(处理器优化)
二.JAVA对原子性、可见性、有序性的处理
2.1 synchronized
synchronized关键字保证了原子性和可见性,依赖JVM。
原子性:
synchronized同步语句块中的代码在同一时刻只有一个线程能够执行。
可见性:
JMM关于synchronized的两条规定:
1.线程解锁前,必须把共享变量的最新值刷到主内存。
2.线程加锁时,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值。
作用范围:
1.修饰代码块:大括号括起来的代码,作用于调用的对象
2.修饰方法:整个方法,作用于调用的对象3.修饰静态方法:整个静态方法,作用于所有对象
4.修饰类:括号括起来的部分,作用于所有对象
1.synchronized 修饰一个代码块
被修饰的代码称为同步语句块,作用的范围是大括号括起来的部分。作用的对象是调用这段代码的对象。
验证:
public classSynchronizedExample {
public void test(intj){
synchronized (this){
for (int i = 0; i < 10; i++) {
log.info("test - {} - {}",j,i);
}
}
}
//使用线程池方法进行测试:
public static voidmain(String[] args) {
SynchronizedExample example1 = newSynchronizedExample();
SynchronizedExample example2 = newSynchronizedExample();
ExecutorService executorService =Executors.newCachedThreadPool();
executorService.execute(()-> example1.test(1));
executorService.execute(()-> example2.test(2));
}
}
结果:不同对象之间的操作互不影响
2.synchronized 修饰一个方法
被修饰的方法称为同步方法,作用的范围是大括号括起来的部分,作用的对象是调用这段代码的对象。
验证:
public classSynchronizedExample
public synchronized void test(intj){
for (int i = 0; i < 10; i++) {
log.info("test - {} - {}",j,i);
}
}
//验证方法与上面相同
...
}
结果:不同对象之间的操作互不影响
TIPS:
如果当前类是一个父类,子类调用父类的被synchronized修饰的方法,不会携带synchronized属性,因为synchronized不属于方法声明的一部分
3.synchronized 修饰一个静态方法
作用的范围是synchronized 大括号括起来的部分,作用的对象是这个类的所有对象。
验证:
public classSynchronizedExample{
public static synchronized void test(intj){
for (int i = 0; i < 10; i++) {
log.info("test - {} - {}",j,i);
}
}
//验证方法与上面相同
...
}
结果:同一时间只有一个线程可以执行
4.synchronized 修饰一个类
验证:
public class SynchronizedExample{
public static void test(int j){
synchronized (SynchronizedExample.class){
for (int i = 0; i < 10; i++) {
log.info("test - {}-{}",j,i);
}
}
结果:同一时间只有一个线程可以执行
2.2volatile
volatile关键字保证了可见性和有序性,但无原子性。
可见性和有序性:
通过加入内存屏障和禁止重排序优化来实现。
1.对volatile变量写操作时,会在写操作后加入一条store屏障指令,将本地内存中的共享变量刷新到主内存,
2.对valatile变量读操作时,会在读操作前加入一条load屏障指令,从主内存中读取共享变量。
volatile不具备原子性验证:
@Slf4j
@NotThreadSafepublic classCountExample4 {//模拟请求总数
public static final int clientTotal = 5000;//同时并发执行的线程数
public static final int threadTotal = 200;//累加结果
public static volatile int count = 0;public static void main(String[] args) throwsException{
ExecutorService executorService=Executors.newCachedThreadPool();//信号量
final Semaphore semaphore = newSemaphore(threadTotal);//计数器减闭锁//当前线程调用await()方法之后进入等待状态,在其他线程中执行countDown()后//计数器减1 至到减到0时 当前线程继续执行
final CountDownLatch countDownLatch = newCountDownLatch(clientTotal);for (int i = 0; i < clientTotal; i++){
executorService.execute(()->{try{
semaphore.acquire();
add();
semaphore.release();
}catch(Exception e){
log.error("exception: e={}",e);
}//放在add()操作之后,以确保每个线程都执行了add操作
countDownLatch.countDown();
});
}
countDownLatch.await();
log.info("count={}",count);
}private static voidadd(){
count++;
}
}
在共享变量count前修饰volatile关键字,保证了变量的可见性,但并不具有原子性,count++操作并不是原子性的,所以不是线程安全的。
volatile关键字应用场景:
要在多线程中安全的使用volatile变量,必须同时满足:
对变量的写入操作不依赖其当前值
不满足:number++、count = count * 5等
满足:boolean变量、记录温度变化的变量等
该变量没有包含在具有其他变量的不变式中
不满足:不变式low < up
反例:
大多数编程情形都会与这两个条件的其中之一冲突,使得 volatile 变量不能像 synchronized 那样普遍适用于实现线程安全。
【反例:volatile变量不能用于约束条件中】 下面是一个非线程安全的数值范围类。它包含了一个不变式 —— 下界总是小于或等于上界。
@NotThreadSafepublic classNumberRange {private intlower, upper;public int getLower() { returnlower; }public int getUpper() { returnupper; }public void setLower(intvalue) {if (value >upper)throw newIllegalArgumentException(...);
lower=value;
}public void setUpper(intvalue) {if (value
upper=value;
}
}
否则,如果凑巧两个线程在同一时间使用不一致的值执行 setLower 和 setUpper 的话,则会使范围处于不一致的状态。例如,如果初始状态是(0, 5),同一时间内,线程 A 调用setLower(4) 并且线程 B 调用setUpper(3),显然这两个操作交叉存入的值是不符合条件的,那么两个线程都会通过用于保护不变式的检查,使得最后的范围值是(4, 3) —— 一个无效值。
正例:
模式 #1:状态标志
也许实现 volatile 变量的规范使用仅仅是使用一个布尔状态标志,用于指示发生了一个重要的一次性事件,例如完成初始化或请求停机。
volatile booleanshutdownRequested;
...public voidshutdown() {
shutdownRequested= true;
}public voiddoWork() {while (!shutdownRequested) {//do stuff
}
}
线程1执行doWork()的过程中,可能有另外的线程2调用了shutdown,所以boolean变量必须是volatile。
而如果使用 synchronized 块编写循环要比使用 volatile 状态标志编写麻烦很多。由于 volatile 简化了编码,并且状态标志并不依赖于程序内任何其他状态,因此此处非常适合使用 volatile。
模式 #2:开销较低的“读-写锁”策略
如果读操作远远超过写操作,您可以结合使用内部锁和 volatile 变量来减少公共代码路径的开销。
如下显示的线程安全的计数器,写操作使用 synchronized 确保增量操作是原子和可见,并使用 volatile 保证读操作当前结果的可见性。如果更新不频繁的话,该方法可实现更好的性能,因为读路径的开销仅仅涉及 volatile 读操作,这通常要优于一个无竞争的锁获取的开销。
@ThreadSafepublic classCheesyCounter {//Employs the cheap read-write lock trick//All mutative operations MUST be done with the 'this' lock held
@GuardedBy("this") private volatile intvalue;//读操作,没有synchronized,提高性能
public intgetValue() {returnvalue;
}//写操作,必须synchronized。因为x++不是原子操作
public synchronized intincrement() {return value++;
}
使用锁进行所有变化的操作,使用 volatile 进行只读操作。
其中,锁一次只允许一个线程访问值,volatile 允许多个线程执行读操作。
模式 #3:“volatile bean” 模式
volatile bean 模式的基本原理是:很多框架为易变数据的持有者(例如 HttpSession)提供了容器,但是放入这些容器中的对象必须是线程安全的。
在 volatile bean 模式中,JavaBean 的所有数据成员都是 volatile 类型的,并且 getter 和 setter 方法必须非常普通——即不包含约束!
@ThreadSafepublic classPerson {private volatileString firstName;private volatileString lastName;private volatile intage;public String getFirstName() { returnfirstName; }public String getLastName() { returnlastName; }public int getAge() { returnage; }public voidsetFirstName(String firstName) {this.firstName =firstName;
}public voidsetLastName(String lastName) {this.lastName =lastName;
}public void setAge(intage) {this.age =age;
}
}
模式 #4:一次性安全发布(one-time safe publication)
在缺乏同步的情况下,可能会遇到某个对象引用的更新值(由另一个线程写入)和该对象状态的旧值同时存在。
这就是造成著名的双重检查锁定(double-checked-locking)问题的根源,其中对象引用在没有同步的情况下进行读操作,产生的问题是您可能会看到一个更新的引用,但是仍然会通过该引用看到不完全构造的对象。【单例模式(以及多线程、无序写入、volatile对单例的影响】
//注意volatile!!!!!!!!!!!!!!!!!
private volatile staticSingleton instace;public staticSingleton getInstance(){//第一次null检查
if(instance == null){synchronized(Singleton.class) { //1//第二次null检查
if(instance == null){ //2
instance = new Singleton();//3
}
}
}return instance;
如果不用volatile,则因为内存模型允许所谓的“无序写入”,可能导致失败。——某个线程可能会获得一个未完全初始化的实例。
模式 #5:独立观察(independent observation)
安全使用 volatile 的另一种简单模式是:定期 “发布” 观察结果供程序内部使用。【例如】假设有一种环境传感器能够感觉环境温度。一个后台线程可能会每隔几秒读取一次该传感器,并更新包含当前文档的 volatile 变量。然后,其他线程可以读取这个变量,从而随时能够看到最新的温度值。
使用该模式的另一种应用程序就是收集程序的统计信息。【例】如下代码展示了身份验证机制如何记忆最近一次登录的用户的名字。将反复使用lastUser 引用来发布值,以供程序的其他部分使用。
public classUserManager {public volatile String lastUser; //发布的信息
public booleanauthenticate(String user, String password) {boolean valid =passwordIsValid(user, password);if(valid) {
User u= newUser();
activeUsers.add(u);
lastUser=user;
}returnvalid;
}
}
2.3锁Lock
显示锁保证了原子性和可见性。
可见性:
例ReentrantLock的同步其实是委托给AbstractQueuedSynchronizer的。加锁和解锁是通过改变AbstractQueuedSynchronizer的state属性,这个属性是volatile的,volatile为了保证可见性, 会在机器指令中加入lock指令, lock强制把缓存(工作内存)写回内存(主内存), 并失效其它线程的缓存行(MESI). 这里要注意的是, lock并不仅仅只把被volatile修饰的变量写回主内存, 而是把工作内存中的变更都写入主内存。
2.4Atomic包相关类的介绍
Atomic包中的类与CAS:
1.AtomicInteger
public classCountExample {//请求总数
public static int clientTotal = 5000;//同时并发执行的线程数
public static int threadTotal = 200;//变量声明:计数
public static AtomicInteger count = new AtomicInteger(0);public static void main(String[] args) throwsInterruptedException {
ExecutorService executorService= Executors.newCachedThreadPool();//创建线程池
final Semaphore semaphore = new Semaphore(threadTotal);//定义信号量,给出允许并发的数目
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);//定义计数器闭锁
for (int i = 0;i
executorService.execute(()->{try{
semaphore.acquire();//判断进程是否允许被执行
add();
semaphore.release();//释放进程
} catch(InterruptedException e) {
log.error("excption",e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();//保证信号量减为0
executorService.shutdown();//关闭线程池
log.info("count:{}",count.get());//变量取值
}private static voidadd(){
count.incrementAndGet();//变量操作
}
}
上边的示例代码就是通过AtomicInteger类保证了线程的原子性。
那么它是如何保证原子性的呢?我们接下来分析一下它的源码。示例中,对count变量的+1操作,采用的是incrementAndGet方法,此方法的源码中调用了一个名为unsafe.getAndAddInt的方法
public final intincrementAndGet() {return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
而getAndAddInt方法的具体实现为:
public final int getAndAddInt(Object var1, long var2, intvar4) {intvar5;do{
var5= this.getIntVolatile(var1, var2);
}while(!pareAndSwapInt(var1, var2, var5, var5 +var4));returnvar5;
}
在此方法中,方法参数为要操作的对象Object var1、期望底层当前的数值为var2、要修改的数值var4。定义的var5为真正从底层取出来的值。采用do..while循环的方式去获取底层数值并与期望值进行比较,比较成功才将值进行修改。而这个比较再进行修改的方法就是compareAndSwapInt就是我们所说的CAS,它是一系列的接口,比如下面罗列的几个接口。使用native修饰,是底层的方法。CAS取的是compareAndSwap三个单词的首字母。
public final native boolean compareAndSwapObject(Object var1, longvar2, Object var4, Object var5);public final native boolean compareAndSwapInt(Object var1, long var2, int var4, intvar5);public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
2.AtomicLong 与 LongAdder区别
LongAdder是java8为我们提供的新的类,跟AtomicLong有相同的效果。首先看一下代码实现:
AtomicLong:
//变量声明
public static AtomicLong count = new AtomicLong(0);
//变量操作
count.incrementAndGet();
//变量取值
count.get();
LongAdder://变量声明
public static LongAdder count = newLongAdder();//变量操作
count.increment();//变量取值
count
那么问题来了,为什么有了AtomicLong还要新增一个LongAdder呢?
原因是:CAS底层实现是在一个死循环中不断地尝试修改目标值,直到修改成功。如果竞争不激烈的时候,修改成功率很高,否则失败率很高。在失败的时候,这些重复的原子性操作会耗费性能。
知识点: 对于普通类型的long、double变量,JVM允许将64位的读操作或写操作拆成两个32位的操作。
LongAdder类的实现核心是将热点数据分离,比如说它可以将AtomicLong内部的内部核心数据value分离成一个数组,每个线程访问时,通过hash等算法映射到其中一个数字进行计数,而最终的计数结果则为这个数组的求和累加,其中热点数据value会被分离成多个单元的cell,每个cell独自维护内部的值。当前对象的实际值由所有的cell累计合成,这样热点就进行了有效地分离,并提高了并行度。这相当于将AtomicLong的单点的更新压力分担到各个节点上。在低并发的时候通过对base的直接更新,可以保障和AtomicLong的性能基本一致。而在高并发的时候通过分散提高了性能。
public voidincrement() {
add(1L);
}public void add(longx) {
Cell[] as;long b, v; intm; Cell a;if ((as = cells) != null || !casBase(b = base, b +x)) {boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a= as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v +x)))
longAccumulate(x,null, uncontended);
}
缺点:如果在统计的时候,如果有并发更新,可能会有统计数据有误差。实际使用中在处理高并发计数的时候优先使用LongAdder,而不是AtomicLong在线程竞争很低的时候,使用AtomicLong会简单效率更高一些。比如序列号生成(准确性)
3.AtomicBoolean
这个类中值得一提的是它包含了一个名为compareAndSet的方法,这个方法可以做到的是控制一个boolean变量在一件事情执行之前为false,事情执行之后变为true。或者也可以理解为可以控制某一件事只让一个线程执行,并仅能执行一次。
他的源码如下:
public final boolean compareAndSet(boolean expect, booleanupdate) {int e = expect ? 1 : 0;int u = update ? 1 : 0;return pareAndSwapInt(this, valueOffset, e, u);
}
举例说明:
//是否发生过
private static AtomicBoolean isHappened = new AtomicBoolean(false);//请求总数
public static int clientTotal = 5000;//同时并发执行的线程数
public static int threadTotal = 200;public static void main(String[] args) throwsException {
ExecutorService executorService=Executors.newCachedThreadPool();final Semaphore semaphore = newSemaphore(threadTotal);final CountDownLatch countDownLatch = newCountDownLatch(clientTotal);for (int i = 0; i < clientTotal ; i++) {
executorService.execute(()->{try{
semaphore.acquire();
test();
semaphore.release();
}catch(Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("isHappened:{}", isHappened.get());
}private static voidtest() {if (pareAndSet(false, true)) {//控制某有一段代码只执行一次
log.info("execute");
}
}
结果:(log只打印一次)
[pool-1-thread-2] INFO com.superboys.concurrency.example.Atomic.AtomicExample6 -execute
[main] INFO com.superboys.concurrency.example.Atomic.AtomicExample6- isHappened:true
4.AtomicIntegerFieldUpdater
这个类的核心作用是要更新一个指定的类的某一个字段的值。并且这个字段一定要用volatile修饰同时还不能是static的。
举例说明:
@Slf4jpublic classAtomicExample5 {//原子性更新某一个类的一个实例
private static AtomicIntegerFieldUpdaterupdater= AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class,"count");
@Getterpublic volatile int count = 100;//必须要volatile标记,且不能是static
public static voidmain(String[] args) {
AtomicExample5 example5= newAtomicExample5();if(pareAndSet(example5,100,120)){
log.info("update success 1,{}",example5.getCount());
}if(pareAndSet(example5,100,120)){
log.info("update success 2,{}",example5.getCount());
}else{
log.info("update failed,{}",example5.getCount());
}
}
}
此方法输出的结果为:
[main] INFO com.superboys.concurrency.example.Atomic.AtomicExample5 - update success 1,120[main] INFO com.superboys.concurrency.example.Atomic.AtomicExample5- update failed,120
由此可见,count的值只修改了一次。
5.AtomicStampReference与CAS的ABA问题
什么是ABA问题?
CAS操作的时候,其他线程将变量的值A改成了B,但是随后又改成了A,本线程在CAS方法中使用期望值A与当前变量进行比较的时候,发现变量的值未发生改变,于是CAS就将变量的值进行了交换操作。但是实际上变量的值已经被其他的变量改变过,这与设计思想是不符合的。所以就有了AtomicStampReference。
源码:
private static class Pair{finalT reference;final intstamp;private Pair(T reference, intstamp) {this.reference =reference;this.stamp =stamp;
}static Pair of(T reference, intstamp) {return new Pair(reference, stamp);
}
}private volatile Pairpair;private boolean casPair(Pair cmp, Pairval) {return pareAndSwapObject(this, pairOffset, cmp, val);
}public booleancompareAndSet(V expectedReference,
V newReference,intexpectedStamp,intnewStamp) {
Pair current =pair;returnexpectedReference== current.reference &&expectedStamp== current.stamp &&((newReference== current.reference && //排除新的引用和新的版本号与底层的值相同的情况
newStamp == current.stamp) ||casPair(current, Pair.of(newReference, newStamp)));
}
AtomicStampReference的处理思想是,每次变量更新的时候,将变量的版本号+1,之前的ABA问题中,变量经过两次操作以后,变量的版本号就会由1变成3,也就是说只要线程对变量进行过操作,变量的版本号就会发生更改。从而解决了ABA问题。
解释一下上边的源码:
类中维护了一个volatile修饰的Pair类型变量current,Pair是一个私有的静态类,current可以理解为底层数值。
compareAndSet方法的参数部分分别为期望的引用、新的引用、期望的版本号、新的版本号。
return的逻辑为判断了期望的引用和版本号是否与底层的引用和版本号相符,并且排除了新的引用和新的版本号与底层的值相同的情况(即不需要修改)的情况(return代码部分3、4行)。条件成立,执行casPair方法,调用CAS操作。
6.AtomicLongArray
这个类实际上维护了一个Array数组,我们在对数值进行更新的时候,会多一个索引值让我们更新。