Scala 中应用 Future 并发编程

Java 世界里进行并发编程有 Future 和  ExecutorService(当成 ThreadPool 来对待),及至 Java 8 引入了更为趁手的 CompletableFuture。那么使用 Scala 该如何进行并发编程呢?Scala 能毫无障碍的使用 Java 类库,所以完全可以用 Java 的 API 来使用线程池,只是那不太 Scala 罢了。

既然是 Scala, 就尽量写成 Scala Style 吧,Scala 也有自己的 Future, 还有 Promise 呢? 至于 Promise 与 Future 多大区别,语义上似乎也差不多,只看到 Promise 中有一个 Future 的属性。如果想玩得高级一点的话就该把  Actor 弄起来,它算是所谓的纤程,多个  Actor 可跑在同一个线程中,当然启动要快,内存消耗少,还避免了上下文切换。

这里还是先体验 Scala Future 的用法。Future 本身只是描述了一个任务,及将要获得的结果(或执行的动作),因此那样一个任务也是要放到线程池中去执行。这和 Java 的 Future/ThreadPool 是一样的概念。稍有不同的是线程池的创建与使用,线程池的默认大小配置等。看个简单的应用示例,本次测试的 Scala 版本是 2.11.12,为了突出实际的线程池大小,我们把任务数设置为 20 个

Scala Future 并发编程体验

上面的程序代码输出

ForkJoinPool-1-worker-5, 5
ForkJoinPool-1-worker-1, 7
AvailableProcessors: 8
ForkJoinPool-1-worker-11, 2
ForkJoinPool-1-worker-15, 8
ForkJoinPool-1-worker-3, 6
ForkJoinPool-1-worker-7, 4
ForkJoinPool-1-worker-9, 3
ForkJoinPool-1-worker-13, 1
ForkJoinPool-1-worker-5, 12
ForkJoinPool-1-worker-1, 14
ForkJoinPool-1-worker-15, 10
ForkJoinPool-1-worker-7, 11
ForkJoinPool-1-worker-3, 9
ForkJoinPool-1-worker-13, 13
ForkJoinPool-1-worker-9, 16
ForkJoinPool-1-worker-11, 15
ForkJoinPool-1-worker-5, 17
ForkJoinPool-1-worker-15, 19
ForkJoinPool-1-worker-7, 20
ForkJoinPool-1-worker-1, 18

输出的结果可能有点意外,因为线程的编号是  1 至 15 间的奇数,正好是以用处理器数目 8。这样的线程编号差一点让我误以为线程池大小是 15。

如何创建任务

简单来理解一下上面的程序,Future { ... }  应该是用来向线程池提交任务的,那么线程池是什么呢?大小多少?瞧瞧它的实现方法 scala.concurrent.Future.apply(...)

这是柯里化的函数定义方式,第二个参数是一个隐式参数,类型为 ExecutionContext。进到 impl.Future(body) 的实现代码 scala.concurrent.impl.Future

确确实实的是像 executor 代表的线程池中提交任务。

线程池由谁来提供

那么这里的隐式参数 implicit executor: ExecutionContext 是由谁来隐式提供的呢?很明显是

扮演了该角色。顺着这个  global 直接来到 scala.concurrent.impl.ExecutionContextImpl 的 createExecutorService 方法

正常情况下它会创建一个 scala.concurrent.forkjoin.ForkJoinPool, 它也是一个 Java ExecutorService 实现。

如何控制全局线程池的大小

先看线程池的大小是怎么控制的,即这里面的 desiredParallelism 数值,由三个系统属性确定

scala.concurrent.context.minThreads, 默认为 1
scala.concurrent.context.numThreads, 默认为 x1
scala.concurrent.context.maxThreads, 默认为 x1

整数表示为常量值,以 x 开头则表示为可用处理器数目的倍数(Double 值,例如 x1.5)。最终 desiredParalleism 的确定是前两个的最大值与第三个相比的最小值,比如说 minThreads = 10, numThreads = 20, maxThreads = 16, 最后获得的 min(max(10, 20), 16) 还是 16。那么线程池的大小就应该是 16,接下来可验证一下。

欲使得 desiredParallelism 为 16,我们设置以下两个系统属性

仍然是使用上面 20 任务的代码,执行后输出为

ForkJoinPool-1-worker-31
ForkJoinPool-1-worker-27
ForkJoinPool-1-worker-17
AvailableProcessors: 8
ForkJoinPool-1-worker-25
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-9
ForkJoinPool-1-worker-21
ForkJoinPool-1-worker-13
ForkJoinPool-1-worker-23
ForkJoinPool-1-worker-29
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-7
ForkJoinPool-1-worker-11
ForkJoinPool-1-worker-19
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-15
ForkJoinPool-1-worker-7
ForkJoinPool-1-worker-23
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-15

从 1 到 31 间的奇数,总共 16 个线程。从 scala.concurrent.forkjoin.ForkJoinPool 的线程命名的相关代码来看有可能会更奇怪,一个要确定线程池大小的实际大小,再也不能看到线程名中的实际最大编号马上反应出线程池的大小。而 java.util.concurrent.ForkJoinPool 的线程编号是连续的。

注意:别看到 minThreads 和 maxThreads 就以为是一个声明上下限的线程池,其实是一个固定大小的线程池,也就是那个 desiredParallelism 值。

以上用编程方式设置系统属情,也可以在程序启动的时候设定

java -Dscala.concurrent.context.minThreads=16 -Dscala.concurrent.context.maxThreads=16 -java abc.jar

创建自定义的线程

前面用的线程池是系统全局的

import scala.concurrent.ExecutionContext.Implicits.global

如果能确定当前进程有哪些地方在使它这个全局线程池的话,是可以使用它的。如果大小不够还能通过系统属性来调节。但是代码由多人维护,很多人都来用全局线程池,反而任务会堵成一团。这就像 Java 8 中大量使用集合的 parallelStream(), 大家都用 ForkJoinPool 的  commonPool() 一样,只是  commonPool 的大小还难以控制。因此必要的时候需要提供自己的线程池实现。

注意到 Future.apply(...) 第二个参数是一个隐式的 ExecutionContext, 所以我们只要声明一个 ExecutionContext 实例,并设定为 implicit 就会自动应用上,或者显式传入第二个参数。方法有二

声明隐式的 ExecutionContext

执行输出

pool-1-thread-3
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1

显式传给  Future.apply(...) 方法

执行效果和上面是一样的。

参考链接:

  1. Scala中Future的线程数
  2. Scala 中的 Future 和 ExecutionContext
  3. scala 隐式详解(implicit关键字)

类别: Scala. 标签: , . 阅读(63). 订阅评论. TrackBack.

Leave a Reply

3 Comments on "Scala 中应用 Future 并发编程"

avatar
绿软库
Guest

感谢分享

stanfen
Guest
stanfen

学习了,scala并发还可以更深入