Actor based parallelism using Scala and Akka : Part 2

Dispatchers in Akka

Dispatchers are used to control the flow of execution in Akka - they can control how messages are being sent, delivered, received and processed. Based on the dispatching policy, dispatchers will route the incoming message to respective destination. Dispatchers are what make Akka 'tick'.

There’s a number of different dispatchers that Akka provides readily and also lets you write your own implementation, should you need. Here are a few dispatchers that come out-of-the-box with Akka
  • Dispatcher
This is an event-based dispatcher that binds an actor to a thread pool. It is based on java ExecutorService backed thread pool.
Creates and maintains one mailbox per actor
Can be shared between a number of actors of same or different types
  • Pinned Dispatcher
Dedicates a unique thread for each actor. In other words, each actor is backed by a single thread pool of size 1. The assigned thread is deallocated after a certain period of inactivity.
No sharing of threads between actors
Creates and maintains one mailbox per actor
Is good for asynchronous, long running computations
  • BalancingDispatcher
Event based dispatcher that binds multiple actors of same type to a thread pool
Maintains one mailbox that is shared between multiple actors of same type - the shared mail box effectively acts as a load balancer
  • CallingThreadDispatcher
Doesn't create new threads. Runs invocations synchronously from the current thread
Typically used only for testing

Here's an example of a dispatcher configuration :

my-thread-pool-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 2
core-pool-size-factor = 2
core-pool-size-max = 10
}
throughput = 5000
}


To use it specify the dispatcher name as arg to withDispatcher method :

val config = ConfigFactory.load.getConfig( "config\\application.conf" )
val system = ActorSystem ("my-actor-system", config )
val myActor = system.actorOf(Props[MyActor].withDispatcher("my-thread-pool-dispatcher"), "myactor1")
Routers :
Routers are middle level actors in Akka that forward messages to other actors. Routers can also be used to share load between multiple actor instances of the same kind.
A router requires two arguments - routing/load-balancing logic and list of actor routees (must be of same type) between which the router operates.

Some of the supported options for routing logic are :
  • round-robin
  • random
  • smallest-mailbox : very useful and popular load balancing heuristic
  • broadcast : send to everyone (like a topic)
  • scatter-gather-first : send to everyone, only accept the first response
  • consistent-hashing : useful for performance optimization. all messages with same hash will be delivered to the same actor instance
Let's look at an example :

class Master extends Actor {

val slaves = for(i <- 1 to 3) yield {
val slave = context.actorOf(Props[Slave], s"slave_$i")
context.watch(slave)
ActorRefRoutee(slave)
}

val router = Router(RoundRobinRoutingLogic(), slaves)

override def receive: Receive = {
case message =>
router.route(message, sender())
}
}

class Slave extends Actor with ActorLogging {
override def receive: Receive = {
case message => log.info(message.toString)
}
}

val system = ActorSystem("RoutersDemo")
val master = system.actorOf(Props[Master])

for (i <- 1 to 6) {
master ! s"[$i] Hello from the world"
}

In the above code, we are creating 3 actor instances of type Slave - slave_1, slave_2, slave_3 from Master actor. The list slaves has 3 ActorRefRoutee instances - an ActorRefRoutee is a Routee implementation which routes a message to the underlying ActorRef. The receive method of Master actor uses the router to route the messages

Here's the output when we send a few messages. The logs clearly indicate that the messages are distributed among the routees and no single actor is handling all the messages

[INFO] [04/04/2019 20:04:43.408] [RoutersDemo-akka.actor.default-dispatcher-4] [akka://RoutersDemo/user/$a/slave_3] [3] Hello from the world
[INFO] [04/04/2019 20:04:43.409] [RoutersDemo-akka.actor.default-dispatcher-5] [akka://RoutersDemo/user/$a/slave_1] [1] Hello from the world
[INFO] [04/04/2019 20:04:43.409] [RoutersDemo-akka.actor.default-dispatcher-5] [akka://RoutersDemo/user/$a/slave_1] [4] Hello from the world
[INFO] [04/04/2019 20:04:43.409] [RoutersDemo-akka.actor.default-dispatcher-4] [akka://RoutersDemo/user/$a/slave_3] [6] Hello from the world
[INFO] [04/04/2019 20:04:43.408] [RoutersDemo-akka.actor.default-dispatcher-2] [akka://RoutersDemo/user/$a/slave_2] [2] Hello from the world
[INFO] [04/04/2019 20:04:43.410] [RoutersDemo-akka.actor.default-dispatcher-2] [akka://RoutersDemo/user/$a/slave_2] [5] Hello from the world


There is an alternate, less tedious way to achieve the same thing - by using a Pool router instead :

class Slave extends Actor with ActorLogging {
override def receive: Receive = {
case message => log.info(message.toString)
}
}

val system = ActorSystem("RoutersDemo")
val poolMaster = system.actorOf(RoundRobinPool(3).props(Props[Slave]), "poolMaster")
for (i <- 1 to 6) {
poolMaster ! s"[$i] Hello from the world"
}

Output:

[INFO] [04/04/2019 20:23:12.998] [RoutersDemo-akka.actor.default-dispatcher-5] [akka://RoutersDemo/user/poolMaster/$a] [1] Hello from the world
[INFO] [04/04/2019 20:23:12.998] [RoutersDemo-akka.actor.default-dispatcher-3] [akka://RoutersDemo/user/poolMaster/$c] [3] Hello from the world
[INFO] [04/04/2019 20:23:12.999] [RoutersDemo-akka.actor.default-dispatcher-5] [akka://RoutersDemo/user/poolMaster/$a] [4] Hello from the world
[INFO] [04/04/2019 20:23:12.999] [RoutersDemo-akka.actor.default-dispatcher-3] [akka://RoutersDemo/user/poolMaster/$c] [6] Hello from the world
[INFO] [04/04/2019 20:23:12.998] [RoutersDemo-akka.actor.default-dispatcher-4] [akka://RoutersDemo/user/poolMaster/$b] [2] Hello from the world
[INFO] [04/04/2019 20:23:12.999] [RoutersDemo-akka.actor.default-dispatcher-4] [akka://RoutersDemo/user/poolMaster/$b] [5] Hello from the world


Broadcast :
Akka routing has a special type of message called Broadcast. A broadcast message is sent to every actor in the routee pool regardless of the routing strategy

For example, look the code snippet here. We are sending the message only once (wrapping it inside a Broadcast object) and our routing strategy is round robin. However, all the 3 actors in the pool received the message

val system = ActorSystem("RoutersDemo")
val poolMaster = system.actorOf(RoundRobinPool(3).props(Props[Slave]), "poolMaster")
poolMaster ! Broadcast("Avangers assemble !!!")

Output:

[INFO] [04/04/2019 20:30:53.732] [RoutersDemo-akka.actor.default-dispatcher-8] [akka://RoutersDemo/user/poolMaster/$b] Avangers assemble !!!
[INFO] [04/04/2019 20:30:53.732] [RoutersDemo-akka.actor.default-dispatcher-5] [akka://RoutersDemo/user/poolMaster/$a] Avangers assemble !!!
[INFO] [04/04/2019 20:30:53.732] [RoutersDemo-akka.actor.default-dispatcher-9] [akka://RoutersDemo/user/poolMaster/$c] Avangers assemble !!!


Mailbox:
By default, a mailbox is just a regular FIFO buffer. However, there may be situations when you want an actor to process certain messages with higher priority . To handle such requirements, we can build a custom priority mailbox. All we need to do is define a PriorityGenerator which translates messages to numeric priority values (Lower the value, higher the priority)
e.g:

class PriorityMailbox(settings: ActorSystem.Settings, config: Config)
extends UnboundedPriorityMailbox(
PriorityGenerator {
case message: String if message.contains("[ERROR]") => 0
case message: String if message.contains("[WARN]") => 1
case message: String if message.contains("[INFO]") => 2
case message: String if message.contains("[DEBUG]") => 2
})

Once we define a mailbox, we can tweak config to associate that mailbox with a dispatcher :

prio-dispatcher {
mailbox-type = "PriorityMailbox"
}

Finally we need to tell Akka to use this dispatcher :

val actorRef = system.actorOf(Props[SimpleActor].withDispatcher("prio-dispatcher"))

Here we are sending the DEBUG message first, followed by WARN, followed by ERROR message , however the order of processing is exactly opposite :

object Mailboxes extends App {

val system = ActorSystem("Experiment")

class SimpleActor extends Actor with ActorLogging {
override def receive: Receive = {
case message => log.info(message.toString)
}
}

class PriorityMailbox(settings: ActorSystem.Settings, config: Config)
extends UnboundedPriorityMailbox(
PriorityGenerator {
case message: String if message.contains("[ERROR]") => 0
case message: String if message.contains("[WARN]") => 1
case message: String if message.contains("[INFO]") => 2
case message: String if message.contains("[DEBUG]") => 2
})

val actorRef = system.actorOf(Props[SimpleActor].withDispatcher("prio-dispatcher"))
actorRef ! "[DEBUG] Nobody cares about me"
actorRef ! "[WARN] Sometimes I feel ignored"
actorRef ! "[ERROR] I feel i am important"
}

Output :

[INFO] [04/04/2019 20:45:58.319] [Experiment-akka.actor.default-dispatcher-2] [akka://Experiment/user/priorityActor] [ERROR] I feel i am important
[INFO] [04/04/2019 20:45:58.319] [Experiment-akka.actor.default-dispatcher-2] [akka://Experiment/user/priorityActor] [WARN] Sometimes I feel ignored
[INFO] [04/04/2019 20:45:58.319] [Experiment-akka.actor.default-dispatcher-2] [akka://Experiment/user/priorityActor] [DEBUG] Nobody cares about me



comments powered by Disqus