B站黑马课程
只使用一把锁时,锁住整个对象
class BigRoom {public void sleep() {synchronized (this) {log.debug("sleeping 2 小时");Sleeper.sleep(2);}}public void study() {synchronized (this) {log.debug("study 1 小时");Sleeper.sleep(1);}}
}
可以设置多把细粒度锁
,提高并发度
,潜在风险是死锁
class BigRoom {private final Object studyRoom = new Object();private final Object bedRoom = new Object();public void sleep() {(bedRoom) {log.debug("sleeping 2 小时");Sleeper.sleep(2);}}public void study() {synchronized (studyRoom) {log.debug("study 1 小时");Sleeper.sleep(1);}}
}
示例
t1 线程获得 A对象 锁,接下来想获取 B对象的锁
t2 线程获得 B对象 锁,接下来想获取 A对象的锁
定位死锁
检测死锁可以使用 jconsole工具
,或者使用 jps
定位进程 id,再用 jstack
定位死锁
jps #查看java线程id
jstack 34628 #查看指定id的线程
最终观察其中有如下内容
"t2" #23 prio=5 os_prio=0 tid=0x0000016d4ec3b000 nid=0x5bb0 waiting for monitor entry [0x0000000decffe000]java.lang.Thread.State: BLOCKED (on object monitor)at com.example.ConcurrentApplication.lambda$main$1(ConcurrentApplication.java:33)- waiting to lock <0x000000076ff202f0> (a java.lang.Object)- locked <0x000000076ff20300> (a java.lang.Object)at com.example.ConcurrentApplication$$Lambda$4/1590550415.run(Unknown Source)at java.lang.Thread.run(Thread.java:748)"t1" #22 prio=5 os_prio=0 tid=0x0000016d4ec3a000 nid=0x87a4 waiting for monitor entry [0x0000000deceff000]java.lang.Thread.State: BLOCKED (on object monitor)at com.example.ConcurrentApplication.lambda$main$0(ConcurrentApplication.java:23)- waiting to lock <0x000000076ff20300> (a java.lang.Object)- locked <0x000000076ff202f0> (a java.lang.Object)at com.example.ConcurrentApplication$$Lambda$3/392292416.run(Unknown Source)at java.lang.Thread.run(Thread.java:748)
..."t2":at com.example.ConcurrentApplication.lambda$main$0(ConcurrentApplication.java:23)- waiting to lock <0x000000076ff20300> (a java.lang.Object)- locked <0x000000076ff202f0> (a java.lang.Object)at com.example.ConcurrentApplication$$Lambda$3/392292416.run(Unknown Source)at java.lang.Thread.run(Thread.java:748)Found 1 deadlock.
解决方法之一:顺序加锁
演示
class Chopstick {String name;public Chopstick(String name) {this.name = name;}@Overridepublic String toString() {return "筷子{" + name + '}';}
}
class Philosopher extends Thread {Chopstick left;Chopstick right;public Philosopher(String name, Chopstick left, Chopstick right) {super(name);this.left = left;this.right = right;}private void eat() {log.debug("eating...");Sleeper.sleep(1);}@Overridepublic void run() {while (true) {// 获得左手筷子synchronized (left) {// 获得右手筷子synchronized (right) {// 吃饭eat();}// 放下右手筷子}// 放下左手筷子}}
}
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束
演示
public class TestLiveLock {static volatile int count = 10;static final Object lock = new Object();public static void main(String[] args) {new Thread(() -> {// 期望减到 0 退出循环while (count > 0) {sleep(0.2);count--;log.debug("count: {}", count);}}, "t1").start();new Thread(() -> {// 期望超过 20 退出循环while (count < 20) {sleep(0.2);count++;log.debug("count: {}", count);}}, "t2").start();}
}
常定义为,一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束
源码没看明白,总结一下大概流程
AQS 就是一个抽象类,可用来在多线程环境下构建锁,比如ReentranctLock就是基于它的独占锁。我对它的了解不是非常深入,只能大概说一下。它有两个非常重要的组件,一个表示锁状态的state,在构建锁时可以根据自己的需要定义state的含义,比如ReentrantLock里面就是state=0时表示没有加锁,state=1表示加锁,state>1表示被冲入了。另外一个重要组件就是一个双向队列,用来存储等待锁的线程。当第一个线程来获取锁时,非公平条件下它会尝试通过CAS操作去改变state的值,如果成功说明锁空闲,失败就以CAS操作加入到队列的尾部,等待它的前一个线程结点来唤醒它。
AQS可以实现独享锁和共享锁。比如ReentrantLock就是独占锁,它又可以分为公平锁和非公平锁,公平锁按照队列的顺序获取锁,非公平锁就是当新的线程来到时,它先去尝试获取一下,获取不到再入队。共享锁有countDownLatch之类的,会定义一个初始计数器,表示可共享的个数,具体不是很了解
ReentrantLock主要基于CAS和AQS实现,支持公平锁和非公平锁
ReentrantLock 类内部总共存在Sync
、NonfairSync
、FairSync
三个类,NonfairSync与 FairSync类继承自 Sync类,Sync类继承自 AbstractQueuedSynchronizer (AQS)
抽象类
详见:https://blog.csdn.net/weixin_42039228/article/details/123135122
基本语法
// 获取锁
reentrantLock.lock();
try {// 临界区
} finally {// 释放锁reentrantLock.unlock();
}
相对于 synchronized 它具备如下特点
可中断
private static ReentrantLock lock = new ReentrantLock();
try {//如果没有竞争那么此方法就会获取lock对象锁//如果有竞争就进入阻塞队列,可以被其他线程使用 interrupt 方法打断lock.lockInterruptibly();
} catch (InterruptedException e) {e.printStackTrace();return;
}
可以设置超时时间
tryLock
:在规定时间内获取锁
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] agrs) {Thread t1 = new Thread(()->{log.debug("尝试获取锁");try {//tryLock()无参表示获取一次if(!lock.tryLock(2, TimeUnit.SECONDS)) {//在2秒内尝试获取锁log.debug("获取锁失败");return;}} catch (InterruptedException e) {e.printStackTrace();}try{//临界区log.debug("获取锁成功");}finally{lock.unlock();}}, "t1");lock.lock();t1.start();sleep(1);lock.unlock();
}
可以设置为公平锁
支持多个条件变量 (await / signal)
synchronized 中也有条件变量,当条件不满足时进入 waitSet
等待
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] agrs) {//创建一个新的条件变量Condition condition1 = lock.newCondition();Condition condition2 = lock.newCondition();lock.lock();//先加锁//进入条件变量condition1中等待condition1.await();//叫醒阻塞在condition1中的线程condition1.signal();
}
使用要点
ReentrantLock 与 synchronized 一样,都支持可重入
使用tryLock()
,先获取左筷子,再获取右筷子。如果右筷子获取失败,会释放左筷子
package com.example;@Slf4j(topic = "c.Test")
public class ConcurrentApplication{public static void main(String[] agrs) {Chopstick c1 = new Chopstick("1");Chopstick c2 = new Chopstick("2");Chopstick c3 = new Chopstick("3");Chopstick c4 = new Chopstick("4");Chopstick c5 = new Chopstick("5");new Philosopher("苏格拉底", c1, c2).start();new Philosopher("柏拉图", c2, c3).start();new Philosopher("亚里士多德", c3, c4).start();new Philosopher("赫拉克利特", c4, c5).start();new Philosopher("阿基米德", c5, c1).start();}
}class Chopstick extends ReentrantLock{String name;public Chopstick(String name){this.name=name;}@Overridepublic String toString() {return "筷子{" + name + '}';}
}@Slf4j(topic = "c.Philosopher")
class Philosopher extends Thread{Chopstick left;Chopstick right;public Philosopher(String name, Chopstick left, Chopstick right){super(name);this.left=left; this.right=right;}private void eat(){log.debug("eating ...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void run() {while(true){if(left.tryLock()){try{if(right.tryLock()){eat();right.unlock();}}finally {left.unlock();}}}}
}
需求:要求先运行线程2,再运行线程1
1. wait/notify方案
static final Object lock = new Object();
static boolean t2runned = false;//判断t2是否运行过public static void main(String[] agrs) {Thread t1 = new Thread(() -> {synchronized (lock){while(!t2runned){try{lock.wait();}catch (InterruptedException e){e.printStackTrace();}}log.debug("1");}}, "t1");Thread t2 = new Thread(() -> {synchronized (lock){log.debug("2");t2runned = true;lock.notify();}}, "t2");t1.start();t2.start();
}
2. pack/unpack方案
Thread t1 = new Thread(() -> {LockSupport.park();log.debug("1");
}, "t1");
Thread t2 = new Thread(() -> {log.debug("2");LockSupport.unpark(t1);
}, "t2");
t1.start();
t2.start();
需求:线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现
1. wait/notify方案
@Slf4j(topic = "c.Test")
public class ConcurrentApplication {public static void main(String[] agrs) {WaitNotify waitNotify = new WaitNotify(1, 5);new Thread(() -> {waitNotify.print("a", 1, 2);}, "t1").start();new Thread(() -> {waitNotify.print("b", 2, 3);}, "t2").start();new Thread(() -> {waitNotify.print("c", 3, 1);}, "t3").start();}
}class WaitNotify{private int flag;//等待标记,1,2,3表示不同线程private int loopNumber;//循环次数public WaitNotify(int flag, int loopNumber) {this.flag = flag;this.loopNumber = loopNumber;}public void print(String str, int waitFlag, int nextFlag){for(int i=0; isynchronized (this){while (flag != waitFlag){try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.print(str);flag = nextFlag;this.notifyAll();}}}
}
2. await/signal方案
@Slf4j(topic = "c.Test")
public class ConcurrentApplication {public static void main(String[] agrs) {AwaitSignal awaitSignal = new AwaitSignal(5);Condition a = awaitSignal.newCondition();Condition b = awaitSignal.newCondition();Condition c = awaitSignal.newCondition();new Thread(() ->{awaitSignal.print("a", a, b);}, "t1").start();new Thread(() ->{awaitSignal.print("b", b, c);}, "t2").start();new Thread(() ->{awaitSignal.print("c", c, a);}, "t3").start();Thread.sleep(1000);awaitSignal.lock();try{System.out.println("开始!");a.signal();}finally {awaitSignal.unlock();}}
}class AwaitSignal extends ReentrantLock{private int loopNumber;public AwaitSignal(int loopNumber){this.loopNumber = loopNumber;}public void print(String str, Condition current, Condition next){for(int i=0; ilock();try{current.await();System.out.print(str);next.signal();}catch (InterruptedException e){e.printStackTrace();}finally {unlock();}}}
}
3. park/unpark方案
@Slf4j(topic = "c.Test")
public class ConcurrentApplication {static Thread t1, t2, t3;public static void main(String[] agrs) {ParkUnpack pu = new ParkUnpack(5);t1 = new Thread(() ->{pu.print("a", t2);}, "t1");t2 = new Thread(() ->{pu.print("b", t3);}, "t2");t3 = new Thread(() ->{pu.print("c", t1);}, "t3");t1.start();t2.start();t3.start();LockSupport.unpark(t1);}
}class ParkUnpack{private int loopNumber;public ParkUnpack(int loopNumber){this.loopNumber = loopNumber;}public void print(String str, Thread next){for (int i=0; iLockSupport.park();System.out.print(str);LockSupport.unpark(next);}}
}
Monitor 主要关注的是访问共享变量时,保证临界区代码的原子性
除此之外,共享变量还有可见性
和有序性
的问题
JMM
(Java Memory Model,Java内存模型) 体现在以下几个方面
重点注意
JVM内存模型
和Java内存模型
是不一样的
static boolean run = true; //添加volatile
public static void main(String[] args) throws InterruptedException {Thread t = new Thread(()->{while(run){// ....}});t.start();sleep(1);run = false; // 线程t不会如预想的停下来
}
JVM优化时,会将循环超过1w次的代码作为热点代码
JVM会把热点代码的字节码编译成机器码放到方法区,下次执行时直接执行对应的机器码来提高执行效率
因此这里修改为false无用在while中加入 log.debug(“d”); 即可停止下来
另外:println是synchronized修饰的
可见性的产生原因
解决方案:volatile
volatile static boolean run = true
可以用来修饰成员变量
和静态成员变量
,避免线程从自己的工作缓存中查找变量的值,强制到主存中获取它的值
加锁synchronized
也可以避免可见性的问题
static boolean run = true;
final static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {Thread t = new Thread(()->{while(true){synchronized (lock){if(!run){break;}}}});t.start();sleep(1);synchronized (lock){run = false;}
}
violate只保证可见性,并不保证原子性
synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性
但缺点是synchronized 是属于重量级操作,性能相对更低
之前的两阶段终止模式是通过 interrupt 实现的
这里使用violate改进
@Slf4j(topic = "c.test")
public class ConcurrentApplication {public static void main(String[] args) throws InterruptedException {TwoPhaseTermination tpt = new TwoPhaseTermination();tpt.start();Thread.sleep(3500);tpt.stop();}
}@Slf4j(topic = "c.TwoPhaseTermination")
class TwoPhaseTermination {private Thread monitor;private volatile boolean stop = false;public void start(){monitor = new Thread(()->{while(true){Thread current = Thread.currentThread();if(stop){log.debug("料理后事");break;}try {Thread.sleep(1000);//如果在这里sleep被打断,将进入catch里面log.debug("执行监控记录");}catch (InterruptedException e){}}});monitor.start();}public void stop(){stop = true;monitor.interrupt();//使得stop后立即停止}
}
Balking (犹豫)模式
上面监控线程存在的问题:如果创建2个监控线程,那么这两个线程将在同时刻打印监控信息,导致重复
需求:使得监控方法 start() 只执行一次
@Slf4j(topic = "c.test")
public class ConcurrentApplication {public static void main(String[] args) throws InterruptedException {TwoPhaseTermination tpt = new TwoPhaseTermination();tpt.start();tpt.start();}
}@Slf4j(topic = "c.TwoPhaseTermination")
class TwoPhaseTermination {private Thread monitor;private volatile boolean stop = false;//判断是否执行过 start()private boolean starting = false;public void start(){//犹豫模式:Balkingsynchronized (this){if(starting) return;starting = true;}monitor = new Thread(()->{...});monitor.start();}public void stop(){stop = true;monitor.interrupt();//使得stop后立即停止}
}
常用在web开发中,这样前端即便点击多次start按钮,也能保证仅有一个监控程序
此外还可以用在实现单例模式
public final class Singleton {private Singleton() {}private static Singleton INSTANCE = null;public static synchronized Singleton getInstance() {if (INSTANCE != null) {return INSTANCE;}INSTANCE = new Singleton();return INSTANCE;}
}
指令重排:JVM 会在不影响正确性的前提下,调整语句的执行顺序
static int i;
static int j;
// 在某个线程内执行如下赋值操作
i = ...;
j = ...;
此时无论是先执行 i 还是先执行 j,对结果没有影响
这种情况下,JVM可能对上面代码的执行顺序进行重排
指令重排的原因和原理
现代处理器会设计为一个时钟周期完成一条执行时间最长的 CPU 指令
每条指令都可以分为: 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回 这 5 个阶段
在不改变程序结果的前提下,这些指令的各个阶段可以通过重排序和组合
来实现指令级并行
多级指令流水线
,例如支持同时执行 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回 的处理器,就可以称之为五级指令流水线指令重排举例
int num = 0;
boolean ready = false;
// 线程1 执行此方法
public void actor1(I_Result r) {if(ready) {r.r1 = num + num;//r是一个只有r1成员变量的对象} else {r.r1 = 1;}
}
// 线程2 执行此方法
public void actor2(I_Result r) {num = 2;ready = true;
}
问:最终 r1 的结果为多少?这里分析一种奇怪的结果
r1 = 0
线程2中进行指令重排,使得 ready = true 在 num = 2 之前执行,就会导致 r1 = 0
禁止指令重排 - volatile
volatile boolean ready = false;
volatile 的底层实现原理是内存屏障
,Memory Barrier(Memory Fence)
写屏障
(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
读屏障
(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据
写屏障
会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
读屏障
会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
以著名的 double-checked locking 单例模式为例
public final class Singleton {private Singleton() { }private static Singleton INSTANCE = null;public static Singleton getInstance() {if(INSTANCE == null) { // t2// 首次访问会同步,而之后的使用没有 synchronizedsynchronized(Singleton.class) {if (INSTANCE == null) { // t1INSTANCE = new Singleton();}}}return INSTANCE;}
}
以上的实现特点是:
问题:第一次 if(INSTANCE == null)
在 synchronized 之外,有指令重排的危险性
原理:
0: getstatic #2 // 获得静态变量INSTANCE
3: ifnonnull 37 // 判断是否null,不为null,跳转到37行
6: ldc #3 // 获取类对象Singleton.class
8: dup // 复制引用地址
9: astore_0 // 将复制的引用地址存入寄存器
10: monitorenter // 进入同步代码块
11: getstatic #2 // 获得静态变量INSTANCE
14: ifnonnull 27 // 判断是否null,不为null,跳转到27行(拿出类对象,用于解锁)
17: new #3 // new Singleton();
20: dup // 复制一份新创建对象的地址
21: invokespecial #4 // 调用构造方法
24: putstatic #2 // 将创建的对象赋值给静态变量
27: aload_0 // 类对象解锁
28: monitorexit
29: goto 37
32: astore_1
33: aload_0
34: monitorexit
35: aload_1
36: athrow
37: getstatic #2
40: areturn
其中
也许 jvm 会优化为:先执行 24,再执行 21,即先赋值再构造。此时可能出现
线程t1执行同步代码块,线程t2在INSTANCE未被构造的情况下获取到了它,然后正常使用。结果是使用了一个未构造的对象,导致报错
synchronized保护的共享变量是可以保障原子性、可见性、有序性的,但是这里的INSTANCE因为有部分在synchronized之外,因此可能出问题
解决方案
private static volatile Singleton INSTANCE = null;
是通过读写屏障阻止了重排序而实现的
参考:https://www.jianshu.com/p/b9186dbebe8e
happens-before原则
如何判断是否为 happens-before?
希望 doInit() 方法仅被调用一次,下面的实现是否有问题,为什么?
public class TestVolatile {volatile boolean initialized = false;void init() {if (initialized) {return;}doInit();initialized = true;}private void doInit() {}
}
有问题:只能保障可见性,不能保障原子性,可以改用synchronized
饿汉式:类加载就会导致该单实例对象被创建
懒汉式:类加载不会导致该单实例对象被创建,而是首次使用该对象时才会创建
public final class Singleton implements Serializable {private Singleton() {}private static final Singleton INSTANCE = new Singleton(); public static Singleton getInstance() {return INSTANCE;}public Object readResolve() {return INSTANCE;}
}
问题1:类为什么加 final?
问题2:如果实现了序列化接口(implements Serializable), 还要做什么来防止反序列化破坏单例?
关于序列化和反序列化:https://zhuanlan.zhihu.com/p/340258358
增加 readResovle()
方法
//固定名称的方法,在反序列化中如果发现readResolve()返回了一个对象,就会使用这个对象,而非反序列化后生成的对象
public Object readResolve(){return INSTANCE;
}
问题3:构造方法为什么设置为私有? 是否能防止反射创建新的实例?
问题4:这里INSTANCE的初始化是否能保证单例对象创建时的线程安全?
问题5:为什么提供静态方法getInstance而不是直接将 INSTANCE 设置为 public, 说出你知道的理由
枚举方式实现的单例
enum Singleton {INSTANCE;
}
问题1:枚举单例是如何限制实例个数的
问题2:枚举单例在创建时是否有并发问题
问题3:枚举单例能否被反射破坏单例
问题4:枚举单例能否被反序列化破坏单例
问题5:枚举单例属于懒汉式还是饿汉式
问题6:枚举单例如果希望加入一些单例创建时的初始化逻辑该如何做
懒汉式的单例
public final class Singleton {private Singleton() { }private static Singleton INSTANCE = null;public static synchronized Singleton getInstance() {if( INSTANCE != null ){return INSTANCE;}INSTANCE = new Singleton();return INSTANCE;}
}
注意 synchronized 不要加在 INSTANCE 上,一个是因为它是null,另外synchronized需要加在不变的对象上,即final
分析这里的线程安全, 并说明有什么缺点
DCL
public final class Singleton {private Singleton() { }private static volatile Singleton INSTANCE = null;public static Singleton getInstance() {if (INSTANCE != null) {return INSTANCE;}synchronized (Singleton.class) {if (INSTANCE != null) {return INSTANCE;}INSTANCE = new Singleton();return INSTANCE;}}
}
问题1:解释为什么要加 volatile ?
问题2:对比实现3, 实现4的写法的意义
问题3:为什么要第二次加空判断, 之前不是判断过了吗
public final class Singleton {private Singleton() { }private static class LazyHolder {static final Singleton INSTANCE = new Singleton();}public static Singleton getInstance() {return LazyHolder.INSTANCE;}
}
问题:属于懒汉式还是饿汉式
问题:在创建时是否有并发问题