使用RxJava把你的异步调用同步化


每周的定期博客。

本周个人认为的亮点之一是在测试中使用RxJava,把异步调用同步化。让代码变得“清晰”了许多。

如果你使用基于异步的编程方式的话,肯定会碰到一个如何测试的问题。最简单的比如说

AsyncResult<String> foo = asyncService.foo();

或者基于回调方式的异步代码

asyncSerivce.foo(asyncResult -> {
    // do stuff
});

前一种还好,后一种在多个异步调用时很容易碰到callback hell的问题。

A.foo(a -> {
    a.bar(b -> {
        b.baz(c -> {
        });
    });
});

比如说上面这种嵌套比较深的代码。一种解决方法是类似JavaScript的Promise的链式调用。

A.foo()
  .andThen(a -> a.bar())
  .andThen(b -> b.baz())

但是链式调用的一个问题是,在方法调用之间是非简单类型的依赖关系时,你需要一些中间类,而且调用顺序有所变化时,中间类也必须随之改变。比如说上述代码中bar依赖A,baz依赖A与B时

A.foo()
  .andThen(a -> a.bar().success(b -> (a, b))
  .andThen((a, b) -> a.baz(b))

相比之前的写法会显得繁琐。对此,你可以改用JavaScript的async/await,完全把代码改成同步风格,免去些中间类的需要。

async/await是协程的一种实现,但是Java没有协程,该怎么办?方法很简单,利用类似Future的get把代码同步化就可以了。特别是在测试代码中,异步代码转开成同步代码并没有太大问题。相反的,同步风格的测试代码更好理解。

于是在测试代码中可以这么做

  1. 把回调式代码转为Observable事件源
  2. 使用Observable的blockGet,blockAwait转换为同步代码
Observable<A> makeSourceA() {
  Observable.create(emitter -> {
    A.foo(a -> {
        if(a.isSuccess()) {
            emitter.onNext(a.get());
        } else {
            emitter.onError(a.getError());
        }
        emitter.onComplete();
    });
  });
}

Observable<B> makeSourceB(A a) {
  Observable.create(emitter -> {
    a.bar(b -> {
        if(b.isSuccess()) {
            emitter.onNext(b.get());
        } else {
            emitter.onError(b.getError());
        }
        emitter.onComplete();
    });
  });
}

Observable<C> makeSourceC(B b) {
  Observable.create(emitter -> {
    b.bar(c -> {
        if(c.isSuccess()) {
            emitter.onNext(c.get());
        } else {
            emitter.onError(c.getError());
        }
        emitter.onComplete();
    });
  });
}

A a = makeSourceA().blockingGet();
B b = makeSourceB(a).blockingGet();
C c = makeSourceC(b).blockingGet();

顺便说一句,如果你不会用RxJava,也没有关系,你可以使用CompletableFuture,也可以达到同样的效果。

希望以上的点子对你有用。