Actor based parallelism using Scala and Akka : Part 1

Threads and locks are popular programming constructs because they closely mimic what the underlying hardware does. However, using threads and low-level concurrency features requires developers to have in-depth knowledge of the memory model and, often, of the inner workings of built-in Java library classes. The actor-based concurrency model is a lot more beginner-friendly. Functional programming avoids the problems of shared mutable state by avoiding mutable state. Actor-based programming, on the other hand, retains mutable state but avoids sharing it. The Akka toolkit is a very popular library that brings actor support to Java, Scala and other JVM-based languages.
In essence, the Actor Model revolves around actors, behavior, and messages. An actor is an entity that follows a few simple rules. It encapsulates the behavior that reacts to messages sent to it by other actors. It can also send messages, create new actors, and change its behavior for processing subsequent messages.
Although it might not be obvious at first glance, the Actor Model is inherently concurrent and asynchronous. Actors share nothing with each other and communicate only by sending messages, which are themselves immutable. If a subset of actors needs to share common state, that state is included in the message payload.
Every actor has a mailbox (also known as a message buffer or queue) where messages are kept until they are picked up, and a message loop or message handler where received messages get processed. An extremely important implementation detail is that each actor handles only one message at a time, so its handler never runs on two threads concurrently. Actors may cross the boundaries of JVMs and physical machines, preserving the transparency of communication and embracing elastic scalability.
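To make the one-message-at-a-time rule concrete, here is a toy mailbox in plain Scala with no Akka dependency. It is only a sketch of the idea and not how Akka implements mailboxes: ToyActor, its ! method and run are hypothetical names made up for this illustration. Several threads send messages concurrently, yet the handler can mutate a plain var because at most one drain loop is active at any moment.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, ExecutorService, Executors}
import java.util.concurrent.atomic.AtomicBoolean

object ToyMailbox {

  // Hypothetical toy actor: a queue plus a flag that allows only one drain loop at a time
  final class ToyActor(handler: Any => Unit, pool: ExecutorService) {
    private val mailbox = new ConcurrentLinkedQueue[Any]()
    private val scheduled = new AtomicBoolean(false)

    // Enqueue the message and make sure a drain loop is (or will be) running
    def !(msg: Any): Unit = {
      mailbox.add(msg)
      trySchedule()
    }

    private def trySchedule(): Unit =
      if (!mailbox.isEmpty && scheduled.compareAndSet(false, true))
        pool.execute(() => drain())

    // Processes queued messages one by one on whichever pool thread picked up the task
    private def drain(): Unit =
      try {
        var msg = mailbox.poll()
        while (msg != null) {
          handler(msg)
          msg = mailbox.poll()
        }
      } finally {
        scheduled.set(false)
        trySchedule() // a message may have arrived after the last poll
      }
  }

  // Sends senders * perSender messages from several threads and returns how many were handled
  def run(senders: Int, perSender: Int): Int = {
    val pool = Executors.newFixedThreadPool(4)
    val done = new CountDownLatch(senders * perSender)
    var total = 0 // plain, unsynchronised state owned by the toy actor
    val counter = new ToyActor(_ => { total += 1; done.countDown() }, pool)
    val threads = (1 to senders).map { _ =>
      new Thread(() => (1 to perSender).foreach(i => counter ! i))
    }
    threads.foreach(_.start())
    done.await() // wait until every message has been handled
    pool.shutdown()
    total
  }
}
```

Calling ToyMailbox.run(4, 1000) should return 4000 even though the counter is never locked, because the handler is never executed by two threads at once.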
Akka is one of the most popular libraries implementing the actor-based concurrent programming model.
To define an actor, we extend the Actor trait and implement its receive method. The receive method has the return type PartialFunction[Any, Unit], for which Akka provides a type alias called Receive.
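Since the whole message handler is just a partial function, it may help to look at that type on its own first. The sketch below uses plain Scala and redeclares a Receive alias locally so that it needs no Akka dependency; the handler names and the seen buffer are made up for illustration.

```scala
import scala.collection.mutable.ListBuffer

object ReceiveSketch {
  // Same shape as Akka's Receive alias, redeclared here so the sketch compiles without Akka
  type Receive = PartialFunction[Any, Unit]

  // Records what each handler saw, so the effect can be inspected afterwards
  val seen: ListBuffer[String] = ListBuffer.empty

  val onString: Receive = {
    case s: String =>
      seen += s"string:$s"
      ()
  }

  val onInt: Receive = {
    case n: Int =>
      seen += s"int:$n"
      ()
  }

  // Partial functions compose: try onString first, fall back to onInt
  val combined: Receive = onString.orElse(onInt)
}
```

A partial function is only defined for the cases it lists, so ReceiveSketch.onString.isDefinedAt(42) is false while ReceiveSketch.combined.isDefinedAt(42) is true. This is why an actor's receive block can list only the messages it understands.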
Here’s an example of an Actor implementing a word counter functionality:

class WordCountActor extends Actor {
  var total = 0

  def receive: Receive = {
    case message: String =>
      println(s"[word counter] I have received: $message")
      total += message.split(" ").length
    case msg => println(s"[word counter] I cannot understand ${msg.toString}")
  }
}


Every actor in Akka lives within a container called ActorSystem. An ActorSystem is a heavyweight data-structure that acts as a container for a number of threads under the hood and manages allocation of threads to actors.

val system = ActorSystem("actorCapabilitiesDemo")

Once we have an actor system, we can use it to create an actor. To instantiate an actor, we need to specify its class and a unique name for the actor instance:

val wordCounter = system.actorOf(Props[WordCountActor], "wordCounter")

Note that actorOf() returns an instance of type ActorRef and not of type Actor. This design prevents clients from invoking the receive() method directly on the actor instance.
To send a message to an actor, we use the ! method (also known as tell) on the corresponding actor ref:

wordCounter ! "Thank you Mario but our princess is in another castle!!!"

This causes the actor’s receive() function to be invoked with the message on a different thread. Here is the complete program:

import akka.actor.{Actor, ActorSystem, Props}

object ActorsIntro extends App {

  val actorSystem = ActorSystem("firstActorSystem")

  class WordCountActor extends Actor {
    var total = 0

    def receive: Receive = {
      case message: String =>
        println(Thread.currentThread.getName + s" I have received: $message")
        total += message.split(" ").length
      case msg => println(s"[word counter] I cannot understand ${msg.toString}")
    }
  }

  val wordCounter = actorSystem.actorOf(Props[WordCountActor], "wordCounter")
  val anotherWordCounter = actorSystem.actorOf(Props[WordCountActor], "anotherWordCounter")

  println(Thread.currentThread.getName + " sending messages")
  wordCounter ! "Thank you Mario but our princess is in another castle!!!"
  anotherWordCounter ! "That's all folks"
}

Output:

main sending messages
firstActorSystem-akka.actor.default-dispatcher-4 I have received: Thank you Mario but our princess is in another castle!!!
firstActorSystem-akka.actor.default-dispatcher-2 I have received: That's all folks

Clearly the message sender thread isn’t the same as the message receiver thread, which shows that the messages were processed asynchronously.
In this case, we used String as the message type. However, as the type PartialFunction[Any, Unit] suggests, a message can be of any type. In general, messages need to satisfy two conditions:
  1. Messages must be immutable. Since there is no compile-time construct that tells us whether an object’s state is mutable, the onus is on developers to ensure that messages are indeed immutable.
  2. Messages must be serializable, since Akka’s capabilities are not limited to a single JVM.
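One common way to meet both conditions is to model messages as case classes and case objects holding only immutable values. The protocol below is a hypothetical one for the word counter (none of these names come from the article) and uses plain Scala only.

```scala
object WordCountProtocol {
  // Hypothetical message protocol; a sealed trait keeps the set of messages closed
  sealed trait Command
  final case class CountWords(text: String) extends Command
  final case class CountBatch(lines: Vector[String]) extends Command // immutable collection
  case object ReportTotal extends Command

  // "Changing" a message means building a new one; the original stays untouched
  def withExtraLine(batch: CountBatch, line: String): CountBatch =
    batch.copy(lines = batch.lines :+ line)
}
```

Case class fields are vals by default, and Vector is immutable, so a receiver cannot alter a message that the sender still holds. Case classes and case objects also implement java.io.Serializable; whether Java serialization is the right choice for a real deployment is outside the scope of this sketch.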
Actors have access to information about their execution context through a property named context. It exposes the current actor system (through context.system) as well as the ActorRef pointing to the current actor (through context.self).
Even though we don’t instantiate actors directly and only access them through an actor ref, Akka still lets us pass constructor arguments to actor classes. Here’s another version of the word counter actor, with the word separator as a configurable argument:

object WordCounter {
  // Factory method: keeps Props creation next to the actor it configures
  def props(sep: String): Props = Props(new WordCounter(sep))
}

class WordCounter(sep: String) extends Actor {
  var total = 0

  def receive: Receive = {
    case message: String =>
      println(Thread.currentThread.getName + s" I have received: $message")
      // Note: String.split interprets sep as a regular expression
      total += message.split(sep).length
    case msg => println(s"[word counter] I cannot understand ${msg.toString}")
  }
}

val wc = actorSystem.actorOf(WordCounter.props(" "))
wc ! "That's all folks"


Actors can not only receive messages but also reply to them. In a chain of actors, an actor that wants to reply needs to know who sent the message. When an actor sends a message, it implicitly passes a reference to its own ActorRef as part of the message metadata, and context.sender() gives the receiver that ActorRef. When a message is sent from outside any actor, as with the first actor in a chain, there is no sender and context.sender() returns the deadLetters actor reference.
An actor can also forward a message to another actor. For that it just needs the ActorRef of the second actor. Unlike the ! operator, the forward method preserves the original sender reference while forwarding a message.

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

object ActorCapabilities extends App {

  class SquareActor extends Actor {
    override def receive: Receive = {
      case x: Int => println(self + " received response from " + sender() + " :" + x)
      case RegularMessage(x) => {
        println(self + s" Received Regular Message with value $x")
        context.sender() ! x * x // reply to whoever sent the message
      }
      case ForwardMessage(x, ref) => {
        println(self + s" Received Forward Message with value $x")
        println(self + s" Forwarding to $ref")
        ref forward x // keeps the original sender
      }
      case SelfMessage(x) => {
        println(self + s" Received Regular Message with value $x")
        self ! x * x
      }
    }
  }

  val system = ActorSystem("mySystem")
  val actor1 = system.actorOf(Props[SquareActor], "squareActor1")
  val actor2 = system.actorOf(Props[SquareActor], "squareActor2")

  actor1 ! 2 // who is the sender?!

  case class RegularMessage(content: Int)
  case class ForwardMessage(content: Int, ref: ActorRef)
  case class SelfMessage(content: Int)

  case object Start
  class Test extends Actor {
    override def receive: Receive = {
      case Start => {
        actor1 ! RegularMessage(5)
        actor2 ! ForwardMessage(6, actor1)
        actor1 ! SelfMessage(5)
      }
      case x: Int => println(self + " received " + x)
    }
  }

  val test = system.actorOf(Props[Test], "test")
  test ! Start
}

Output:

Actor[akka://mySystem/user/squareActor1#-395877909] received response from Actor[akka://mySystem/deadLetters] :2
Actor[akka://mySystem/user/squareActor1#-395877909] Received Regular Message with value 5
Actor[akka://mySystem/user/squareActor2#199412861] Received Forward Message with value 6
Actor[akka://mySystem/user/squareActor2#199412861] Forwarding to Actor[akka://mySystem/user/squareActor1#-395877909]
Actor[akka://mySystem/user/test#866703272] received 25
Actor[akka://mySystem/user/squareActor1#-395877909] received response from Actor[akka://mySystem/user/test#866703272] :6
Actor[akka://mySystem/user/squareActor1#-395877909] Received Regular Message with value 5
Actor[akka://mySystem/user/squareActor1#-395877909] received response from Actor[akka://mySystem/user/squareActor1#-395877909] :25


Let’s go through the output and try to understand the behavior.
  1. The first message, the integer 2, wasn’t sent from an actor, so it had no sender. In that case sender() returns a reference to deadLetters, a special actor inside Akka that receives all undeliverable messages. Any reply sent with sender() ! msg would therefore end up in dead letters.
  2. The test actor sent a RegularMessage with content 5 to squareActor1. squareActor1 sent the response (25) back to the test actor.
The corresponding entries in the console are:

Actor[akka://mySystem/user/squareActor1#-395877909] Received Regular Message with value 5
Actor[akka://mySystem/user/test#866703272] received 25


  3. The test actor sent a ForwardMessage to squareActor2 with params (6, squareActor1). squareActor2 forwarded the value to squareActor1, keeping the test actor as the sender. The corresponding console entries are:

Actor[akka://mySystem/user/squareActor2#199412861] Received Forward Message with value 6
Actor[akka://mySystem/user/squareActor1#-395877909] received response from Actor[akka://mySystem/user/test#866703272] :6


  4. Finally, the test actor sent a SelfMessage to squareActor1, which sent the squared value as a message to itself. The corresponding entries in the console are:

Actor[akka://mySystem/user/squareActor1#-395877909] Received Regular Message with value 5
Actor[akka://mySystem/user/squareActor1#-395877909] received response from Actor[akka://mySystem/user/squareActor1#-395877909] :25

Actors in Akka can be stateful, and we can put logic in the receive method to treat messages differently depending on the state. However, this can easily become unmanageable spaghetti code if the number of states or conditions is too high. Also, mutable state isn’t exactly in line with the functional programming paradigm. A more elegant alternative is to define more than one message handler and switch to the appropriate one as the state changes.

context.become() allows us to switch the message handler function. It accepts a partial function (of type Receive) and makes it the current actor’s handler for subsequent messages. Internally, Akka keeps a stack of handlers. context.become() also accepts an optional boolean param, discardOld, which is true by default: the new handler replaces the one at the top of the stack. If discardOld is set to false, the new handler is pushed on top of the stack and the old one stays underneath it. context.unbecome() works in exactly the opposite way: it pops the current handler off the stack, so the previous handler becomes the message handler for the current actor again. Every become with discardOld = false should eventually be matched by an unbecome(), otherwise the stack keeps growing.

Here’s an example demonstrating context.become:

case object Increment
case object Decrement
case object Print

class Counter extends Actor {

  override def receive: Receive = countReceive(0)

  // The count lives in the handler's argument, not in a var
  def countReceive(currentCount: Int): Receive = {
    case Increment =>
      println(s"[countReceive($currentCount)] incrementing")
      context.become(countReceive(currentCount + 1))
    case Decrement =>
      println(s"[countReceive($currentCount)] decrementing")
      context.become(countReceive(currentCount - 1))
    case Print => println(s"[countReceive($currentCount)] my current count is $currentCount")
  }
}

val counter = system.actorOf(Props[Counter], "myCounter")

(1 to 5).foreach(_ => counter ! Increment)
(1 to 3).foreach(_ => counter ! Decrement)
counter ! Print


Output:

[countReceive(0)] incrementing
[countReceive(1)] incrementing
[countReceive(2)] incrementing
[countReceive(3)] incrementing
[countReceive(4)] incrementing
[countReceive(5)] decrementing
[countReceive(4)] decrementing
[countReceive(3)] decrementing
[countReceive(2)] my current count is 2
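
The counter only ever replaces its handler. As a complementary sketch, the hypothetical Door actor below pushes a handler with discardOld = false and pops it again with unbecome(). To make the behaviour observable, each message is answered and the replies are collected with Akka's ask pattern (akka.pattern.ask), which this article does not otherwise cover; Door, Lock, Unlock and run are names made up for this illustration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object BecomeStackSketch {
  case object Lock
  case object Unlock

  // Hypothetical actor with two behaviours kept on the handler stack
  class Door extends Actor {
    def receive: Receive = open

    def open: Receive = {
      case Lock =>
        // push `locked` on top of `open` instead of replacing it
        context.become(locked, discardOld = false)
        sender() ! "open -> locked"
      case other => sender() ! s"open: $other"
    }

    def locked: Receive = {
      case Unlock =>
        context.unbecome() // pop `locked`, so `open` is active again
        sender() ! "locked -> open"
      case other => sender() ! s"locked: $other"
    }
  }

  // Sends a fixed script of messages and returns the replies in order
  def run(): List[String] = {
    val system = ActorSystem("becomeStackSketch")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val door = system.actorOf(Props(new Door), "door")
      List[Any]("knock", Lock, "knock", Unlock, "knock").map { msg =>
        Await.result((door ? msg).mapTo[String], 3.seconds)
      }
    } finally system.terminate()
  }
}
```

The replies should read "open: knock", "open -> locked", "locked: knock", "locked -> open", "open: knock": the same message is treated differently depending on which handler is on top of the stack.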


To stop an actor x, we can use context.stop(x). Children of an actor are the actors it created, and stopping an actor first stops all of its children. Note that context.stop() is asynchronous: it returns immediately, and we have no control over when the actor actually stops.


import java.util.concurrent.TimeUnit

import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}

object ChildActors extends App {

  object Parent {
    case class StartChild(name: String)
    case class StopChild(name: String)
    case class PingChild(name: String)
    case object Stop
  }
  class Parent extends Actor {
    import Parent._

    override def receive: Receive = withChildren(Map.empty[String, ActorRef])

    // The map of children is carried in the handler's argument
    def withChildren(map: Map[String, ActorRef]): Receive = {
      case StartChild(name) => {
        val childRef = context.actorOf(Props[Child], name)
        context.become(withChildren(map + (name -> childRef)))
      }
      case StopChild(name) => {
        val childOption = map.get(name)
        childOption.foreach(c => context.stop(c))
      }
      case PingChild(name) => {
        val child = map.get(name).get
        child ! "ping"
      }
      case Stop => context.stop(self)
    }
  }

  class Child extends Actor {
    override def receive: Receive = {
      case msg => println(self.path + " received " + msg.toString)
    }
  }

  import Parent._

  val system = ActorSystem("ParentChildDemo")
  val parent = system.actorOf(Props[Parent], "parent")
  parent ! StartChild("Alpha")
  parent ! StartChild("Bravo")
  parent ! StartChild("Charlie")

  parent ! PingChild("Alpha")
  parent ! PingChild("Bravo")
  parent ! PingChild("Charlie")

  TimeUnit.SECONDS.sleep(1)

  parent ! StopChild("Alpha")
  parent ! StopChild("Bravo")

  TimeUnit.SECONDS.sleep(1)

  parent ! PingChild("Alpha")
  parent ! PingChild("Bravo")
  parent ! PingChild("Charlie")

  parent ! Stop

  TimeUnit.SECONDS.sleep(1)

  parent ! PingChild("Charlie")
}


In the above example, we first send 3 StartChild messages to the parent actor, which results in 3 new actors being created.

Next, we send 3 PingChild messages, which should result in one message being sent to each of the 3 child actors.

Then we send two StopChild messages, which should result in the actors Alpha and Bravo being stopped.

If we send ping messages again, the messages meant for Alpha and Bravo should go to dead letters, whereas the one meant for Charlie should go through as expected.

Finally, we stop the parent. Any message sent to it after this point should go to dead letters.

Let’s look at the console output and verify that everything went as expected:


akka://ParentChildDemo/user/parent/Bravo received ping
akka://ParentChildDemo/user/parent/Alpha received ping
akka://ParentChildDemo/user/parent/Charlie received ping
akka://ParentChildDemo/user/parent/Charlie received ping
[INFO] [03/21/2019 19:56:00.819] [ParentChildDemo-akka.actor.default-dispatcher-4] [akka://ParentChildDemo/user/parent/Alpha] Message [java.lang.String] from Actor[akka://ParentChildDemo/user/parent#-1038669063] to Actor[akka://ParentChildDemo/user/parent/Alpha#1443091938] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ParentChildDemo/user/parent/Alpha#1443091938]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [03/21/2019 19:56:00.819] [ParentChildDemo-akka.actor.default-dispatcher-4] [akka://ParentChildDemo/user/parent/Bravo] Message [java.lang.String] from Actor[akka://ParentChildDemo/user/parent#-1038669063] to Actor[akka://ParentChildDemo/user/parent/Bravo#-1699269885] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ParentChildDemo/user/parent/Bravo#-1699269885]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [03/21/2019 19:56:00.848] [ParentChildDemo-akka.actor.default-dispatcher-3] [akka://ParentChildDemo/user/parent] Message [part2actors.ChildActors$Parent$PingChild] without sender to Actor[akka://ParentChildDemo/user/parent#-1038669063] was not delivered. [3] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ParentChildDemo/user/parent#-1038669063]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.


Akka also provides special messages that we can use to stop actors:
  • PoisonPill : when processed, triggers the normal shutdown of the receiving actor
  • Kill : less graceful than PoisonPill. Makes the receiving actor throw an ActorKilledException, which leads to its shutdown
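
As a small illustration of PoisonPill, the sketch below sends one message before the pill and one after it. PoisonPill is queued like an ordinary message, so anything enqueued before it is still processed, while anything enqueued after it is never handled. The Echo actor and run are hypothetical names, and the ask pattern (akka.pattern.ask) is used here only to observe the outcome.

```scala
import akka.actor.{Actor, ActorSystem, PoisonPill, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object PoisonPillSketch {
  // Hypothetical actor that answers every message it processes
  class Echo extends Actor {
    def receive: Receive = { case msg => sender() ! s"echo: $msg" }
  }

  // Returns the reply received before the pill and whether the later request failed
  def run(): (String, Boolean) = {
    val system = ActorSystem("poisonPillSketch")
    implicit val timeout: Timeout = Timeout(500.millis)
    try {
      val echo = system.actorOf(Props(new Echo), "echo")
      val before = Await.result((echo ? "first").mapTo[String], 2.seconds)
      echo ! PoisonPill // handled by Akka itself, never reaches receive
      // The actor stops when it processes the pill, so this request gets no reply
      val after = Try(Await.result(echo ? "second", 2.seconds))
      (before, after.isFailure)
    } finally system.terminate()
  }
}
```

Under these assumptions run() should return ("echo: first", true): the first request is answered, and the second one fails because no reply ever arrives.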

Here’s a code snippet from Akka’s ActorCell class showing how Akka handles these special built-in message types:

def autoReceiveMessage(msg: Envelope): Unit = {
  if (system.settings.DebugAutoReceive)
    publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))

  msg.message match {
    case t: Terminated ⇒ receivedTerminated(t)
    case AddressTerminated(address) ⇒ addressTerminated(address)
    case Kill ⇒ throw new ActorKilledException("Kill")
    case PoisonPill ⇒ self.stop()
    case sel: ActorSelectionMessage ⇒ receiveSelection(sel)
    case Identify(messageId) ⇒ sender() ! ActorIdentity(messageId, Some(self))
  }
}


Akka also allows us to set up a death watch for an actor. context.watch(x) registers the current actor as a watcher of actor x. When x dies, the current actor receives a special message called Terminated.

Let’s modify the example to handle the Terminated message:

import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}

object ChildActors extends App {

  object Parent {
    case class StartChild(name: String)
    case class StopChild(name: String)
    case class PingChild(name: String)
    case object Stop
  }
  class Parent extends Actor {
    import Parent._

    override def receive: Receive = withChildren(Map.empty[String, ActorRef])

    def withChildren(map: Map[String, ActorRef]): Receive = {
      case StartChild(name) => {
        val childRef = context.actorOf(Props[Child], name)
        context.watch(childRef) // ask to be notified when this child terminates
        context.become(withChildren(map + (name -> childRef)))
      }
      case StopChild(name) => {
        val childOption = map.get(name)
        childOption.foreach(c => context.stop(c))
      }
      case PingChild(name) => {
        val child = map.get(name).get
        child ! "ping"
      }
      case Stop => context.stop(self)
      case Terminated(actor) => println(s"$actor has died")
    }
  }

  class Child extends Actor {
    override def receive: Receive = {
      case msg => println(self.path + " received " + msg.toString)
    }
  }

  import Parent._

  val system = ActorSystem("ParentChildDemo")
  val parent = system.actorOf(Props[Parent], "parent")
  parent ! StartChild("Alpha")
  parent ! StartChild("Bravo")
  parent ! StartChild("Charlie")

  parent ! StopChild("Alpha")
  parent ! StopChild("Bravo")
  parent ! StopChild("Charlie")
}


Output:

Actor[akka://ParentChildDemo/user/parent/Alpha#1008840127] has died
Actor[akka://ParentChildDemo/user/parent/Bravo#1742707412] has died
Actor[akka://ParentChildDemo/user/parent/Charlie#-1353306828] has died


The output clearly indicates that the parent was notified when the child actors stopped.

An actor’s lifecycle involves the following phases:
  • Start : A new ActorRef is created with a new UUID at a given path
  • Suspend : The ActorRef might allow messages to be enqueued but won't process any more messages until its state changes to Resumed
  • Resume : opposite of Suspend
  • Restart : in case of any unhandled Exception:
      1. The actor ref is suspended
      2. The actor instance is replaced with a new instance (any internal state is lost)
      3. The actor ref is resumed again
  • Stop : Frees an actor ref within a path. All the watching actors receive a Terminated message

Akka provides a bunch of callback methods for handling lifecycle events. Note that all the callback methods have return type as Unit so they are intended to cause a side effect :
  • preStart(): Unit
  • postStop(): Unit
  • preRestart(reason: Throwable, message: Option[Any]): Unit
  • postRestart(reason: Throwable): Unit

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}

object ActorLifecycle extends App {

  object StartChild
  class LifecycleActor extends Actor with ActorLogging {

    override def preStart(): Unit = log.info("I am starting")
    override def postStop(): Unit = log.info("I have stopped")

    override def receive: Receive = {
      case StartChild =>
        context.actorOf(Props[LifecycleActor], "child")
    }
  }

  val system = ActorSystem("LifecycleDemo")

  object Fail
  object FailChild
  object CheckChild
  object Check

  class Parent extends Actor {
    private val child = context.actorOf(Props[Child], "supervisedChild")

    override def receive: Receive = {
      case FailChild => child ! Fail
      case CheckChild => child ! Check
    }
  }

  class Child extends Actor with ActorLogging {

    override def preStart(): Unit = log.info("supervised child started")
    override def postStop(): Unit = log.info("supervised child stopped")

    // Called on the old instance before it is replaced
    override def preRestart(reason: Throwable, message: Option[Any]): Unit =
      log.info(s"supervised actor restarting because of ${reason.getMessage}")

    // Called on the new instance after the restart
    override def postRestart(reason: Throwable): Unit =
      log.info("supervised actor restarted")

    override def receive: Receive = {
      case Fail =>
        log.warning("child will fail now")
        throw new RuntimeException("I failed")
      case Check =>
        log.info("alive and kicking")
    }
  }

  val supervisor = system.actorOf(Props[Parent], "supervisor")
  supervisor ! FailChild
  supervisor ! CheckChild

}


Output:

[INFO] [03/21/2019 22:06:00.282] [LifecycleDemo-akka.actor.default-dispatcher-6] [akka://LifecycleDemo/user/supervisor/supervisedChild] supervised child started
[WARN] [03/21/2019 22:06:00.285] [LifecycleDemo-akka.actor.default-dispatcher-6] [akka://LifecycleDemo/user/supervisor/supervisedChild] child will fail now
[ERROR] [03/21/2019 22:06:00.311] [LifecycleDemo-akka.actor.default-dispatcher-2] [akka://LifecycleDemo/user/supervisor/supervisedChild] I failed
java.lang.RuntimeException: I failed
at part4faulttolerance.ActorLifecycle$Child$$anonfun$receive$3.applyOrElse(ActorLifecycle.scala:56)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at part4faulttolerance.ActorLifecycle$Child.aroundReceive(ActorLifecycle.scala:42)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
at akka.actor.ActorCell.invoke(ActorCell.scala:557)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[INFO] [03/21/2019 22:06:00.311] [LifecycleDemo-akka.actor.default-dispatcher-6] [akka://LifecycleDemo/user/supervisor/supervisedChild] supervised actor restarting because of I failed
[INFO] [03/21/2019 22:06:00.314] [LifecycleDemo-akka.actor.default-dispatcher-6] [akka://LifecycleDemo/user/supervisor/supervisedChild] supervised actor restarted
[INFO] [03/21/2019 22:06:00.315] [LifecycleDemo-akka.actor.default-dispatcher-6] [akka://LifecycleDemo/user/supervisor/supervisedChild] alive and kicking


By default, an exception thrown while processing a message causes the actor to be restarted. The message that caused the exception is taken off the queue and isn’t enqueued again. For chained actors, the parent actor decides how to deal with a child’s failure. Depending on business logic, the parent can decide to:
  1. Resume the actor
  2. Restart the actor (default behavior)
  3. Stop the actor permanently
  4. Escalate the failure and fail itself
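
A parent expresses this decision by overriding supervisorStrategy. The sketch below is one possible setup, not the only one: the exception-to-directive mapping, the Counter child and the FailSoft/FailHard messages are assumptions chosen for illustration, and the ask pattern (akka.pattern.ask) is used only to read the counter back. Alongside Resume, Restart and Escalate, it also uses Akka's Stop directive, which stops the child permanently.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart, Resume, Stop}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object SupervisionSketch {
  case object Increment
  case object Get
  case object FailSoft // hypothetical: an error the parent answers with Resume
  case object FailHard // hypothetical: an error the parent answers with Restart

  class Counter extends Actor {
    private var count = 0
    def receive: Receive = {
      case Increment => count += 1
      case Get       => sender() ! count
      case FailSoft  => throw new ArithmeticException("soft failure")
      case FailHard  => throw new IllegalStateException("hard failure")
    }
  }

  class Supervisor extends Actor {
    // Decides, per exception type, what happens to the failing child
    override val supervisorStrategy: SupervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
        case _: ArithmeticException   => Resume   // keep the instance and its state
        case _: IllegalStateException => Restart  // fresh instance, state is lost
        case _: NullPointerException  => Stop     // stop the child for good
        case _: Exception             => Escalate // let the supervisor's own parent decide
      }

    private val child = context.actorOf(Props(new Counter), "counter")
    def receive: Receive = { case msg => child.forward(msg) }
  }

  // Returns the count seen after a resumed failure and after a restarted failure
  def run(): (Int, Int) = {
    val system = ActorSystem("supervisionSketch")
    implicit val timeout: Timeout = Timeout(3.seconds)
    def currentCount(ref: ActorRef): Int =
      Await.result((ref ? Get).mapTo[Int], 3.seconds)
    try {
      val supervisor = system.actorOf(Props(new Supervisor), "supervisor")
      supervisor ! Increment
      supervisor ! Increment
      supervisor ! FailSoft
      val afterResume = currentCount(supervisor)
      supervisor ! FailHard
      val afterRestart = currentCount(supervisor)
      (afterResume, afterRestart)
    } finally system.terminate()
  }
}
```

Under these assumptions run() should return (2, 0): after Resume the child keeps its count, while after Restart a new instance starts from zero.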

