Beliebte Suchanfragen

Cloud Native

DevOps

IT-Security

Agile Methoden

Java

//

Introduction to Akka Actors

16.8.2015 | 9 minutes of reading time

In the first part of this series we gave you a high-level overview of Akka – now we are going to take a deep dive into the realm of Akka actors as provided by the akka-actor module which lay the foundations for all other Akka modules.

As we believe that one cannot learn coding without reading and/or writing code, we will develop a tiny actor-based library step by step: a publish-subscribe-based event bus called PubSub. Of course Akka comes with production-ready local and distributed solutions for this domain, so this is just to get our hands dirty with a well-known example. We are going to use the Scala programming language, simply because it’s so much more convenient for writing Akka-based code, but Java could be used to get the same results.

Actor Model

In the actor model – as conceived in 1973 by Carl Hewitt et al. – actors are “fundamental units of computation that embody processing, storage and communication”. All right, let’s try to break this down.

Being a fundamental unit of computation simply means that – when writing software using the actor model – we will mainly focus our design and implementation efforts around actors. In a fantastic interview with Eric Meijer Carl Hewitt explains that “everything is an actor” as well as “one actor is no actor, they come in systems”, which boils down to what we just said: When using the actor model, our code will consist of actors.

So how does an actor look like? What is processing, storage, and communication, after all? In a nutshell, communication means asynchronous messaging, storage means that actors can have state and processing simply means that actors can handle messages, which is also known as behavior. That doesn’t sound terribly complicated, right? So let’s take the next step and look at Akka actors.

Anatomy of an Akka Actor

As shown in the below picture, an Akka actor is made up of several collaborating components. The ActorRef represents the logical address of an actor and enables us to asynchronously send messages to the actor in a fire-and-forget manner. The dispatcher – by default there is one per actor system – takes care of enqueuing messages into the mailbox of an actor as well as scheduling the mailbox for dequeuing one or more messages – but only one at a time – to be processed by the actor. Last but not least, the Actor – typically the only API we have to implement – encapsulates state and behavior.

As we will see later, Akka prevents us from getting direct access to an Actor and thus ensures that asynchronous messaging is the only way to interact with it: It’s impossible to invoke a method on an actor.

It’s also worth pointing out that sending a message to an actor and processing of that message by the actor are two separate activities, which most probably happen on different threads – of course Akka takes care of the necessary synchronization to make sure that any state changes are visible to any thread. Therefore Akka sort of allows us to program in a single-threaded illusion, i.e. we don’t – and mustn’t – use any primitives for synchronization like volatile or synchronized in our actor code.

Implementing an Actor

Nuff talking, let’s start coding! In Akka an actor is a class that mixes in the Actor trait:

1class MyActor extends Actor {
2  override def receive = ???
3}

The method receive returns the so-called initial behavior of an actor. That’s simply a partial function used by Akka to handle messages sent to the actor. As the behavior is a PartialFunction[Any, Unit], there’s currently no way to define actors that only accept messages of a particular type. Actually there’s already an experimental module called akka-typed which brings back typesafety to Akka, but that’s not yet production-ready. By the way, an actor can change its behavior, which is the reason for calling the return value of the method receive the initial behavior.

All right, let’s implement the core actor for our PubSub library:

1class PubSubMediator extends Actor {
2  override def receive = Actor.emptyBehavior
3}

Currently we don’t want PubSubMediator to handle any message, hence we use Actor.emptyBehavior which simply is a partial function not defined for any value.

Actor Systems and Creating Actors

As mentioned before, “one actor is no actor, they come in systems”. In Akka an actor system is a collaborating ensemble of actors which are arranged in a hierarchy. Therefore each and every actor has a parent actor as shown in the below picture.

When we create an actor system, Akka – which internally uses a lot of so called system actors – creates three actors: The root guardian, which is at the root of the actor hierarchy, as well as the user and system guardians. The user guardian – often just referred as the guardian – is the parent of all top-level actors we create – so in this context top-level means “as top as possible for us”.

All right, but how do we create an actor system? Simply by calling the factory method provided by the ActorSystem singleton object:

1val system = ActorSystem("pub-sub-mediator-spec-system")

But why do we have to create an ActorSystem at all? Why not simply create actors? The latter is not possible, because directly calling the constructor of an Actor throws an exception. Instead we have to use a factory method provided by – guess – the ActorSystem to create a top-level actor:

1system.actorOf(Props(new PubSubMediator), "pub-sub-mediator")

Of course actorOf doesn’t return an Actor instance, but instead an ActorRef. This is how Akka prevents us from getting access to an Actor instance, which in turn ensures that asynchronous messaging is the only way to communicate with an actor. The name we provide must be unique amongst the siblings of the actor, else an exception will be thrown. If we don’t provide a name, Akka will create one for us, hence every actor has a name.

But what’s that Props thingy? Well, that’s simply a configuration object for an actor. It takes the constructor as a by-name parameter – i.e. lazily – and can hold other important information, e.g. about routing or deployment.

When it comes to remoting, it’s important that Props can be serialized and therefore it’s an established best practice to add a Props-factory to the companion object of an actor. This is also a good place for a constant for the name of an actor.

With all that knowledge, let’s complete the PubSubMediator and also create a test for it using ScalaTest and the Akka Testkit, which is another Akka module facilitating testing of Akka actors:

1object PubSubMediator {
2 
3  final val Name = "pub-sub-mediator"
4 
5  def props: Props = Props(new PubSubMediator)
6}
7 
8class PubSubMediator extends Actor {
9  override def receive = Actor.emptyBehavior
10}
11 
12class PubSubMediatorSpec extends WordSpec with Matchers with BeforeAndAfterAll {
13 
14  implicit val system = ActorSystem("pub-sub-mediator-spec-system")
15 
16  "A PubSubMediator" should {
17    "be suited for getting started" in {
18      EventFilter.debug(occurrences = 1, pattern = s"started.*${classOf[PubSubMediator].getName}").intercept {
19        system.actorOf(PubSubMediator.props)
20      }
21    }
22  }
23 
24  override protected def afterAll() = {
25    Await.ready(system.terminate(), Duration.Inf)
26    super.afterAll()
27  }
28}

As you can see, we create an ActorSystem and a PubSubMediator actor in PubSubMediatorSpec. The actual test is a little contrived, because our PubSubMediator is still pretty anemic: It makes use of lifecycle debugging and expects a debug message like “started … PubSubMediator …” to be logged. The full code of the current state can be accessed on GitHub under tag step-01 .

Communication

Now that we know how to create actors, let’s talk about communication, which – as mentioned above – is based on asynchronous messaging and strongly relates to the two other properties of an actor: behavior – i.e. the capability to handle messages – and state.

In order to send a message to an actor you need its address, which is represented by its ActorRef:

1mediator ! GetSubscribers("topic")

As you can see, ActorRef offers the ! – pronounced “bang” – operator, which sends the given message to the respective actor. Once the message has been delivered, the operation is completed and the sending code proceeds. That implies that there is no return value (other than Unit), hence messages are indeed sent in a fire-and-forget manner.

While this is simple, we often need responses. Thanks to the fact that the ! operator implicitly takes the sender as an ActorRef, this can easily be done:

1override def receive = {
2  case Subscribe(topic) =>
3    // TODO Actually handle subscription
4    sender() ! Subscribed
5}

In this example the behavior of the receiving actor handles a particular message – the Subscribe command – and sends a message – the Subscribed event – back to the sender. Thereby the method sender is used to access the sender for the message which is currently being processed.

With these insights, let’s further enhance the PubSubMediator and the respective test.

First we add the message protocol – the set of all messages that belong to PubSubMediator – to the companion object:

1object PubSubMediator {
2 
3  case class Publish(topic: String, message: Any)
4  case class Published(publish: Publish)
5 
6  case class Subscribe(topic: String, subscriber: ActorRef)
7  case class Subscribed(subscribe: Subscribe)
8  case class AlreadySubscribed(subscribe: Subscribe)
9 
10  case class Unsubscribe(topic: String, subscriber: ActorRef)
11  case class Unsubscribed(unsubscribe: Unsubscribe)
12  case class NotSubscribed(unsubscribe: Unsubscribe)
13 
14  case class GetSubscribers(topic: String)
15 
16  final val Name = "pub-sub-mediator"
17 
18  def props: Props = Props(new PubSubMediator)
19}

Next let’s implement the behavior, which has been empty so far:

1class PubSubMediator extends Actor {
2  import PubSubMediator._
3 
4  private var subscribers = Map.empty[String, Set[ActorRef]].withDefaultValue(Set.empty)
5 
6  override def receive = {
7    case publish @ Publish(topic, message) =>
8      subscribers(topic).foreach(_ ! message)
9      sender() ! Published(publish)
10 
11    case subscribe @ Subscribe(topic, subscriber) if subscribers(topic).contains(subscriber) =>
12      sender() ! AlreadySubscribed(subscribe)
13 
14    case subscribe @ Subscribe(topic, subscriber) =>
15      subscribers += topic -> (subscribers(topic) + subscriber)
16      sender() ! Subscribed(subscribe)
17 
18    case unsubscribe @ Unsubscribe(topic, subscriber) if !subscribers(topic).contains(subscriber) =>
19      sender() ! NotSubscribed(unsubscribe)
20 
21    case unsubscribe @ Unsubscribe(topic, subscriber) =>
22      subscribers += topic -> (subscribers(topic) - subscriber)
23      sender() ! Unsubscribed(unsubscribe)
24 
25    case GetSubscribers(topic) =>
26      sender() ! subscribers(topic)
27  }
28}

As you can see, the behavior handles all commands – e.g. Publish or Subscribe – and always sends a positive or negative response back to the sender. Whether a command is valid and yields a positive response – e.g. Subscribed – depends on both the command and the state, which is represented as the private mutable field subscribers.

As mentioned above, only one message is handled at a time and Akka makes sure that state changes are visible when the next message is processed, so there is no need to manually synchronize access to subscribers. Concurrency made easy!

Finally let’s take a look at a portion of the extended test:

1val subscribe01 = Subscribe(topic01, subscriber01.ref)
2mediator ! subscribe01
3sender.expectMsg(Subscribed(subscribe01))
4 
5val subscribe02 = Subscribe(topic01, subscriber02.ref)
6mediator ! subscribe02
7sender.expectMsg(Subscribed(subscribe02))
8 
9val subscribe03 = Subscribe(topic02, subscriber03.ref)
10mediator ! subscribe03
11sender.expectMsg(Subscribed(subscribe03))
1context.watch(subscriber)

After that, Akka will send the watching actor a Terminated message after the watched actor has terminated. It’s guaranteed that this is the last message received from an actor, even in the case of remoting.

All right, let’s finish the PubSubMediator:

1class PubSubMediator extends Actor {
2  import PubSubMediator._
3 
4  ...
5 
6  override def receive = {
7    ...
8 
9    case subscribe @ Subscribe(topic, subscriber) =>
10      subscribers += topic -> (subscribers(topic) + subscriber)
11      context.watch(subscriber)
12      sender() ! Subscribed(subscribe)
13 
14    ...
15 
16    case Terminated(subscriber) =>
17      subscribers = subscribers.map { case (topic, ss) => topic -> (ss - subscriber) }
18  }
19}

share post

Likes

0

//

More articles in this subject area

Discover exciting further topics and let the codecentric world inspire you.

//

Gemeinsam bessere Projekte umsetzen.

Wir helfen deinem Unternehmen.

Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.

Hilf uns, noch besser zu werden.

Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.