Typing your actors step by step

15 March 2021

Pedro Ferreira

Background

After some frustration with our project’s choice of planning poker solution, I did what any reasonable developer would do. No, I didn’t search for a more suitable solution; I decided to write my own and do a blog post about it.

The main idea is to build a simple application that uses some features of the following core Akka libraries:

Initially, I deliberately chose a classic actor implementation so that I could also talk about something that might be more useful: migrating an Akka Classic application to Akka Typed.

"Why would you want to do that?", you may ask… Well, for me it comes down to two things: the type safety of Akka Typed, which means that an actor’s behaviour isn’t just a function of Any ⇒ Unit, and its strong support for avoiding certain anti-patterns (more eloquently explained here). Feel free to read further to decide whether this migration makes sense for your project.

Overview

Obviously, the observable behaviour should stay the same after the migration. The application consists of:

  • a stream for each websocket connection with its own wire data domain forwarding all incoming data to the main application controller (Room Manager) as well as providing an interface to forward outgoing data.

  • the Room Manager is responsible for translating from the wire domain to the application domain, routing the translated messages to the appropriate rooms and managing each room’s lifecycle.

  • rooms are representations of each planning poker session, holding all necessary information and making sure that each participant is up to date with it.

Screenshot

Step by step

First things first, we need to replace our old classical actor system with its typed counterpart:

implicit val system: ActorSystem[SpawnProtocol.Command] =
    ActorSystem(Behaviors.setup[SpawnProtocol.Command](_ => SpawnProtocol()), "pointing-poker")

Takeaways

  • There is no default guardian actor, you need to provide one.
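
As an alternative to SpawnProtocol, the guardian can be an actor of your own that spawns the top-level actors itself. The following is only a minimal sketch of that idea: Worker, Guardian and GuardianExample are hypothetical names and are not part of the pointing poker application.

```scala
import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical child actor, only here so the guardian has something to spawn.
object Worker {
  sealed trait Command
  final case class Work(item: String) extends Command

  def apply(): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case Work(item) =>
          context.log.info("Working on {}", item)
          Behaviors.same
      }
    }
}

// Hypothetical guardian: the user-provided root of the actor hierarchy.
object Guardian {
  def apply(): Behavior[Worker.Command] =
    Behaviors.setup { context =>
      // Children are created through the ActorContext, not through the ActorSystem.
      val worker = context.spawn(Worker(), "worker")
      // Forward everything that is sent to the system to the single child.
      Behaviors.receiveMessage { message =>
        worker ! message
        Behaviors.same
      }
    }
}

object GuardianExample extends App {
  val system: ActorSystem[Worker.Command] = ActorSystem(Guardian(), "guardian-example")
  // A typed ActorSystem is also an ActorRef to its guardian.
  system ! Worker.Work("first item")
  Thread.sleep(500) // demo only: give the actors a moment before shutting down
  system.terminate()
}
```

The trade-off is that references to the children stay inside the guardian, which is the reason the post uses SpawnProtocol instead.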

Tips

Now that we have our actor system, we need to create our top level actors:

system ! SpawnProtocol.Spawn(RoomManager(), "room-manager", Props.empty, system.ignoreRef)

Takeaways

  • There is no actorOf method on a typed ActorSystem.

  • If you don’t need the response, you can use system.ignoreRef.

I could have created the RoomManager inside Behaviors.setup when creating the actor system, but I also need the RoomManager reference in my HTTP API, so I created it from the “outside” using SpawnProtocol.Spawn, which replies with the ActorRef once the actor is created. However, as you can see, I’m ignoring the ref for now, because to actually use it we need to talk about how to ask information out of the actor system:

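import akka.actor.typed.{ActorRef, Props, SpawnProtocol}
import akka.actor.typed.scaladsl.AskPattern._ // adds `ask` and derives the implicit Scheduler from the system
import akka.util.Timeout

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

implicit val timeout: Timeout             = 3.seconds
implicit val ec: ExecutionContextExecutor = system.executionContext

// ask the guardian to spawn the RoomManager; `ref` is where the reply will be delivered
val roomManagerFuture: Future[ActorRef[RoomManager.Command]] = system.ask { ref =>
  SpawnProtocol.Spawn(RoomManager(), "room-manager", Props.empty, ref)
}

roomManagerFuture.onComplete {
  case Success(roomManager) =>
    val api = API(roomManager, apiConfig)
    api.run()
  case Failure(exception) =>
    // pass the exception as the last argument so the logger records its stack trace
    log.error("Error creating room manager", exception)
}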

Takeaways

  • Ask now provides a reference (ref in the snippet above) that you put in the request as the reply-to address; it can receive messages from any actor.
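
To see the ask pattern in isolation, here is a small self-contained sketch. It assumes a hypothetical Counter actor (not part of the application) whose request carries the reply address in a replyTo field.

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.util.Timeout

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical protocol: the reply address travels inside the request.
object Counter {
  sealed trait Command
  case object Increment                             extends Command
  final case class GetCount(replyTo: ActorRef[Int]) extends Command

  def apply(count: Int = 0): Behavior[Command] =
    Behaviors.receiveMessage {
      case Increment =>
        Counter(count + 1) // next behaviour carries the new state
      case GetCount(replyTo) =>
        replyTo ! count
        Behaviors.same
    }
}

object AskExample extends App {
  implicit val system: ActorSystem[Counter.Command] = ActorSystem(Counter(), "ask-example")
  implicit val timeout: Timeout                     = 3.seconds

  system ! Counter.Increment
  system ! Counter.Increment

  // `ask` creates a temporary ActorRef[Int] and hands it to the function below.
  // The Future completes with the first message sent to that ref, or fails on timeout.
  val count: Future[Int] = system.ask(replyTo => Counter.GetCount(replyTo))

  println(Await.result(count, timeout.duration)) // blocking is for the demo only
  system.terminate()
}
```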

Now we need to convert RoomManager and Room to typed versions. That can be accomplished either by extending AbstractBehavior (object-oriented style) or by defining functions that return a Behavior (functional style). I’ll be using the functional style (comparison):

RoomManager:

final case class RoomManagerData(rooms: Map[UUID, ActorRef[Room.Command]])

  object RoomManagerData {
    val empty: RoomManagerData = RoomManagerData(rooms = Map.empty[UUID, ActorRef[Room.Command]])
  }

  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      val roomResponseActor: ActorRef[Room.Response] =
        context.messageAdapter(response => RoomResponseWrapper(response))
      receiveBehaviour(RoomManagerData.empty, roomResponseActor)
    }

  private[actors] def receiveBehaviour(data: RoomManagerData, roomResponseWrapper: ActorRef[Room.Response]): Behavior[Command] =
    Behaviors
      .receive[Command] { (context, message) =>
        message match {
          case CreateRoom(replyTo) =>
            //handle message
            receiveBehaviour(newData, roomResponseWrapper)
          case ConnectToRoom(message, user) =>
            //handle message
            Behaviors.same
          case IncomeWSMessage(message) =>
            //handle message
            Behaviors.same
          case UnsupportedWSMessage =>
            //handle message
            Behaviors.same
          case WSCompleted(roomId, userId) =>
            //handle message
            Behaviors.same
          case WSFailure(t) =>
            //handle message
            Behaviors.same
        }
      }
      .receiveSignal {
        case (_, Terminated(ref)) =>
          //handle message
          receiveBehaviour(newData, roomResponseWrapper)
      }

Room:

final case class RoomData(
      users: List[User],
      currentIssue: String,
      issueLastEditBy: Option[UUID]
  ) {
    def joinUser(user: User): RoomData = ??? // implementation omitted to save space
    def vote(userId: UUID, estimation: String): RoomData = ??? // implementation omitted to save space
    def clear(): RoomData = ??? // implementation omitted to save space
    def leave(userId: UUID): RoomData = ??? // implementation omitted to save space
    def editIssue(issue: String, userId: UUID): RoomData = ??? // implementation omitted to save space
  }

  object RoomData {
    val empty: RoomData = RoomData(List.empty[User], "", Option.empty[UUID])
  }

  def apply(roomId: UUID): Behavior[Command] =
    Behaviors.setup[Command] { _ =>
      receiveBehaviour(roomId, RoomData.empty)
    }

  private def receiveBehaviour(roomId: UUID, data: RoomData): Behavior[Command] =
    Behaviors.receive[Command] { (context, message) =>
      message match {
        case Join(user) =>
          // Handling message using data transformation defined in case class
          receiveBehaviour(roomId, newData)
        case Vote(userId, estimation) =>
          // Handling message using data transformation defined in case class
          receiveBehaviour(roomId, newData)
        case ClearVotes(userId) =>
          // Handling message using data transformation defined in case class
          receiveBehaviour(roomId, newData)
        case ShowVotes(userId) =>
          // Handling message using data transformation defined in case class
          Behaviors.same
        case Leave(userId) =>
          // Handling message using data transformation defined in case class
          Behaviors.same
        case EditIssue(userId, issue) =>
          // Handling message using data transformation defined in case class
          receiveBehaviour(roomId, newData)
      }

    }

Takeaways

  • Internal state is now immutable.

  • The sender() reference no longer exists, so the address to reply to has to be part of the message itself; that is why CreateRoom now carries a replyTo.

  • I was already using an actor DSL, but now the compiler checks it as well (it found a missing extends on EditIssue in Room).

  • The compiler will issue a warning if a message defined in the Actor’s protocol is not being handled.

  • Lifecycle messages (signals such as Terminated) are handled in a separate function, receiveSignal.

  • context.stop will only work on child actors.

Tip

  • Think of your actor as a finite state machine.

  • Define data transformations as functions on the case class; this makes message handling more readable and allows the transformations to be unit tested.
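
Both tips can be illustrated with a small sketch. The Tally case class and the Poll actor below are hypothetical and much simpler than the real Room: each state of the finite state machine is a function returning a Behavior, and all data changes live on the immutable case class.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical immutable state; every transformation returns a new copy.
final case class Tally(votes: Map[String, Int]) {
  def vote(user: String, points: Int): Tally = copy(votes = votes.updated(user, points))
  def clear: Tally                           = copy(votes = Map.empty)
  def total: Int                             = votes.values.sum
}

object Poll {
  sealed trait Command
  final case class Vote(user: String, points: Int) extends Command
  final case class Close(replyTo: ActorRef[Int])   extends Command
  case object Reopen                               extends Command

  def apply(): Behavior[Command] = open(Tally(Map.empty))

  // State "open": votes are accepted.
  private def open(tally: Tally): Behavior[Command] =
    Behaviors.receiveMessage {
      case Vote(user, points) => open(tally.vote(user, points))
      case Close(replyTo) =>
        replyTo ! tally.total
        closed(tally)
      case Reopen => Behaviors.same
    }

  // State "closed": votes are ignored until the poll is reopened.
  private def closed(tally: Tally): Behavior[Command] =
    Behaviors.receiveMessage {
      case Reopen => open(tally.clear)
      case Close(replyTo) =>
        replyTo ! tally.total
        Behaviors.same
      case _: Vote => Behaviors.same
    }
}
```

Because Tally knows nothing about actors, its transformations can be checked with plain assertions, without starting an actor system.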

Previously we had to request (ask) information out of an actor from the outside. When one actor sends a request to another actor, however, the response has to be translated, because typed actors can only handle messages defined in their own domain. This translation is done using an adapter.

object Room {
  sealed trait Response
  final case class Running(roomId: UUID) extends Response
  final case class Stopped(roomId: UUID) extends Response
}

object RoomManager {
  final case class RoomResponseWrapper(response: Room.Response) extends Command

  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      val roomResponseActor: ActorRef[Room.Response] =
        context.messageAdapter(response => RoomResponseWrapper(response))
      receiveBehaviour(RoomManagerData.empty, roomResponseActor)
    }
}

Takeaways

  • When expecting a response from another actor you will need an adapter to convert to the proper message type.

  • If the translation on the adapter fails, the actor will be stopped.

  • There is only one adapter per message type; any new adapter will override the previous one.

Tip

  • Use simple conversion on adapters, leave the actual message handling to the actor’s behaviour.
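
The snippet above only shows how the adapter is registered. As a rough end-to-end sketch, with hypothetical Backend and Frontend actors standing in for Room and RoomManager, the wrapped response is then handled like any other message of the requesting actor:

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical downstream actor with its own request/response protocol.
object Backend {
  sealed trait Request
  final case class Start(id: String, replyTo: ActorRef[Response]) extends Request

  sealed trait Response
  final case class Started(id: String) extends Response

  def apply(): Behavior[Request] =
    Behaviors.receiveMessage { case Start(id, replyTo) =>
      replyTo ! Started(id)
      Behaviors.same
    }
}

// Hypothetical requester: it can only receive its own Command type.
object Frontend {
  sealed trait Command
  final case class Launch(id: String) extends Command
  private final case class WrappedBackendResponse(response: Backend.Response) extends Command

  def apply(backend: ActorRef[Backend.Request]): Behavior[Command] =
    Behaviors.setup { context =>
      // The adapter only wraps the response; it contains no business logic.
      val backendAdapter: ActorRef[Backend.Response] =
        context.messageAdapter(response => WrappedBackendResponse(response))

      Behaviors.receiveMessage {
        case Launch(id) =>
          backend ! Backend.Start(id, backendAdapter)
          Behaviors.same
        case WrappedBackendResponse(Backend.Started(id)) =>
          // The actual handling happens here, in the actor's behaviour.
          context.log.info("Backend started {}", id)
          Behaviors.same
      }
    }
}
```

Keeping the wrapper private means other actors cannot send it directly, so it stays an internal detail of the requester’s protocol.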

Now that the actors are typed, the remaining parts of the system need to be adapted to interact with them.

Since Akka HTTP 10.2.x, it is no longer necessary to convert to an untyped actor system to start your HTTP server. Now it looks like:

Http().newServerAt(apiConfig.host, apiConfig.port).bind(route)

Takeaways

  • Before 10.2.x you needed to convert the typed system to an untyped one.
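
For context, a complete but minimal server could look like the sketch below. It assumes Akka HTTP 10.2.x or later; the route, host and port are placeholders and not the application’s real API.

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

object HttpExample extends App {
  // Http() picks up the typed system implicitly; no conversion to classic is needed.
  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "http-example")

  // Hypothetical route, only here to have something to bind.
  val route: Route = path("ping") {
    get {
      complete("pong")
    }
  }

  // Returns a Future[Http.ServerBinding]; the server keeps running until the system terminates.
  Http().newServerAt("localhost", 8080).bind(route)
}
```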

As for the websocket stream, sadly I couldn’t find anything that would handle the integration between streams and typed actors. So instead of changing the whole websocket stream code, I decided to use the coexistence functionality, which allows me to do:

import akka.actor.typed.scaladsl.adapter._

handleWebSocketMessages(
  WS.handler(
    roomId,
    URLDecoder.decode(encodedName, StandardCharsets.UTF_8.name()),
    roomManager.toClassic
  )
)

Takeaways

  • You are not bound to use only typed actors, both can coexist.
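
The adapter import works in both directions. The following sketch uses a hypothetical Echo actor and starts from a classic system, as an application in the middle of a migration might; it is not taken from the application’s code.

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.{actor => classic}

// Hypothetical typed actor.
object Echo {
  final case class Say(text: String)

  def apply(): Behavior[Say] =
    Behaviors.receive { (context, message) =>
      context.log.info("Typed actor got: {}", message.text)
      Behaviors.same
    }
}

object CoexistenceExample extends App {
  val classicSystem: classic.ActorSystem = classic.ActorSystem("coexistence-example")

  // Classic -> typed: spawn a typed actor directly from the classic system.
  val typedRef: ActorRef[Echo.Say] = classicSystem.spawn(Echo(), "echo")
  typedRef ! Echo.Say("sent through the typed ref")

  // Typed -> classic: the same actor behind an untyped reference, which accepts Any.
  val classicRef: classic.ActorRef = typedRef.toClassic
  classicRef ! Echo.Say("sent through the classic ref")

  // The classic system can also be viewed as a typed one where an API requires it.
  val typedSystem: ActorSystem[Nothing] = classicSystem.toTyped

  Thread.sleep(500) // demo only: give the actor a moment to log before shutting down
  classicSystem.terminate()
}
```

Note that the classic reference gives up the compile-time check: nothing stops a caller from sending it a message the typed actor does not understand.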

Now that all actors, services and connecting parts are migrated, the tests need to be adjusted. The Actor TestKit provides two utilities for testing, ActorTestKit for asynchronous testing and BehaviorTestKit for synchronous testing.

RoomManagerSpec:

"connect user to room" in {
      val behaviorTestKit = BehaviorTestKit(RoomManager())

      val roomId     = UUID.randomUUID()
      val user1Probe = TestProbe()(testKit.system.classicSystem)
      val user2Probe = TestProbe()(testKit.system.classicSystem)
      val user1      = Room.User(UUID.randomUUID(), "user 1", false, "", user1Probe.ref)
      val user2      = Room.User(UUID.randomUUID(), "user 2", false, "", user2Probe.ref)

      behaviorTestKit.run(
        RoomManager
          .ConnectToRoom(WSMessage(MessageType.Join, roomId, user1.id, user1.name), user1Probe.ref)
      )
      behaviorTestKit.run(
        RoomManager
          .ConnectToRoom(WSMessage(MessageType.Join, roomId, user2.id, user2.name), user2Probe.ref)
      )

      val childInbox = behaviorTestKit.childInbox[Room.Command](roomId.toString)
      childInbox.expectMessage(Room.Join(user1))
      childInbox.expectMessage(Room.Join(user2))
    }

    "handle an IncomeWSMessage that generates an outcome" in {
      val roomId    = UUID.randomUUID()
      val roomProbe = testKit.createTestProbe[Room.Command]()
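      // receiveBehaviour also expects the adapter ref for Room responses; a probe stands in for it here
      val roomResponseProbe = testKit.createTestProbe[Room.Response]()
      val managerRef = testKit.spawn(
        RoomManager.receiveBehaviour(RoomManagerData(Map(roomId -> roomProbe.ref)), roomResponseProbe.ref)
      )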
      val userId = UUID.randomUUID()

      managerRef ! RoomManager.IncomeWSMessage(WSMessage(MessageType.Vote, roomId, userId, "5"))
      managerRef ! RoomManager.IncomeWSMessage(
        WSMessage(MessageType.EditIssue, roomId, userId, "issue name")
      )
      managerRef ! RoomManager.IncomeWSMessage(WSMessage(MessageType.Show, roomId, userId, ""))
      managerRef ! RoomManager.IncomeWSMessage(WSMessage(MessageType.Clear, roomId, userId, ""))

      roomProbe.expectMessage(Room.Vote(userId, "5"))
      roomProbe.expectMessage(Room.EditIssue(userId, "issue name"))
      roomProbe.expectMessage(Room.ShowVotes(userId))
      roomProbe.expectMessage(Room.ClearVotes(userId))
    }

RoomSpec:

"Room Actor" should {
    "update current issue and broadcast it" in {
      val issue               = "Issue test 1"
      val (user, userProbe)   = createUser(UUID.randomUUID(), "user1", false, "")
      val (user2, user2Probe) = createUser(UUID.randomUUID(), "user2", false, "")
      val dataProbe           = testKit.createTestProbe[Room.Response]()
      val actingUserId        = UUID.randomUUID()
      val (roomId, roomRef) = createRoom(
        UUID.randomUUID(),
        RoomData.empty.copy(users = List(user, user2))
      )

      val expectedMessage = WSMessage(MessageType.EditIssue, roomId, actingUserId, issue)
      val expectedData = Room.DataStatus(data =
        RoomData(
          users = List(user, user2),
          currentIssue = issue,
          issueLastEditBy = Option(actingUserId)
        )
      )

      roomRef ! Room.EditIssue(actingUserId, issue)

      roomRef ! Room.GetData(dataProbe.ref)

      userProbe.expectMsg(expectedMessage)
      user2Probe.expectMsg(expectedMessage)

      dataProbe.expectMessage(expectedData)
    }
  }

Takeaways

  • Since actors are now Behavior functions, overriding internal functions for testing, as you could with classic actors, is not a recommended approach.

  • BehaviorTestKit is better at dealing with an actor’s children.

  • ActorTestKit provides a simple way to create and use probes.

  • There isn’t a clear way to test communication between more than 2 actors (if they aren’t related).

Tips

  • Normally I would only expose the initial behavior of my actor, but I made the other behavior package-private, so now I can set the behavior’s data as I need before the test.

private[actors] def receiveBehaviour(roomId: UUID, data: RoomData): Behavior[Command] = ???

  • Include a message that allows inspecting the internal state; again, access is package-private.

  private[actors] final case class GetData(replyTo: ActorRef[Response]) extends Command

  sealed trait Response
  final case class DataStatus(data: RoomData) extends Response
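
To make the difference between the two test kits concrete, here is a small sketch that exercises the same hypothetical Greeter actor both ways. It is written as a plain App rather than a test suite to keep it self-contained, and it assumes the akka-actor-testkit-typed module is on the classpath.

```scala
import akka.actor.testkit.typed.scaladsl.{ActorTestKit, BehaviorTestKit, TestInbox}
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical actor under test.
object Greeter {
  final case class Greet(name: String, replyTo: ActorRef[String])

  def apply(): Behavior[Greet] =
    Behaviors.receiveMessage { case Greet(name, replyTo) =>
      replyTo ! s"Hello, $name"
      Behaviors.same
    }
}

object TestKitExample extends App {
  // Synchronous: the behaviour runs on the calling thread and replies land in an inbox.
  val behaviorTestKit = BehaviorTestKit(Greeter())
  val inbox           = TestInbox[String]()
  behaviorTestKit.run(Greeter.Greet("sync", inbox.ref))
  inbox.expectMessage("Hello, sync")

  // Asynchronous: a real actor system is started and replies are awaited on a probe.
  val testKit = ActorTestKit()
  val probe   = testKit.createTestProbe[String]()
  val greeter = testKit.spawn(Greeter())
  greeter ! Greeter.Greet("async", probe.ref)
  probe.expectMessage("Hello, async")
  testKit.shutdownTestKit()
}
```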

Closing thoughts

The migration went smoother than I was expecting. Once I started it, I couldn’t run the application until it was complete, and after changing the whole engine of the app I faced only two compilation errors, which were quickly solved, and the application was running again. Those errors were actually already present in the application before the migration; they had gone unnoticed because the checks that typed actors bring were missing, so the migration already proved useful.

Some of the smoothness I experienced while migrating might be explained by the fact that I already followed good practices when using classic actors, specifically having a DSL already defined. If you’re having more trouble when migrating, I would suggest taking a step back and refining your actors’ DSL and transitions (how they communicate with each other and the states they pass through during their lifecycle).

If you want to see more of what can be done with typed actors and Scala 3.0, these two blog posts may be interesting to read: Using Scala 3 Union types with Akka Typed - part 1 and Using Scala 3 Union types with Akka Typed - part 2.

Helpful links:

  • Moving from classic actors - here

  • Interaction patterns - here

  • PR for the full migration - here

  • Pointing poker app if you want to use it - here