Java中的协程实现

2019-07-04 00:00:00 java

因为后端支撑并发连接数的提高以及微服务化的趋势,Non-blocking IO编程越发的必要了。Java由于还没有官方的协程支持,比较主流的都是采用Future/Reactive-Stream之类的call back方式,程序逻辑被割裂,给程序编写、维护、重构都带来很重的心智负担。

实际上Java中还是有一些第三方的协程实现是可以用的,在对非阻塞IO依赖很高的业务中可以考虑尝试一下,目前可用性比较高的有Quasar和ea-async.

这些方案基本上都是通过byte code Instrument,把编译后同步程序class文件修改为异步的操作。这里面有两个关键因素,一个是现场(主要是stack, 操作数栈)的保存和恢复,一个是能够在某些点暂停执行和重入执行。其中前者通过Instrument 字节码进去很容易修改,后者虽然JVM中也有PC(程序计数器)的概念,但是却不能通过字节码修改,所以一般都要更麻烦一些,用一个状态机来实现,退出时保存状态,重入时根据状态跳转到不同的代码位置。Instrument可以在编译后来做(如果用maven配置个maven插件就可以),在可以在启动的之后,加在agent来做。

Quasar

puniverse/quasar 以及其为web开发提供的整合puniverse/comsat 应该是比较流行的了。

Quasar是一个比较庞大的项目,拥有完整的生态,提供了Fiber实现,调度器,甚至Channel,Actor编程范式这样的支持。底层通过Instrument替换了Thread.par为Fiber.park,因而对于java.util.concurrent包中的各种同步原语和容器都可以在Fiber中使用。使用Quasar的Fiber编程,和原先写多线程程序没有任何区别,IO(需要用XXXChannel)/同步操作(除了synchronized)/ThreadLocal这些也都和阻塞IO一样的使用方法,只是需要把Thread替换成Fiber,然后类需要加Suspendable注解或者抛出SuspendExecution异常就可以了。其中Suspendable和SuspendExecution都是为了告诉Quasar哪些类需要做Instrument。Quasar的Fiber调度,Fiber让出执行权给调度器,是要通过抛异常来退出的,所以调度器性能上会差一些。

// Fiber实现的简单“高并发”HTTP Server ServerSocketChannel s = SocketChannel.open().bind(new InetSocketAddress(8080));
new Fiber(() -> {
    while (true) {
        SocketChannel ch = s.accept();
        new Fiber(() -> {
            ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
            int n = ch.read(buffer);
            String response = "HTTP/1.0 200 OK\r\n";
            ch.write(encoder.encode(CharBuffer.wrap(response)));
            ch.close();
        }).start();
    }
}).start();

Comsat则对数据库、HTTP、Tomcat/Jetty等常用的lib和容器等提供了集成和封装。

不过项目的维护人Ron Pressler已经加入Oracle,参与JDK官方的协程项目Project Loom,所以这个项目近两年都没有大的更新了,如果要Java9或者以上的版本最好就不要选择这个方案了。

ea-async

electronicarts/ea-async 是一个提供async-await 风格协程实现的工具。实现比较简单,主要的就是Instrument的代码,其他没有提供额外的支持了。简单有时候也是优点,ea-async项目容易理解,学习成本低,效率上损失也不大。使用ea-async编写的代码,方法必须返回CompletableFuture或者CompletionStage,这反而也易于和Spring以及其他现有第三方框架整合。

以官方的例子来看,使用CompletableFuture的话要这么写:

import static java.util.concurrent.CompletableFuture.completedFuture;

public class Store {
    public CompletableFuture<Boolean> buyItem(String itemTypeId, int cost) {
        return bank.decrement(cost)
            .thenCompose(result -> {
                if(!result) {
                    return completedFuture(false);
                }
                return inventory.giveItem(itemTypeId).thenApply(res -> true);
            });
    }
}

如果使用ea-async可以写成:

import static com.ea.async.Async.await;
import static java.util.concurrent.CompletableFuture.completedFuture;

public class Store{
    public CompletableFuture<Boolean> buyItem(String itemTypeId, int cost) {
        if(!await(bank.decrement(cost))) {
            return completedFuture(false);
        }
        try {
            await(inventory.giveItem(itemTypeId));
            return completedFuture(true);
        } catch (Exception ex) {
            await(bank.refund(cost));
            throw new AppException(ex);
        }
    }
}

虽然是target Java8开发的,ea-async目前在Java9和Java10上都可以很好的使用。在JDK官方的Fiber实现可用之前,ea-async是一个非常不错的替代选择。

然而其缺点也是很明显的,改写后的代码stack被破坏,debug,profiler都无法正常工作了。

    原文作者:hsiafan
    原文地址: https://zhuanlan.zhihu.com/p/36862142
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。

相关文章