Java 21 – Virtual Threads

By | 10月 26, 2023

Java 21发布于2023/9/19,是第4个长期支持版本(LTS)。期待已久的 Virtual Thread (源自于 OpenJDK Loom Project)终于转为正式Feature了,大家可以放心的用。

Virtual Thread 在 Go语言里叫 coroutine,是轻量级的线程,用于编写高吞吐量的并发程序。

什么是 Platform Thread?

引入了 Virtual Thread 后,以前我们常用的线程,就改名为 Platform Thread。一个 Platform Thread 对应一个 OS thread,由操作系统调度。Platform Thread 是一种昂贵的资源,操作系统 core 的数量,决定了能并行执行多少个 Platform Thread。

什么是 Virtual Thread?

Virtual Thread 也继承于 java.lang.Thread,但是 Virtual Thread 不是直接和 OS thread 绑定的。

当运行 Virtual Thread 时,JVM 调度器会将 Virtual Thread 分配给空闲的载体线程(Carrier Thread),载体线程其实就是 Platform Thread,OS会调度执行载体线程;当 Virtual Thread 被 I/O 阻塞时,载体线程将被释放运行其它的 Virtual Thread。

虚拟线程采用 M:N 调度,大量(M)的虚拟线程被分配到少量的平台线程。

下面是一个真实例子的输出,10个VirtualThread运行在3个Platform Thread。

// Thread.currentThread().toString()
VirtualThread[#22]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#27]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#21]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#19]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#23]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#25]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#24]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#26]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#28]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-2

Virtual Thread 适合那些长期被阻塞(I/O密集型)的任务,它不适合于计算密集型任务。服务器应用程序就特别适合Virtual Thread:

  • 服务器需要高并发来处理客户端的请求
  • 而且每个客户端的请求都有很多I/O操作,例如获取资源。

对于计算密集型的任务,使用 Stream API (Parallel Stream)。

创建和运行 Virtual Thread

用Thread.Builder运行

用Thread.ofPlatform()和Thread.ofVirtual()创建一个 Thread.Builder 来运行 Thread,task只能是Runnable接口。

// Platform Thread
Thread t1 = new Thread(task);
t1.start();
t1.join();

// Platform Thread
Thread t2 = Thread.ofPlatform().start(task);
t2.join();

// Virtual Thread
Thread t3 = Thread.ofVirtual().start(task);
t3.join();

使用Thread.Builder运行多个Task。

Thread.Builder builder = Thread.ofVirtual().name("worker-", 0);
Runnable task = () -> {
    System.out.println("Thread ID: " + Thread.currentThread().threadId());
};

// name "worker-0"
Thread t1 = builder.start(task);   
t1.join();
System.out.println(t1.getName() + " terminated");

// name "worker-1"
Thread t2 = builder.start(task);   
t2.join();  
System.out.println(t2.getName() + " terminated");

上面代码的输出:

Thread ID: 21
worker-0 terminated
Thread ID: 24
worker-1 terminated

使用ExecutorService运行

Future.get()等待线程执行完毕;try代码块等待所有线程执行完毕,然后调用ExecutorService的close()方法。task可以是Runnable接口或者Callable<T>接口。

try (ExecutorService myExecutor = Executors.newVirtualThreadPerTaskExecutor()) {
  Future<Integer> future = myExecutor.submit(() -> {
    System.out.println("Running virtual thread");
    return 1;
  });
  Integer result = future.get();
  System.out.println("Task completed. Result is: " + result);
}

Virtual Thread性能比较

下面的程序对同样数量的任务,分别用 Virtual Thread 和 Platform Thread 执行,然后打印执行时间。

import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class VirtualThreadPerformanceTest {
  
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    VirtualThreadPerformanceTest test = new VirtualThreadPerformanceTest();
    test.launchAndCompare(10);  // Warm up the program
    test.launchAndCompare(1);
    test.launchAndCompare(10);
    test.launchAndCompare(100);
    test.launchAndCompare(1000);
    test.launchAndCompare(1_0000);
    test.launchAndCompare(10_0000);
  }
  
  private void launchAndCompare(int taskNumber) {
    long virtualTime = run(true, taskNumber);
    long platformTime = run(false, taskNumber);
    System.out.println("Tasks: % 7d, Virtual: %s, Platform: %s"
        .formatted(taskNumber, virtualTime, platformTime));
  }

  private long run(boolean isVirtual, int taskNumber) {
    long startTime = System.currentTimeMillis();
    try (ExecutorService executor = newExecutorService(isVirtual)) {
      doRun(executor, taskNumber);
    } // executor.close() is called implicitly, and waits
    return System.currentTimeMillis() - startTime;
  }
  
  private ExecutorService newExecutorService(boolean isVirtual) {
    if (isVirtual) {
      return Executors.newVirtualThreadPerTaskExecutor();
    } else {
//      return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
      return Executors.newCachedThreadPool();
    }
  }
  
  private void doRun(ExecutorService executor, int taskNumber) {
    IntStream.range(0, taskNumber).forEach(i -> {
      executor.submit(() -> {
        Thread.sleep(Duration.ofSeconds(1));
        return i;
      });
    });
  }

}

我的电脑CPU有8个processor,可并行执行8个线程,上面程序执行的输出如下:

Tasks:      10, Virtual: 1035, Platform: 1012
Tasks:       1, Virtual: 1012, Platform: 1010
Tasks:      10, Virtual: 1016, Platform: 1009
Tasks:     100, Virtual: 1011, Platform: 1027
Tasks:    1000, Virtual: 1025, Platform: 1060
Tasks:   10000, Virtual: 1046, Platform: 2080
Tasks:  100000, Virtual: 1268, Platform: 15689

正如官方所说,超过1000个并发时,虚拟线程带来的吞吐量提升才会明显。所以只有并发任务超过1000,且是I/O密集型的任务时,才应当使用虚拟线程。

Pinned Virtual Thread(钉住的虚拟线程)

Java中大部分阻塞操作(Block Operations)都会让 Virtual Thread 释放它的载体线程,但是下面两种阻塞操作,会让 Virtual Thread 一直占有载体线程。这种状态被称为 pinned Virtual Thread。

  • 虚拟线程执行 synchronized 代码块或者方法。
  • 虚拟线程执行 native方法或者 Foreign Function(Preview阶段)

虚拟线程被钉住并不是说不对,但由于载体线程不能释放以便于执行其它的虚拟线程,这会降低程序的并发性。

synchronized里的代码执行如果很快,则影响不大;如果执行比较耗时,如下面的例子:

synchronized(lockObj) {
    frequentIO();
}

则应将用ReentrantLock改写:

ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
    frequentIO();
} finally {
    lock.unlock();
}

Virtual Thread使用指南

虚拟线程的代码应当是同步、阻塞的

虚拟线程的mount和unmout非常廉价,应当使用阻塞函数,写同步风格的代码。

下面的非阻塞、异步执行的代码,不会受益于虚拟线程。

CompletableFuture.supplyAsync(info::getUrl, pool)
   .thenCompose(url -> getBodyAsync(url, HttpResponse.BodyHandlers.ofString()))
   .thenApply(info::findImage)
   .thenCompose(url -> getBodyAsync(url, HttpResponse.BodyHandlers.ofByteArray()))
   .thenApply(info::setImageData)
   .thenAccept(this::process)
   .exceptionally(t -> { t.printStackTrace(); return null; });

反过来,我们应当写成同步执行的代码,这类代码使用Virtual Thread会大大提升程序的并发性,而且容易debug。

try {
   String page = getBody(info.getUrl(), HttpResponse.BodyHandlers.ofString());
   String imageUrl = info.findImage(page);
   byte[] data = getBody(imageUrl, HttpResponse.BodyHandlers.ofByteArray());   
   info.setImageData(data);
   process(info);
} catch (Exception ex) {
   t.printStackTrace();
}

虚拟线程不需要池化

Platform Thread是一种昂贵的资源,所以它需要用线程池管理。但是Virutal Thread是廉价的资源,不需要池化,每个任务创建一个Virtual Thread,虚拟线程的数量始终等于并发任务的数量。推荐如下方式运行虚拟线程:

void handle(Request request, Response response) {
    var url1 = ...
    var url2 = ...
 
    try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
        var future1 = executor.submit(() -> fetchURL(url1));
        var future2 = executor.submit(() -> fetchURL(url2));
        response.send(future1.get() + future2.get());
    } catch (ExecutionException | InterruptedException e) {
        response.fail(e);
    }
}
 
String fetchURL(URL url) throws IOException {
    try (var in = url.openStream()) {
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }
}

Executors.newVirtualThreadPerTaskExecutor()创建的ExecutorService为每个提交的任务创建一个Virtual Thread。ExecutorService本身也是廉价的资源,可以随时创建。将ExecutorService放在try-with-resource代码块里,其close方法等待所有任务执行结束才会执行;也就是说try代码块后面的代码等待try代码块执行完才会执行,是synchronous操作。

写很短的、耗时很少的并行Task,对Virtual Thread来说是被推荐的。

使用Semaphore来限制虚拟线程的并发数量

有时需要限制并发虚拟线程的数量,例如被调用的服务不能支持10个以上的请求。

对于Platform Thread,我们可以设置线程池的上限为10,这样最多只有10个线程在执行。

ExecutorService es = Executors.newFixedThreadPool(10);
...
Result foo() {
    try {
        var fut = es.submit(() -> callLimitedService());
        return f.get();
    } catch (...) { ... }
}

但是虚拟线程不应当被池化,它不是昂贵的资源,应当使用Semaphore(信号量)来限制有多少个虚拟线程能执行。

Semaphore sem = new Semaphore(10);
...
Result foo() {
    sem.acquire();
    try {
        return callLimitedService();
    } finally {
        sem.release();
    }
}