Playing with Scala futures
View from Kolsåstoppen |
Option
idiom:case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]])Bonus points for immutability, using
case
class and covariance. Much better but more complex implementation involves two case
classes but at least allows modelling empty trees:sealed trait Tree[+T]Let's stick to the first idea. Now implement building a tree with arbitrary height:
case object Empty extends Tree[Nothing]
case class Node[+T](value: T, left: Tree[T], right: Tree[T]) extends Tree[T]
def apply[T](n: Int)(block: => T): Tree[T] = n match {In order to build a tree with 1024 leaves and all random variable it's enough to say:
case 1 => Tree(block, None, None)
case _ =>
Tree(
block,
Some(Tree(n - 1)(block)),
Some(Tree(n - 1)(block))
)
}
val randomTree: Tree[Double] = Tree(1 + 10)(math.random)This is an open-ended question, next requirement may be to write a
map
method equivalent to Seq.map()
or Option.map()
. Understanding what that means is part of the question. The implementation is surprisingly straightforward:case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]]) = {OK...
def map[R](f: T => R): Tree[R] =
Tree(
f(value),
left.map{_.map(f)},
right.map{_.map(f)})
}
.map{_.map(f)}
, are you kidding me? Remember that left
and right
are Option
s and Option.map(f)
turns Option[T]
to Option[R]
. So first map
comes from an Option
. Second _.map(f)
is actually a recursive call to Tree.map()
. Now we can for example create a second tree (immutability!) with every value incremented but preserving structure:val tree: Tree[Int] = //......or with
val incremented = tree.map(1 +)
toString()
called on each value:val stringified = tree.map(_.toString)Let' go a bit further. If
f
function is time-consuming and side-effect free (which happens to be a frequent requirement when doing map()
) or our tree is considerably big what about parallelizing Tree.map()
in some way? There are few ways to achieve this and quite a few traps. The simplest approach is to use a thread pool backed by ExecutionContext
:case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]]) {Using
def pmap[R](f: T => R)(implicit ec: ExecutionContext, timeout: Duration): Tree[R] = {
val transformed: Future[R] = Future { f(value)}
val leftFuture: Option[Future[Tree[R]]] = left.map { l => Future { l.pmap(f)}}
val rightFuture: Option[Future[Tree[R]]] = right.map { r => Future { r.pmap(f)}}
Tree(
Await.result(transformed, timeout),
leftFuture.map(Await.result(_, timeout)),
rightFuture.map(Await.result(_, timeout)))
}
}
pmap
(name is not a coincidence) is quite simple once you sort out few implicits
:import scala.concurrent.{Await, Future, ExecutionContext}Sample code above will take a simple tree with "alpha" root and "beta" right child and upper case all the values in multiple threads. Calling
import java.util.concurrent.Executors
import scala.concurrent.duration._
val pool = Executors newFixedThreadPool 10
implicit val ec: ExecutionContext = ExecutionContext fromExecutor pool
implicit val timeout = 10.second
val tree = Tree("alpha",
None,
Some(
Tree("beta",
None,
None)))
println(tree.pmap{_.toUpperCase})
Future { ... }
is a simple idiom to submit asynchronous task to a thread pool and get a Future[T]
in return. There are at least couple of issues with this code. First of all it mainly... waits. Several threads will sit idle merely waiting for children to complete. But that's not the worst case scenario. Imagine our thread pool is limited to one thread (problem remains for larger, but still limited pools). We spawn sub tasks for children and wait until they finish. But these sub tasks never start because they are unable to obtain thread from a thread pool. Why? Because there is only one thread in the pool and we are already consuming it! This one and only thread is blocked waiting idle for tasks that can never finish. It's called a deadlock. Actually the code will time out after given amount of time but it doesn't change the fact that the implementation above fails miserably.
ForkJoinPool
would solve this issue but there are more advanced and rewarding solutions.Entering Tree[Future[R]]
Surprisingly there is even better, more functional and clean approach. Reactive programming discourages waiting. Instead of hiding asynchronous nature of processing tree, let's emphasize it! Since the processing is already based on Future
s, place them explicitly in the API:case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]]) {
def mapf[R](f: T => R)(implicit ec: ExecutionContext, timeout: Duration): Tree[Future[R]] = {
Tree(
Future { f(value) },
left.map {_.mapf(f)},
right.map {_.mapf(f)}
)
}
}
Tree.mapf()
returns immediately but instead of returning Tree[R]
we now get Tree[Future[R]]
. So we have a tree where each node contains an independent Future
. How can we go back to familiar Tree[R]
? One approach uses Tree.map()
, which we already implemented:val treeOfFutures: Tree[Future[R]] = ...I bet it's not clear but in principle this is simple - for each node wait on independent future object until all of them are resolved. There is no risk of deadlock because futures are not dependant on each other.
val tree = treeOfFutures.map(Await.result(_, 10.seconds))
Turning Tree[Future[R]]
into Future[Tree[R]]
But we want to go deeper. Why work with a bunch of futures if we can have only one future to rule them all? Think about Future.sequence()
that turns Seq[Future[T]]
into Future[Seq[T]]
. Implementing such method for Tree[Future[T]]
is a nice task on its own. The idea is to have a counter of all unresolved tasks and once all of them are done - dereference all futures without blocking (since they are already finished):object Tree {Code above is a bit imperative and doesn't handle exceptions properly - but can be a good starting point. We iterate over all futures and decrement a counter after each one of them completes. If all futures are done we complete our custom promise. Code above requires two extra methods:
def sequence[T](tree: Tree[Future[T]])(implicit ec: ExecutionContext, timeout: Duration): Future[Tree[T]] = {
val promise = Promise[Tree[T]]()
val pending = new AtomicInteger(tree.size)
for {
future <- tree
value <- future
} if(pending.decrementAndGet() == 0) {
promise.success(
tree.map(Await.result(_, 0.seconds)) //will never block
)
}
promise.future
}
}
Tree.size
and Tree.foreach()
(used implicitly inside for comprehension) - which I leave for you as an exercise.
Tags: concurrency, scala