我在RxJava使用线程池时遇到的问题

最近在瞎折腾rxjava,写了一段自认为能并发执行的代码如下:

// 大小为5的线程池
ThreadPoolExecutor exec = new ThreadPoolExecutor(
    5, 5, 200, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

Flowable.just(1, 2, 3, 4, 5)
        .subscribeOn(Schedulers.from(exec))
        .subscribe(i -> {
            Thread.sleep(1000L);
            System.out.println(i + "/t" + Thread.currentThread().getName());
        });

由于我在subscribe中sleep了1s,所以我认为这五个数字会并发的执行到subscribe中去,期待会有如下的输出

1   pool-1-thread-1
2   pool-1-thread-2
3   pool-1-thread-3
4   pool-1-thread-4
5   pool-1-thread-5

然而事与愿违,实际的输出是这样的

1   pool-1-thread-1
2   pool-1-thread-1
3   pool-1-thread-1
4   pool-1-thread-1
5   pool-1-thread-1

嗯嗯?为什么没有并发执行subscribe里的代码呢?我以为是我自己的代码有问题,又陆续尝试了内置的一些Scheduler,consumer均是在同一个线程中执行的,好吧,看来是我理解错rxjava的Schedulers了,这货的from方法接收一个Executor参数,并不是指接下来的任务会提交给这个线程池并发的执行。

这大概也是RxJava和CompletableFuture的区别之一吧。搜索了一圈,用rxjava实现并发主要以以下几个方法

  1. 在flatMap中使用obseveOn
Flowable.just(1, 2, 3, 4, 5)
        .flatMap(i -> Flowable.just(i).observeOn(Schedulers.from(exec))
                .doOnNext(d -> {
                    System.out.println(d + "/t" + Thread.currentThread().getName());
                    Thread.sleep(1000L);
                }))
        .subscribe();
  1. 在flatMap中使用Future
Flowable.just(1, 2, 3, 4, 5)
        .flatMap(i -> Flowable.fromFuture(CompletableFuture.completedFuture(i), Schedulers.from(exec))
                .doOnNext(d -> {
                    System.out.println(d + "/t" + Thread.currentThread().getName());
                    Thread.sleep(1000L);
                })
        )
        .subscribe();
  1. 使用ParallelFlowable
Flowable.just(1, 2, 3, 4, 5)
        .parallel()
        .runOn(Schedulers.from(exec))
        .doOnNext(d -> {
            System.out.println(d + "/t" + Thread.currentThread().getName());
            Thread.sleep(1000L);
        })
        .sequential()
        .subscribe();

有两个要注意的地方

  1. RxJava在执行并发的时候,并不会使用Executor的maximumPollSize这个属性,corePollSize有多大,那么最大就有多少个线程
  2. parallel()有一个重载方法可以传入并发数,默认为cpu核心数,在单核的服务器上这个数字是1,也就是不管Executor有多少个线程,只会用一个线程去执行任务

后续

最近发现在使用UnicastProcessor和ParallelFlowable的时候有cpu占用高的情况,经过跟踪发现是这样的问题:

Flowable是支持背压的,所以在元素弹出过快的时候会抛出异常,而我又使用了retry,使得在抛出异常的时候会重新订阅Flowable,而UnicastProcessor只能被订阅一次,所以抛出了大量的IllegalStateException

我还发现了其他问题

  1. subscribeOn是对subscribe中的代码不起作用的,应该用observeOn
  2. 之所以没有用到maximumPoolSize,是因为没有达到阀值,应该给blockingqueue设置一个大小

版权声明:
作者:Alex
链接:https://www.techfm.club/p/27065.html
来源:TechFM
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
< <上一篇
下一篇>>