什么是等待通知機制
在單線程編程中,要執行的操作需要滿足一定的條件才能執行,可以把這個操作放在if語句塊中。
在多線程編程中,可能A線程的條件沒有滿足只是暫時的, 稍后其他的線程B可能會更新條件使得A線程的條件得到滿足. 可以將A線程暫停,直到它的條件得到滿足后再將A線程喚醒.它的偽代碼:
atomics{ //原子操作
while( 條件不成立 ){
等待
}
當前線程被喚醒條件滿足后,繼續執行下面的操作
}
等待/通知機制的實現
Object類中的wait()方法可以使執行當前代碼的線程等待,暫停執行,直到接到通知或被中斷為止。
注意:
● wait()方法只能 在同步代碼塊中由鎖對象調用。
● 調用wait()方法,當前線程會釋放鎖。
其偽代碼如下:
//在調用wait()方法前獲得對象的內部鎖
synchronized( 鎖對象 ){
while( 條件不成立 ){
//通過鎖對象調用 wait()方法暫停線程,會釋放鎖對象
鎖對象.wait();
}
//線程的條件滿足了繼續向下執行
}
Object類的notify()可以喚醒線程,該方法也必須在同步代碼塊中由鎖對象調用. 沒有使用鎖對象調用 wait()/notify()會拋出IlegalMonitorStateExeption異常. 如果有多個等待的線程,notify()方法只能喚醒其中的一個. 在同步代碼塊中調用notify()方法后,并不會立即釋放鎖對象,需要等當前同步代碼塊執行完后才會釋放鎖對象,一般將notify()方法放在同步代碼塊的最后. 它的偽代碼如下:
synchronized( 鎖對象 ){
//執行修改保護條件 的代碼
//喚醒其他線程
鎖對象.notify();
}
package com.wkcto.wait;
/**
* 需要通過notify()喚醒等待的線程
* 北京動力節點老崔
*/
public class Test03 {
public static void main(String[] args) throws InterruptedException {
String lock = "wkcto"; //定義一個字符串作為鎖對象
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
System.out.println("線程1開始等待: " + System.currentTimeMillis());
try {
lock.wait(); //線程等待,會釋放鎖對象,當前線程轉入blocked阻塞狀態
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程1結束等待:" + System.currentTimeMillis());
}
}
});
//定義第二個線程,在第二個線程中喚醒第一個線程
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
//notify()方法也需要在同步代碼塊中,由鎖對象調用
synchronized (lock){
System.out.println("線程2開始喚醒 : " + System.currentTimeMillis());
lock.notify(); //喚醒在lock鎖對象上等待的某一個線程
System.out.println("線程2結束喚醒 : " + System.currentTimeMillis());
}
}
});
t1.start(); //開啟t1線程,t1線程等待
Thread.sleep(3000); //main線程睡眠3秒,確保t1入睡
t2.start(); //t1線程開啟3秒后,再開啟t2線程喚醒t1線程
}
}
notify()方法后不會立即釋放鎖對象
package com.wkcto.wait;
import java.util.ArrayList;
import java.util.List;
/**
* notify()不會立即釋放鎖對象
* 北京動力節點老崔
*/
public class Test04 {
public static void main(String[] args) throws InterruptedException {
//定義一個List集合存儲String數據
List<String> list = new ArrayList<>();
//定義第一個線程,當list集合中元素的數量不等于5時線程等待
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (list){
if ( list.size() != 5 ){
System.out.println("線程1開始等待: " + System.currentTimeMillis());
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程1被喚醒:" + System.currentTimeMillis());
}
}
}
});
//定義第二個線程,向list集合中添加元素
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (list){
for (int i = 0; i < 10; i++) {
list.add("data--" + i);
System.out.println("線程2添加了第" + (i+1) + "個數據");
//判斷元素的數量是否滿足喚醒線程1
if (list.size() == 5 ){
list.notify(); //喚醒 線程, 不會立即釋放鎖對象,需要等到當前同步代碼塊都執行完后才能釋放鎖對象
System.out.println("線程2已經發現喚醒通知");
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});
t1.start();
//為了確保t2在t1之后開啟,即讓t1線程先睡眠
Thread.sleep(500);
t2.start();
}
}
當線程處于wait()等待狀態時, 調用線程對象的interrupt()方法會中斷線程的等待狀態, 會產生InterruptedException異常。
package com.wkcto.wait;
/**
* Interrupt()會中斷線程的wait()等待
* 北京動力節點老崔
*/
public class Test05 {
public static void main(String[] args) throws InterruptedException {
SubThread t = new SubThread();
t.start();
Thread.sleep(2000); //主線程睡眠2秒, 確保子線程處于Wait等待狀態
t.interrupt();
}
private static final Object LOCK = new Object(); //定義常量作為鎖對象
static class SubThread extends Thread{
@Override
public void run() {
synchronized (LOCK){
try {
System.out.println("begin wait...");
LOCK.wait();
System.out.println("end wait..");
} catch (InterruptedException e) {
System.out.println("wait等待被中斷了****");
}
}
}
}
}
notify()與notifyAll()
notify()一次只能喚醒一個線程,如果有多個等待的線程,只能隨機喚醒其中的某一個; 想要喚醒所有等待線程,需要調用notifyAll()。
package com.wkcto.wait;
/**
* notify()與notifyAll()
* 北京動力節點老崔
*/
public class Test06 {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object(); //定義一個對象作為子線程的鎖對象
SubThread t1 = new SubThread(lock);
SubThread t2 = new SubThread(lock);
SubThread t3 = new SubThread(lock);
t1.setName("t1");
t2.setName("t2");
t3.setName("t3");
t1.start();
t2.start();
t3.start();
Thread.sleep(2000);
//調用notify()喚醒 子線程
synchronized (lock){
// lock.notify(); //調用一次notify()只能喚醒其中的一個線程,其他等待的線程依然處于等待狀態,對于處于等待狀態的線程來說,錯過了通知信號,這種現象也稱為信號丟失
lock.notifyAll(); //喚醒所有的線程
}
}
static class SubThread extends Thread{
private Object lock; //定義實例變量作為鎖對象
public SubThread(Object lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock){
try {
System.out.println(Thread.currentThread().getName() + " -- begin wait...");
lock.wait();
System.out.println( Thread.currentThread().getName() + " -- end wait...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
wait(long)的使用
wait(long)帶有long類型參數的wait()等待,如果在參數指定的時間內沒有被喚醒,超時后會自動喚醒。
package com.wkcto.wait;
/**
* wait(long)
* 北京動力節點老崔
*/
public class Test07 {
public static void main(String[] args) {
final Object obj = new Object();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
synchronized ( obj ){
try {
System.out.println("thread begin wait");
obj.wait(5000); //如果5000毫秒內沒有被喚醒 ,會自動喚醒
System.out.println("end wait....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
t.start();
}
}
通知過早
線程wait()等待后,可以調用notify()喚醒線程, 如果notify()喚醒的過早,在等待之前就調用了notify()可能會打亂程序正常的運行邏輯。
package com.wkcto.wait;
/**
* notify()通知過早
* 北京動力節點老崔
*/
public class Test08 {
public static void main(String[] args) {
final Object Lock = new Object(); //定義對象作為鎖對象
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (Lock){
try {
System.out.println("begin wait");
Lock.wait();
System.out.println("wait end...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (Lock){
System.out.println("begin notify");
Lock.notify();;
System.out.println("end nofity");
}
}
});
//如果先開啟t1,再開啟t2線程,大多數情況下, t1先等待,t1再把t1喚醒
// t1.start();
// t2.start();
//如果先開啟t2通知線程,再開啟t1等待線程,可能會出現t1線程等待沒有收到通知的情況,
t2.start();
t1.start();
}
}
package com.wkcto.wait;
/**
* notify()通知過早, 就不讓線程等待了
* 北京動力節點老崔
*/
public class Test09 {
static boolean isFirst = true; //定義靜態變量作為是否第一個運行的線程標志
public static void main(String[] args) {
final Object Lock = new Object(); //定義對象作為鎖對象
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (Lock){
while ( isFirst ) { //當線程是第一個開啟的線程就等待
try {
System.out.println("begin wait");
Lock.wait();
System.out.println("wait end...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (Lock){
System.out.println("begin notify");
Lock.notify();;
System.out.println("end nofity");
isFirst = false; //通知后,就把第一個線程標志修改為false
}
}
});
//如果先開啟t1,再開啟t2線程,大多數情況下, t1先等待,t1再把t1喚醒
// t1.start();
// t2.start();
//如果先開啟t2通知線程,再開啟t1等待線程,可能會出現t1線程等待沒有收到通知的情況,
t2.start();
t1.start();
//實際上,調用start()就是告訴線程調度器,當前線程準備就緒,線程調度器在什么時候開啟這個線程不確定,即調用start()方法的順序,并不一定就是線程實際開啟的順序.
//在當前示例中,t1等待后讓t2線程喚醒 , 如果t2線程先喚醒了,就不讓t1線程等待了
}
}
在使用wait/nofity模式時,注意wait條件發生了變化,也可能會造成邏輯的混亂。
package com.wkcto.wait;
import java.util.ArrayList;
import java.util.List;
/**
* wait條件發生變化
* 定義一個集合
* 定義一個線程向集合中添加數據,添加完數據后通知另外的線程從集合中取數據
* 定義一個線程從集合中取數據,如果集合中沒有數據就等待
* 北京動力節點老崔
*/
public class Test10 {
public static void main(String[] args) {
//定義添加數據的線程對象
ThreadAdd threadAdd = new ThreadAdd();
//定義取數據的線程對象
ThreadSubtract threadSubtract = new ThreadSubtract();
threadSubtract.setName("subtract 1 ");
//測試一: 先開啟添加數據的線程,再開啟一個取數據的線程,大多數情況下會正常取數據
// threadAdd.start();
// threadSubtract.start();
//測試二: 先開啟取數據的線程,再開啟添加數據的線程, 取數據的線程會先等待,等到添加數據之后 ,再取數據
// threadSubtract.start();
// threadAdd.start();
//測試三: 開啟兩個 取數據的線程,再開啟添加數據的線程
ThreadSubtract threadSubtract2 = new ThreadSubtract();
threadSubtract2.setName("subtract 2 ");
threadSubtract.start();
threadSubtract2.start();
threadAdd.start();
/*
某一次執行結果如下:
subtract 1 begin wait....
subtract 2 從集合中取了data后,集合中數據的數量:0
subtract 1 end wait..
Exception in thread "subtract 1 " java.lang.IndexOutOfBoundsException:
分析可能的執行順序:
threadSubtract線程先啟動, 取數據時,集合中沒有數據,wait()等待
threadAdd線程獲得CPU執行權, 添加數據 , 把threadSubtract線程喚醒,
threadSubtract2線程開啟后獲得CPU執行權, 正常取數據
threadSubtract線程獲得CPU執行權, 打印 end wait..., 然后再執行list.remove(0)取數據時,現在list集合中已經沒有數據了, 這時會產生java.lang.IndexOutOfBoundsException異常
出現異常的原因是: 向list集合中添加了一個數據,remove()了兩次
如何解決?
當等待的線程被喚醒后, 再判斷一次集合中是否有數據可取. 即需要把sutract()方法中的if判斷改為while
*/
}
//1)定義List集合
static List list = new ArrayList<>();
//2)定義方法從集合中取數據
public static void subtract(){
synchronized (list) {
// if (list.size() == 0) {
while (list.size() == 0) {
try {
System.out.println(Thread.currentThread().getName() + " begin wait....");
list.wait(); //等待
System.out.println(Thread.currentThread().getName() + " end wait..");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object data = list.remove(0); //從集合中取出一個數據
System.out.println( Thread.currentThread().getName() + "從集合中取了" + data + "后,集合中數據的數量:" + list.size());
}
}
//3)定義方法向集合中添加數據后,通知等待的線程取數據
public static void add(){
synchronized (list){
list.add("data");
System.out.println( Thread.currentThread().getName() + "存儲了一個數據");
list.notifyAll();
}
}
//4)定義線程類調用add()取數據的方法
static class ThreadAdd extends Thread{
@Override
public void run() {
add();
}
}
//定義線程類調用subtract()方法
static class ThreadSubtract extends Thread{
@Override
public void run() {
subtract();
}
}
}