Scala Futures

A Future instance represents the result of an asynchronous computation. Future is a lightweight parallelism API provided as part of Scala core. Scala Futures are easily interoperable with Java Futures and executor-service implementations
To use Future API we need the following imports

import scala.concurrent._ // Future API
import scala.concurrent.duration._ //DSL for timeouts
import scala.concurrent.ExecutionContext.Implicits.global //thread pool config

Future has a companion object which acts like a factory for creating Futures - it takes an expression as a by-name param and returns a Future instance representing an asynchronous evaluation of the expression
E.g the following expression creates a Future:

val f = Future { Thread.sleep(1000); 10 }

To evaluate the expression represented by the Future, use :

val x = Await.result(f, 5.second)

We can also access the value through f.value. It returns an instance of Option[Try[T]]. If the Future is still not done with computing, it will return None. It is possible that the computation has resulted in an error - in that case the return value would be of type Some[Failure[T]] , else the return value will be of type Some[Success[T]].
We can also use f.isCompleted to check if a Future instance f is done with evaluation or not.

import scala.concurrent._ // Future API
import scala.concurrent.duration._ //DSL for timeouts
import scala.concurrent.ExecutionContext.Implicits.global //thread pool config
import java.util.concurrent.TimeUnit

object Test extends App {
val f = Future {TimeUnit.SECONDS.sleep(5); 10 }
println(f.value)
println(f.isCompleted)
TimeUnit.SECONDS.sleep(6)
println(f.value)
println(f.isCompleted)
}

Output:

None
false
Some(Success(10))
true


Let’s take an example of failure as well :

import scala.concurrent._ // Future API
import scala.concurrent.duration._ //DSL for timeouts
import scala.concurrent.ExecutionContext.Implicits.global //thread pool config
import java.util.concurrent.TimeUnit

object Test extends App {
val f = Future {10/0}
TimeUnit.SECONDS.sleep(1)
println(f.value)
println(f.isCompleted)
}


Output:

Some(Failure(java.lang.ArithmeticException: / by zero))
true


Just like Await.result(), we can use Await.ready() to wait for asynchronous computation. The difference is that unlike Await.result(), Await.ready() waits for the evaluation to be completed but doesn’t actually provide us the result of the evaluation (for that we’ll still have to use Await.result())

import scala.concurrent._ // Future API
import scala.concurrent.duration._ //DSL for timeouts
import scala.concurrent.ExecutionContext.Implicits.global //thread pool config
import java.util.concurrent.TimeUnit

object Test extends App {
val f = Future {TimeUnit.SECONDS.sleep(2); 42/2}
Await.ready(f, 3.second)
println(f.value)
println(f.isCompleted)
val x = Await.result(f, 0.second)
println(x)
}

Output:

Some(Success(21))
true
21

We can apply transformations on a future using the transform() function - transform allows us to customize action both in case of success as well as failure

object Test extends App {
val f = Future(42/2)
val ft = f.transform({
case util.Success(_) => util.Success("OK")
case util.Failure(_) => util.Success("Not OK")
})
val x = Await.result(ft, 1.second)
println(x)
}

Output:

OK


Lets try another example:

object Test extends App {
val f = Future(42/2)
val ft = f.transform({
case util.Success(_) => util.Success("OK")
case util.Failure(_) => util.Success("Not OK")
})
val x = Await.result(ft, 1.second)
println(x)
}

Output:

Not OK


Scala Futures also provide a bunch of asynchronous callbacks. Callbacks for futures are partial functions. We can pass a callback to the onSuccess method. It will only be called if the Future completes successfully, and if so, it receives the computed value as its input

val f = Future( 42)
f.onSuccess { case x => println(x) }
Await.ready(f, 1.second)

Output:

42

Similarly, we could register a failure callback with the onFailure method. Our callback will receive a Throwable, but it will only be called if the Future did not complete successfully.

val f = Future( 42/0 )
f.onFailure { case e => println(e.getMessage) }
Await.ready(f, 1.second)

Output:

/ by zero

Usually, it’s better to combine these two and register a completion callback that will handle both cases. The input parameter for that callback is a Try:
import scala.concurrent._ // Future API
import scala.concurrent.duration._ //DSL for timeouts
import scala.concurrent.ExecutionContext.Implicits.global //thread pool config
import util._

object Test extends App {

val f = Future( 42/2 )
f.onComplete {
case Success(i) => println(i)
case Failure(e) => println(e.getMessage)
}
Await.ready(f, 1.second)
}


Note that callbacks don’t return anything - their return type is Unit. As a result they can never truly be side-effect-free
We can instead use the andThen() operation as well. The andThen() operation works very similar to the onComplete() callback except that it can actually return result.

import scala.concurrent._ // Future API
import scala.concurrent.duration._ //DSL for timeouts
import scala.concurrent.ExecutionContext.Implicits.global //thread pool config
import util._

object Test extends App {
val f = Future( 42/2 )
f.andThen {
case Success(i) => i
case Failure(e) => e.getMessage
}
val x = Await.result(f, 1.second)
println(x)
}


Scala Future also provides constructs for creating Futures with pre-decided outcome : Success or Failure. In case of Success it also lets us define a pre-decided result:

val s = Future.successful(42)

Similarly in case of Failure, we can define a pre-decided custom Exception

val f = Future.failed(new RuntimeException("Gotcha !!!"))


Sometimes, business logic demands that in case of a failure, we define a recovery mechanism. This can be done with recover() :

import scala.concurrent._ // Future API
import scala.concurrent.duration._ //DSL for timeouts
import scala.concurrent.ExecutionContext.Implicits.global //thread pool config
import util._

object Test extends App {
val op = Future(42/0)
val x = op.recover {
case _ : ArithmeticException => -1
}
println(Await.result(x, 1.second))
}


Similarly we can use recoverWith(). If we think of recover as the failure-case-map and recoverWith is the failure-case-flatMap

import scala.concurrent._ // Future API
import scala.concurrent.duration._ //DSL for timeouts
import scala.concurrent.ExecutionContext.Implicits.global //thread pool config
import util._

object Test extends App {
val op = Future(42/0)
val x = op.recoverWith {
case _ => Future.successful(-1)
}
print(Await.result(x, 1.seconds))
}


We also have the fallbackTo() operation which lets us replace a Failure with another Future

import scala.concurrent._ // Future API
import scala.concurrent.duration._ //DSL for timeouts
import scala.concurrent.ExecutionContext.Implicits.global //thread pool config
import util._

object Test extends App {
val op = Future(42/0)
val x = op.fallbackTo(Future.successful(-1))
print(Await.result(x, 1.seconds))
}


Scala Futures can work with Seq implementations as well. The sequence() operation can take a Seq[Future[T]] instance as argument and turn it into a Future[Seq[T]] without blocking
For example, the code snippet below takes a List[Int], applies a function on each of the elements and creates a Future representing an asynchronous computation of the results:

import scala.concurrent._ // Future API
import scala.concurrent.duration._ //DSL for timeouts
import scala.concurrent.ExecutionContext.Implicits.global //thread pool config
import util._

object Test extends App {
val nums = List(1,2,3,4,5)
def sq (i:Int):Future[Int] = {Future(i*i)}
val results = Future.sequence(nums.map(sq))
print(Await.result(results, 1.seconds))
}

Output:

List(1, 4, 9, 16, 25)


We can perform fold/reduce operations of a sequence of Futures just like a regular sequence:

import scala.concurrent._ // Future API
import scala.concurrent.duration._ //DSL for timeouts
import scala.concurrent.ExecutionContext.Implicits.global //thread pool config
import java.util.concurrent.TimeUnit
import util._

object Test extends App {
val ft1 = Future{ TimeUnit.SECONDS.sleep(1); 1}
val ft2 = Future{ TimeUnit.SECONDS.sleep(2); 2}
val ft3 = Future{ TimeUnit.SECONDS.sleep(3); 3}
val sft = List(ft1, ft2, ft3)
val x = Future.foldLeft(sft)(0)(_ + _)
println(Await.result(x, 4.second))
}

Output:

6


A promise represents a placeholder for an operation. Future and Promise are like two different ends of a wormhole - if Future is the client side view, Promise is the corresponding server side view of the computation. We can obtain the corresponding Future from a Promise by using the .future operation

import scala.concurrent._ // Future API
import scala.concurrent.duration._ //DSL for timeouts
import scala.concurrent.ExecutionContext.Implicits.global //thread pool config

object Test extends App {
val p = Promise[Int]
val f = p.future
println(f.isCompleted)
println(f.value)
p.success(42)
println(f.isCompleted)
println(f.value)
}

In the above code, at line 1 and 2 we are just declaring a Promise and getting a reference to the corresponding Future. At this stage the promise hasn’t been fulfilled yet, so line 3 and 4 will print false and None respectively to stdout.
Now in line 5 we are fulfilling the promise. So line 6 and 7 will print true and Some(Success(42)) respectively

Output:
false
None
true
Some(Success(42))


Sometimes in asynchronous computations, we need throttling between producers and consumers. Futures allow us to do that by performing operations in batches.
Lets run an operation sequentially first

import scala.concurrent._ // Future API
import scala.concurrent.duration._ //DSL for timeouts
import scala.concurrent.ExecutionContext.Implicits.global //thread pool config
import scala.collection.mutable.Set

object Test extends App {
def calc (i:Int):Future[Int] = Future
for (i <- 0 to 2^31-1){
// just waste time
}
i*i
}
def processSeq(nums : Vector[Int]):Future[Vector[Int]] = {
val f= nums.map(calc)
Future.sequence(f)
}
val input = (1 to 100).toVector
val result = processSeq(input)
Await.result(result, 100.seconds)
}

In the example above we are computing squares of each individual element in a List of 100 elements. The problem is that potentially a very high number of threads might be created to perform this operation which might impact other processes running on the same host.
We can batch the operations like this :

def processSeqBatch(nums : Vector[Int], batchSize : Int):Future[Vector[Int]]={
val batches = nums.grouped(batchSize)
val start = Future.successful(Vector.empty[Int])
batches.foldLeft(start){(accFuture, batch) => for {
acc <- accFuture
batchResult <- processSeq(batch)
} yield acc ++ batchResult
}
}


Final code:


import scala.concurrent._ // Future API
import scala.concurrent.duration._ //DSL for timeouts
import scala.concurrent.ExecutionContext.Implicits.global //thread pool config
import scala.collection.mutable.Set

object Test extends App {

def calc (i:Int):Future[Int] = Future {
for (i <- 0 to 2^31-1){
// just waste time
}
i*i
}
def processSeq(nums : Vector[Int]):Future[Vector[Int]] = {
val f= nums.map(calc)
Future.sequence(f)
}
def processSeqBatch(nums : Vector[Int], batchSize : Int):Future[Vector[Int]]={
val batches = nums.grouped(batchSize)
val start = Future.successful(Vector.empty[Int])
batches.foldLeft(start){(accFuture, batch) => for {
acc <- accFuture
batchResult <- processSeq(batch)
} yield acc ++ batchResult
}
}
val input = (1 to 100).toVector
val result = processSeqBatch(input, 5)
println(Await.result(result, 100.seconds))
}


Output:

Vector(1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801, 10000)




comments powered by Disqus