Skip to content

JUC 线程间通信初探

本文初步探索线程间通信机制,所谓线程间通信机制,就是在线程之间传递信息,或者协调线程的运行顺序。本文涉及的机制有wait/notify机制、park/unpark机制,以及会介绍两种设计模式:保护性暂停模式和生产者消费者模式。

1. wait/notify机制

1.1 概念与原理

wait()wait(long timeout)notify()notifyAll()都是Object类中的方法,所以Java中的所有对象都有这些方法。使用这些方法的前提是要先获取对象锁

例如,如果直接调用对象的上述方法,会报错:

java
public static void main(String[] args) throws InterruptedException {
    Object o = new Object();
    o.wait();
}

报错如下:

txt
Exception in thread "main" java.lang.IllegalMonitorStateException
	at java.lang.Object.wait(Native Method)

使用这些方法的前提是线程要先获取对象锁:

java
public static void main(String[] args) throws InterruptedException {
    Object o = new Object();
    synchronized (o) {
        o.wait();
    }
}

这些方法的作用如下:

  • wait()wait(long timeout)方法:让获取锁的线程放弃锁,并进入WaitSet集合中等待;
  • notify()方法:在锁对象的WaitSet集合中随机挑选一个线程唤醒,使其进入EntryList中竞争锁;
  • notifyAll()方法:唤醒在锁对象的WaitSet集合中的所有线程,使他们进入EntryList中竞争锁;

原理图如下:

image-20250505141845432

  • 当某个线程拿到Monitor对象的所有权后,如果发现执行条件不满足,此时调用锁对象的wait()方法,则该线程会释放锁,并且进入Monitor对象的WaitSet集合中等待(无限期等待或有限期等待,取决于调的wait()方法有没有传时间);
  • 当其他线程拿到Monitor对象的所有权后,并且调用锁对象的notify()notifyAll()方法,则会唤醒WaitSet集合中正在等待的线程(一个或全部),使得等待的线程进入EntryList中竞争锁;

1.2 wait/notify机制初步使用

在下面的例子中,t1、t2和main线程竞争同一把锁,但是t1和t2线程拿到锁之后,调用wait()方法释放锁并进入WaitSet等待,一秒后,主线程拿到锁对象调用notifyAll()方法,唤醒所有等待中的线程,这些线程竞争锁继续执行。

java
public static void main(String[] args) throws InterruptedException {
    System.out.println(LocalDateTime.now() + " 主线程开始...");
    Object lock = new Object();

    new Thread(() -> {
        synchronized (lock){
            try {
                lock.wait();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            System.out.println(LocalDateTime.now() + " t1执行...");
        }
    },"t1").start();

    new Thread(() -> {
        synchronized (lock){
            try {
                lock.wait();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            System.out.println(LocalDateTime.now() + " t2执行...");
        }
    },"t2").start();

    Thread.sleep(1000);
    synchronized (lock){
        System.out.println(LocalDateTime.now() + " 唤醒所有等待的线程...");
        lock.notifyAll();
    }
}

结果如下:

txt
2025-05-05T14:46:02.011 主线程开始...
2025-05-05T14:46:03.020 唤醒所有等待的线程...
2025-05-05T14:46:03.021 t2执行...
2025-05-05T14:46:03.021 t1执行...

1.3 wait和sleep的区别

wait(long n)sleep(long n)的区别:

  • sleep()Thread类的方法,而wait()Object类的方法;
  • sleep()不需要强制和synchronized配合使用,但wait()需要和synchronized一起使用;
  • sleep()在睡眠期间不会释放对象锁,但wait()在等待期间会释放对象锁;

1.4 正确使用wait/notify机制

  • 当线程拿到对象锁之后,如果发现不满足执行条件,则调用锁对象的wait()方法,释放锁并进入WaitSet集合;

  • 另一个线程拿到对象锁之后,调用notify()notifyAll()方法唤醒等待线程,使等待线程进入EntryList竞争锁;

  • 当等待线程重新获取到锁之后,仍然需要判断是否满足执行条件,如果不满足,则重复上述步骤;

总结使用wait/notify的正确使用方式如下:

java
synchronized(lock){
  while(不满足执行条件){
    lock.wait();
  }
  
  // 执行任务
}

// 另一个线程
synchronized(lock){
  lock.notifyAll();
}

例如,现在有一个学习室,只允许一人进入。小明和小红同时竞争学习室,但是,两人都是马大哈,进入学习室打开书包后,小明发现忘了带笔,小红发现忘了带本子,所以两人都叫了外卖买笔、买本子,并且让出学习室。当外卖员到了之后,进入学习室,通知他们笔送到了,本子送到了。

我们用代码可以模拟如下:

java
static Object room = new Object();
static boolean hasPen = false;
static boolean hasNotebook = false;

public static void main(String[] args) {
    Thread t1,t2;
    t1 = new Thread(()->{
        try {
            // 模拟送货过程
            Thread.sleep(new Random().nextInt(5)*1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        synchronized (room) {
            System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + " 送来了笔...");
            hasPen = true;
            room.notifyAll();

        }
    },"外卖小哥1号");

    t2 = new Thread(()->{
        try {
            // 模拟送货过程
            Thread.sleep(new Random().nextInt(5)*1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        synchronized (room) {
            System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + " 送来了本子...");
            hasNotebook = true;
            room.notifyAll();
        }
    },"外卖小哥2号");

    new Thread(()->{
        synchronized (room){
            System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + " 进入学习室...");

            while (!hasPen){
                System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + " 没笔,进入休息室...");

                try {
                    if(t1.getState() == Thread.State.NEW) {
                        t1.start();
                    }
                    room.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + " 开始学习...");
            try {
                Thread.sleep(new Random().nextInt(5) * 1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + " 学习结束...");
        }
    },"小明").start();

    new Thread(()->{
        synchronized (room){
            System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + " 进入学习室...");

            while (!hasNotebook){
                System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + " 没本子,进入休息室...");

                try {
                    if(t2.getState() == Thread.State.NEW) {
                        t2.start();
                    }
                    room.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + " 开始学习...");
            try {
                Thread.sleep(new Random().nextInt(5) * 1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + " 学习结束...");
        }
    },"小红").start();
}

结果如下:

txt
15:22:41.069 小明 进入学习室...
15:22:41.069 小明 没笔,进入休息室...
15:22:41.069 小红 进入学习室...
15:22:41.069 小红 没本子,进入休息室...
15:22:44.075 外卖小哥2号 送来了本子...
15:22:44.075 小红 开始学习...
15:22:44.075 小红 学习结束...
15:22:44.075 小明 没笔,进入休息室...
15:22:45.075 外卖小哥1号 送来了笔...
15:22:45.075 小明 开始学习...
15:22:47.080 小明 学习结束...

但是wait/notify机制不能做到精确唤醒。即当外卖小哥2号送来本子时,会把小明和小红都唤醒,然后小明和小红开始竞争学习室的使用权。

  • 如果小红竞争得到了学习室的使用权,发现满足条件开始学习,小明此时只能等待学习室外面,状态为Blocked
  • 如果小明竞争得到了学习室的使用权,发现仍然不满足条件,此时再次进入休息室,状态为Waiting

2. 保护性暂停模式

2.1 概念及初始实现

Guarded Suspension(保护性暂停)的核心内容是:当一个线程尝试访问某个共享资源或执行某个操作时,如果其执行所需的前提条件尚未满足,该线程就会被暂停(挂起),直到条件满足后才会被唤醒并继续执行。

我们可以使用wait/notify机制来实现保护性暂停模式:

java
class GuardedObject{
    private Object response;

    public Object getResponse(){
        synchronized (this){
            // 当没有结果时,释放锁进入等待集合
            while (response == null){
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            return response;
        }
    }

    public void setResponse(Object response){
        synchronized (this){
            this.response = response;
            // 唤醒等待线程
            this.notifyAll();
        }
    }
}

使用案例如下:

注意,使用了附录中的ImageDownloader

java
@Slf4j
public class Demo {
    public static void main(String[] args) {

        GuardedObject guardedObject = new GuardedObject();

        new Thread(()->{
            log.info("waiting for response");
            Object response = guardedObject.getResponse();
            log.info("response: " + response.toString());
        },"t1").start();

        new Thread(()->{
            log.info("begin downloading");
            long size = ImageDownloader.downloadImageAndGetSize("https://images.dog.ceo/breeds/briard/n02105251_7772.jpg");
            log.info(size + " bytes downloaded");
            guardedObject.setResponse(size);
        },"t2").start();

    }
}

结果如下:

txt
16:48:04.372 [t2] INFO  : begin downloading
16:48:04.372 [t1] INFO  : waiting for response
16:48:05.698 [t2] INFO  : 39774 bytes downloaded
16:48:05.699 [t1] INFO  : response: 39774

2.2 增加等待超时时间

我们可以在等待的基础上增加等待超时时间,当等待超过一定时间后,无论是否有结果,都返回:

java
class GuardedObject{
    private Object response;

    /**
     * 超时等待结果
     * @param timeout 超时时间,以毫秒为单位
     * @return 返回结果,如果超时,则返回null
     */
    public Object getResponse(long timeout){
        synchronized (this){
            long startTime = System.currentTimeMillis();
            // 已等待时间
            long waitTime = 0;

            while (response == null){
              	// 如果已等待时间已超时,则立即返回
                if (waitTime >= timeout){
                    break;
                }

                try {
                  	// 等待时间,需要减去已等待时间
                    this.wait(timeout - waitTime);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
              	
              	// 更新已等待时间
                waitTime = System.currentTimeMillis() - startTime;
            }

            return response;
        }
    }
}

测试案例一:等待500毫秒,等待超时仍没有获取结果

java
@Slf4j
public class Demo {
    public static void main(String[] args) {

        GuardedObject guardedObject = new GuardedObject();

        new Thread(()->{
            log.info("waiting for response");
            Object response = guardedObject.getResponse(500);
            log.info("response: " + response);
        },"t1").start();

        new Thread(()->{
            log.info("begin downloading");
            long size = ImageDownloader.downloadImageAndGetSize("https://images.dog.ceo/breeds/briard/n02105251_7772.jpg");
            log.info(size + " bytes downloaded");
            guardedObject.setResponse(size);
        },"t2").start();

    }
}

结果:

txt
17:02:26.694 [t2] INFO  : begin downloading
17:02:26.694 [t1] INFO  : waiting for response
17:02:27.196 [t1] INFO  : response: null
17:02:27.571 [t2] INFO  : 39774 bytes downloaded

测试案例二——等待5秒,在等待期间就有结果返回了,则立即结束等待,结果如下:

17:03:52.351 [t2] INFO  : begin downloading
17:03:52.351 [t1] INFO  : waiting for response
17:03:53.134 [t2] INFO  : 39774 bytes downloaded
17:03:53.135 [t1] INFO  : response: 39774

3. 生产者消费者模式

生产者-消费者模式(Producer-Consumer Pattern)是一种经典的多线程同步模式,用于解决并发环境下,生产者线程生产数据与消费者线程消费数据之间如何有效地进行协作和同步的问题。

这个模式的核心思想是通过一个共享的**缓冲区(Buffer)**来解耦生产者和消费者。

主要组成部分:

  1. 生产者(Producer):负责生成数据并将其放入缓冲区中。当缓冲区满时,生产者线程等待,直到消费者从缓冲区中取出数据。
  2. 消费者(Consumer):负责从缓冲区中取出数据并进行处理。当缓冲区空时,消费者线程等待,直到生产者生产数据放入缓冲区。
  3. 缓冲区(Buffer):一个共享的数据存储区域,通常是一个队列。生产者将数据放入其中,消费者从其中取出数据。缓冲区的大小可以是固定的或可变的。

代码实现:

java
private static AtomicInteger id = new AtomicInteger(1);

public static void main(String[] args) {
  	// 创建一个容量为2的消息队列
    MessageQueue messageQueue = new MessageQueue(2);

  	// 创建三个生产者线程,每隔1秒钟往消息队列中放入数据
    for (int i = 1; i <= 3; i++) {
        new Thread(()->{
            while (true) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                messageQueue.put(new Message(id.getAndAdd(1), LocalTime.now() + " message" + id.get()));
            }
        },"producer-" + i).start();
    }

  	// 创建两个消费者线程,随机等待不大于5秒的时间后,从消息队列中取出数据
    for (int i = 1; i <= 2; i++) {
        new Thread(()->{
            while (true) {
                try {
                    Thread.sleep(new Random().nextInt(5) * 1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                Message message = messageQueue.take();
                log.debug(message.toString());
            }
        },"consumer-"+i).start();
    }
}
java
@AllArgsConstructor
@Getter
@ToString
class Message{
    private int id;
    private String message;
}
java
@Slf4j
class MessageQueue{
    private Deque<Message> queue;
    private int capacity;

    public MessageQueue(int capacity){
        this.capacity = capacity;
        queue = new LinkedList<>();
    }

    // 生产消息
    public void put(Message message){
        synchronized (queue){
          	// 当队列满时,等待
            while (queue.size() == capacity){
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            log.debug("生产消息:" + message);
            queue.addLast(message);
            queue.notifyAll();
        }
    }

  	// 消费消息
    public Message take(){
        synchronized (queue){
          	// 当队列为空时,等待
            while (queue.isEmpty()){
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            log.debug("消费消息");
            Message message = queue.removeFirst();
            queue.notifyAll();

            return message;
        }
    }
}

4. park/unpark机制

4.1 基本使用

park()unpark(Thread t)java.util.concurrent.locks.LockSupport中的方法,主要作用如下:

  • park():暂停当前线程,线程状态变为Waiting
  • unpark(Thread t):恢复线程t的执行;

例子一,线程t1调用park()暂停执行,然后主线程调用unpark(Thread t)恢复线程t1的执行:

java
public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(()->{
        log.debug("AAAAAA");
        LockSupport.park();
        log.debug("BBBBBB");
    },"t1");

    t1.start();

    Thread.sleep(1000);
    log.debug("恢复t1的执行");
    LockSupport.unpark(t1);
}

结果如下:

txt
10:19:10.505 [t1] DEBUG : AAAAAA
10:19:11.510 [main] DEBUG : 恢复t1的执行
10:19:11.510 [t1] DEBUG : BBBBBB

例子二,线程t1启动,主线程先调用unpark(Thread t),然后线程t1调用park()

java
public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(()->{
        log.debug("AAAAAA");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        LockSupport.park();
        log.debug("BBBBBB");
    },"t1");
    t1.start();

    Thread.sleep(1000);

    log.debug("恢复t1的执行");
    LockSupport.unpark(t1);
}

结果如下:

txt
10:25:44.910 [t1] DEBUG : AAAAAA
10:25:45.914 [main] DEBUG : 恢复t1的执行
10:25:46.912 [t1] DEBUG : BBBBBB

可以发现,即使对某个线程先调用了unpark,该线程调用park后也不会暂停执行。

LockSupportpark()unpark()Objectwait()notify() 方法有重要的区别:

  • 不需要持有对象锁:
    • 调用 LockSupport.park()LockSupport.unpark() 不需要当前线程持有特定的对象锁。
    • wait()notify() 必须在 synchronized 块中调用,并且操作的是同一个对象的监视器锁。
  • 更灵活的唤醒
    • unpark() 可以指定唤醒哪个线程。
    • notify() 是随机唤醒一个等待在对象上的线程,notifyAll() 是唤醒所有等待线程。

4.2 原理

park/unpark的核心原理基于**“许可”(permit)** 的概念。每个使用 LockSupport 的线程都与一个许可关联,许可的状态只有两个:有许可或无许可。

LockSupport.park():

  • 当一个线程调用 park() 方法时,它会检查当前线程是否持有许可。
  • 如果当前线程持有许可,park() 会立即消耗掉这个许可,然后直接返回,线程不会被阻塞。
  • 如果当前线程没有许可,park()会导致当前线程阻塞,直到以下情况之一发生:
    • 其他线程调用了 LockSupport.unpark() 并指定了当前线程。
    • 当前线程被中断。
    • (极少数情况下)发生“伪唤醒”(spuriously wakeup),即使没有收到 unpark 信号也返回。因此,通常需要在循环中调用 park() 并重新检查需要等待的条件。

LockSupport.unpark(Thread thread):

  • unpark() 方法用于唤醒指定的线程 thread
  • 它会使目标线程获取许可(如果之前没有的话)。
  • 如果目标线程当前正在调用 park() 并处于阻塞状态,unpark() 会使其解除阻塞。
  • 如果目标线程没有调用 park() 或者已经返回(即不在阻塞状态),unpark() 方法也会使其获取许可。这个许可会被“保存”起来,使得该线程下次调用 park() 时不会被阻塞,而是立即返回并消耗掉这个许可。

许可的特性:

  • 最多只有一个许可: 与信号量(Semaphore)不同,每个线程的许可数量最多为1,不会累积。即使连续调用多次 unpark(),许可数量仍然是1。
  • 许可的可传递性: unpark() 方法可以在 park() 之前或之后调用。
    • 如果在 park() 之前调用 unpark(),那么随后的 park() 将不会阻塞;
    • 如果在 park() 之后调用 unpark(),并且线程处于阻塞状态,那么 park() 将会解除阻塞;

附录

ImageDownloader

由AI生成的图像下载器,返回下载图片的大小:

java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.MalformedURLException;

public class ImageDownloader {

    /**
     * 从指定的 URL 下载图片并返回其大小(字节)。
     * 如果下载失败或内容不是图片,返回 -1。
     *
     * @param urlString 图片的 URL 地址
     * @return 图片的大小(字节),或 -1 表示失败
     */
    public static long downloadImageAndGetSize(String urlString) {
        HttpURLConnection connection = null;
        long totalBytesRead = 0; // 使用 long 以防止大图片导致溢出

        try {
            // 1. 创建 URL 对象
            URL url = new URL(urlString);

            // 2. 打开连接,并转换为 HttpURLConnection
            connection = (HttpURLConnection) url.openConnection();

            // 3. 设置连接属性 (可选但推荐)
            connection.setRequestMethod("GET"); // HTTP GET 请求
            connection.setConnectTimeout(10000); // 连接超时时间 (毫秒),例如 10 秒
            connection.setReadTimeout(10000);    // 读取超时时间 (毫秒),例如 10 秒
            // 模仿浏览器 User-Agent,一些网站可能需要
            connection.setRequestProperty("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36");

            // 4. 建立连接 (通常在 getResponseCode() 或 getInputStream() 时隐式调用)
            // connection.connect();

            // 5. 获取 HTTP 响应码并检查
            int responseCode = connection.getResponseCode();
            if (responseCode != HttpURLConnection.HTTP_OK) { // 检查是否为 200 OK
                System.err.println("HTTP GET failed. Response Code: " + responseCode + " for URL: " + urlString);
                return -1; // 返回 -1 表示下载失败
            }

            // 6. [可选但推荐] 检查 Content-Type 确保是图片
            String contentType = connection.getHeaderField("Content-Type");
            if (contentType == null || !contentType.startsWith("image/")) {
                System.err.println("Content is not an image. Content-Type: " + contentType + " for URL: " + urlString);
                return -1; // 返回 -1 表示内容不是图片
            }

            // 7. 获取连接的输入流
            // 使用 try-with-resources 确保 InputStream 被关闭
            try (InputStream inputStream = connection.getInputStream()) {

                // 8. 从输入流中读取字节并计数
                byte[] buffer = new byte[4096]; // 缓冲区大小
                int bytesRead = -1;

                while ((bytesRead = inputStream.read(buffer)) != -1) {
                    totalBytesRead += bytesRead; // 累加读取的字节数
                }

            } // try-with-resources 在这里自动关闭 inputStream

            // 9. 返回总字节数
            return totalBytesRead;

        } catch (MalformedURLException e) {
            System.err.println("Invalid URL format: " + urlString);
            e.printStackTrace();
            return -1;
        } catch (IOException e) {
            System.err.println("Error downloading image from " + urlString + ": " + e.getMessage());
            e.printStackTrace();
            return -1;
        } finally {
            // 10. 关闭 HTTP 连接
            if (connection != null) {
                connection.disconnect();
            }
        }
    }

}