Java之JUC学习--part3
Java之JUC学习--part3
“线程同步”相关API
wait-notify
基本使用
需要获取对象锁后才可以调用 锁对象.wait(),notify 随机唤醒一个线程,notifyAll 唤醒所有线程去竞争 CPU
Object 类 API:
public final void notify():唤醒正在等待对象监视器的单个线程。
public final void notifyAll():唤醒正在等待对象监视器的所有线程。
public final void wait():导致当前线程等待,直到另一个线程调用该对象的 notify() 方法或 notifyAll()方法。
public final native void wait(long timeout):有时限的等待, 到n毫秒后结束等待,或是被唤醒
说明:wait 是挂起线程,需要唤醒的都是挂起操作(释放锁),阻塞线程可以自己去争抢锁,挂起的线程需要唤醒后去争抢锁
对比 sleep():
- 原理不同:sleep() 方法是属于 Thread 类,是线程用来控制自身流程的,使此线程暂停执行一段时间而把执行机会让给其他线程;wait() 方法属于 Object 类,用于线程间通信
- 对锁的处理机制不同:调用 sleep() 方法的过程中,线程不会释放对象锁,当调用 wait() 方法的时候,线程会放弃对象锁,进入等待此对象的等待锁定池(不释放锁其他线程怎么抢占到锁执行唤醒操作),但是都会释放 CPU
- 使用区域不同:wait() 方法必须放在**同步控制方法和同步代码块(先获取锁)**中使用,sleep() 方法则可以放在任何地方使用
底层原理:
- Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态
- BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片
- BLOCKED 线程会在 Owner 线程释放锁时唤醒
- WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,唤醒后并不意味者立刻获得锁,需要进入 EntryList 重新竞争

代码优化
虚假唤醒:notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程
解决方法:采用 notifyAll
notifyAll 仅解决某个线程的唤醒问题,使用 if + wait 判断仅有一次机会,一旦条件不成立,无法重新判断
解决方法:用 while + wait,当条件不成立,再次 wait
@Slf4j(topic = "c.demo")
public class demo {
static final Object room = new Object();
static boolean hasCigarette = false; //有没有烟
static boolean hasTakeout = false;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
while (!hasCigarette) {//while防止虚假唤醒
log.debug("没烟,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小南").start();
new Thread(() -> {
synchronized (room) {
Thread thread = Thread.currentThread();
log.debug("外卖送到没?[{}]", hasTakeout);
if (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外卖送到没?[{}]", hasTakeout);
if (hasTakeout) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小女").start();
Thread.sleep(1000);
new Thread(() -> {
// 这里能不能加 synchronized (room)?
synchronized (room) {
hasTakeout = true;
//log.debug("烟到了噢!");
log.debug("外卖到了噢!");
room.notifyAll();
}
}, "送外卖的").start();
}
}
park-unpark
LockSupport 是用来创建锁和其他同步类的线程原语
LockSupport 类方法:
LockSupport.park():暂停当前线程,挂起原语LockSupport.unpark(暂停的线程对象):恢复某个线程的运行
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
System.out.println("start..."); //1
Thread.sleep(1000);// Thread.sleep(3000)
// 先 park 再 unpark 和先 unpark 再 park 效果一样,都会直接恢复线程的运行
System.out.println("park..."); //2
LockSupport.park();
System.out.println("resume...");//4
},"t1");
t1.start();
Thread.sleep(2000);
System.out.println("unpark..."); //3
LockSupport.unpark(t1);
}
LockSupport 出现就是为了增强 wait & notify 的功能:
- wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park、unpark 不需要
- park & unpark 以线程为单位来阻塞和唤醒线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程
- park & unpark 可以先 unpark,而 wait & notify 不能先 notify。类比生产消费,先消费发现有产品就消费,没有就等待;先生产就直接产生商品,然后线程直接消费
- wait 会释放锁资源进入等待队列,park 不会释放锁资源,只负责阻塞当前线程,会释放 CPU
-
原理:类似生产者消费者
-
先 park:
- 当前线程调用 Unsafe.park() 方法
- 检查 _counter ,本情况为 0,这时获得 _mutex 互斥锁
- 线程进入 _cond 条件变量挂起
- 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
- 0唤醒 _cond 条件变量中的 Thread_0,Thread_0 恢复运行,设置 _counter 为 0

-
先 unpark:
- 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
- 当前线程调用 Unsafe.park() 方法
- 检查 _counter ,本情况为 1,这时线程无需挂起,继续运行,设置 _counter 为 0

-
安全分析
成员变量和静态变量:
- 如果它们没有共享,则线程安全
- 如果它们被共享了,根据它们的状态是否能够改变,分两种情况:
- 如果只有读操作,则线程安全
- 如果有读写操作,则这段代码是临界区,需要考虑线程安全问题
局部变量:
- 局部变量是线程安全的
- 局部变量引用的对象不一定线程安全(逃逸分析):
- 如果该对象没有逃离方法的作用访问,它是线程安全的(每一个方法有一个栈帧)
- 如果该对象逃离方法的作用范围,需要考虑线程安全问题(暴露引用)
常见线程安全类:String、Integer、StringBuffer、Random、Vector、Hashtable、java.util.concurrent 包
-
线程安全的是指,多个线程调用它们同一个实例的某个方法时,是线程安全的
-
每个方法是原子的,但多个方法的组合不是原子的,只能保证调用的方法内部安全:
Hashtable table = new Hashtable(); // 线程1,线程2 if(table.get("key") == null) { table.put("key", value); }
无状态类线程安全,就是没有成员变量的类
不可变类线程安全:String、Integer 等都是不可变类,内部的状态不可以改变,所以方法是线程安全
-
replace 等方法底层是新建一个对象,复制过去
Map<String,Object> map = new HashMap<>(); // 线程不安全 String S1 = "..."; // 线程安全 final String S2 = "..."; // 线程安全 Date D1 = new Date(); // 线程不安全 final Date D2 = new Date(); // 线程不安全,final让D2引用的对象不能变,但对象的内容可以变
抽象方法如果有参数,被重写后行为不确定可能造成线程不安全,被称之为外星方法:public abstract foo(Student s);
同步模式
保护性暂停
-
单任务版
Guarded Suspension,用在一个线程等待另一个线程的执行结果
- 有一个结果需要从一个线程传递到另一个线程,让它们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式

public static void main(String[] args) {
GuardedObject object = new GuardedObjectV2();
new Thread(() -> {
sleep(1);
object.complete(Arrays.asList("a", "b", "c"));
}).start();
Object response = object.get(2500);
if (response != null) {
log.debug("get response: [{}] lines", ((List<String>) response).size());
} else {
log.debug("can't get response");
}
}
class GuardedObject {
private Object response;
private final Object lock = new Object();
//获取结果
//timeout :最大等待时间
public Object get(long millis) {
synchronized (lock) {
// 1) 记录最初时间
long begin = System.currentTimeMillis();
// 2) 已经经历的时间
long timePassed = 0;
while (response == null) {
// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
long waitTime = millis - timePassed;
log.debug("waitTime: {}", waitTime);
//经历时间超过最大等待时间退出循环
if (waitTime <= 0) {
log.debug("break...");
break;
}
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3) 如果提前被唤醒,这时已经经历的时间假设为 400
timePassed = System.currentTimeMillis() - begin;
log.debug("timePassed: {}, object is null {}",
timePassed, response == null);
}
return response;
}
}
//产生结果
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
log.debug("notify...");
lock.notifyAll();
}
}
}
-
多任务版
多任务版保护性暂停:

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
Thread.sleep(1000);
for (Integer id : Mailboxes.getIds()) {
new Postman(id, id + "号快递到了").start();
}
}
@Slf4j(topic = "c.People")
class People extends Thread{
@Override
public void run() {
// 收信
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.debug("开始收信i d:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.debug("收到信id:{},内容:{}", guardedObject.getId(),mail);
}
}
class Postman extends Thread{
private int id;
private String mail;
//构造方法
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.debug("开始送信i d:{},内容:{}", guardedObject.getId(),mail);
guardedObject.complete(mail);
}
}
class Mailboxes {
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
//产生唯一的id
private static synchronized int generateId() {
return id++;
}
public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}
public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
class GuardedObject {
//标识,Guarded Object
private int id;//添加get set方法
}
顺序输出
-
顺序输出2 1
public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { while (true) { //try { Thread.sleep(1000); } catch (InterruptedException e) { } // 当没有许可时,当前线程暂停运行;有许可时,用掉这个许可,当前线程恢复运行 LockSupport.park(); System.out.println("1"); } }); Thread t2 = new Thread(() -> { while (true) { System.out.println("2"); // 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』) LockSupport.unpark(t1); try { Thread.sleep(500); } catch (InterruptedException e) { } } }); t1.start(); t2.start(); } -
交替输出
public class day2_14 { public static void main(String[] args) throws InterruptedException { AwaitSignal awaitSignal = new AwaitSignal(5); Condition a = awaitSignal.newCondition(); Condition b = awaitSignal.newCondition(); Condition c = awaitSignal.newCondition(); new Thread(() -> { awaitSignal.print("a", a, b); }).start(); new Thread(() -> { awaitSignal.print("b", b, c); }).start(); new Thread(() -> { awaitSignal.print("c", c, a); }).start(); Thread.sleep(1000); awaitSignal.lock(); try { a.signal(); } finally { awaitSignal.unlock(); } } } class AwaitSignal extends ReentrantLock { private int loopNumber; public AwaitSignal(int loopNumber) { this.loopNumber = loopNumber; } //参数1:打印内容 参数二:条件变量 参数二:唤醒下一个 public void print(String str, Condition condition, Condition next) { for (int i = 0; i < loopNumber; i++) { lock(); try { condition.await(); System.out.print(str); next.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { unlock(); } } } }