Three flavours of request-response pattern in Akka
MonitoringActor
and NetworkActor
. Whenever someone (client) sends CheckHealth
to the former one it asks the latter by sending Ping
. NetworkActor
is obligated to reply with Pong
as soon as possible (scenario [A]). Once MonitoringActor
receives such a reply it immediately replies to the client with Up
status message. However MonitoringActor
is obligated to send Down
reply if NetworkActor
failed to respond with Pong
within one second (scenario [B]). Both workflows are depicted below:Ordinary actor
In this scenarioMonitoringActor
listens for Pong
directly without any intermediaries:class MonitoringActor extends Actor with ActorLogging {The implementation of
private val networkActor = context.actorOf(Props[NetworkActor], "network")
private var origin: Option[ActorRef] = None
def receive = {
case CheckHealth =>
networkActor ! Ping
origin = Some(sender)
case Pong =>
origin.foreach(_ ! Up)
origin = None
}
}
NetworkActor
is irrelevant, just assume it responds with Pong
for each Ping
. As you can see MonitoringActor
handles two messages: CheckHealth
sent by the client and Pong
sent presumably by the NetworkActor
. Sadly we had to store the client reference under origin
field because it would have been lost otherwise once CheckHealth
was handled. So we added a bit of state. The implementation is quite straightforward but has quite a few issues:- Subsequent
CheckHealth
will overwrite previousorigin
CheckHealth
should not really be allowed when waiting forPong
- If
Pong
never arrives we are left in inconsistent state - ...because we don't have 1 second timeout condition yet
class MonitoringActor extends Actor with ActorLogging {
private val networkActor = context.actorOf(Props[NetworkActor], "network")
def receive = waitingForCheckHealth
private def waitingForCheckHealth: Receive = {
case CheckHealth =>
networkActor ! Ping
context become waitingForPong(sender)
}
private def waitingForPong(origin: ActorRef): Receive = {
case Pong =>
origin ! Up
context become waitingForCheckHealth
}
}
context.become()
allows to change the behaviour of actor on the fly. In our case we either wait for CheckHealth
or for Pong
- but never both. But where did the state (origin
reference) go? Well, it's cleverly hidden. waitingForPong()
method takes origin
as parameter and returns a PartialFunction
. This function closes over that parameter, thus actor-global variable is no longer necessary. OK, now we are ready to implement 1 second timeout when waiting for Pong
:def receive = waitingForCheckHealthAfter sending
private def waitingForCheckHealth: Receive = {
case CheckHealth =>
networkActor ! Ping
implicit val ec = context.dispatcher
val timeout = context.system.scheduler.
scheduleOnce(1.second, self, Down)
context become waitingForPong(sender, timeout)
}
private def waitingForPong(origin: ActorRef, timeout: Cancellable): Receive = LoggingReceive {
case Pong =>
timeout.cancel()
origin ! Up
context become receive
case Down =>
origin ! Down
context become receive
}
Ping
we immediately schedule sending Down
message to ourselves after precisely one second. Then we go into waitingForPong
. If Pong
arrives we cancel scheduled Down
and send Up
instead. However if we first received Down
it means one second elapsed. So we forward Down
back to the client. Is it just me or maybe such a simple task should not require that amount of code?Moreover please notice that our
MonitoringActor
is not capable of handling more than one client at a time. Once CheckHealth
was received no more clients are allowed until Up
or Down
is sent back. Seems quite limiting. Composing futures
Another approach to the very same problem is employingask
pattern and futures. Suddenly the code becomes much shorter and easier to read:def receive = {That's it! We ask
case CheckHealth =>
implicit val timeout: Timeout = 1.second
implicit val ec = context.dispatcher
val origin = sender
networkActor ? Ping andThen {
case Success(_) => origin ! Up
case Failure(_) => origin ! Down
}
}
networkActor
by sending Ping
and then when response arrives we reply to the client. In case it was a Success(_)
(_
placeholder stands for Pong
but we don't really care) we send Up
. If it was a Failure(_)
(where _
most probably holds AskTimeout
thrown after one second without reply) we forward Down
. There is one enormous trap in this code. In both success and failure callbacks we can't use sender
directly because these pieces of code can be executed much later by another thread. sender
's value is transient and by the time Pong
arrives it might point to any other actor that happened to send us something. Thus we have to keep original sender
in origin
local variable and capture that one instead.If you find this annoying you might play with
pipeTo
pattern:def receive = LoggingReceive {Same as before we
case CheckHealth =>
implicit val ec = context.dispatcher
networkActor.ask(Ping)(1.second).
map{_ => Up}.
recover{case _ => Down}.
pipeTo(sender)
}
ask
(synonym to ?
method) networkActor
with a timeout. If correct reply arrives we map it to Up
. If instead future ends with exception we recover from it by mapping it to Down
message. No matter which "branch" was exercised the result is piped to sender
.You should ask yourself a question: why code above is fine despite using
sender
while the previous one would have been broken? If you look closely at the declarations you'll notice that pipeTo()
takes an ActorRef
by value, not by name. This means that sender
is evaluated immediately when the expression is executed - not later when replies return. We are walking on a thin ice here so please be careful when making such assumptions. Dedicated actor
Actors are lightweight so why not create one just for the sake of a single health check? This throw-away actor would be responsible for communicating withNetworkActor
and pushing reply back to the client. The only responsibility of MonitoringActor
would be to create an instance of this one time actor:class MonitoringActor extends Actor with ActorLogging {
def receive = {
case CheckHealth =>
context.actorOf(Props(classOf[PingActor], networkActor, sender))
}
}
PingActor
is quite simple and similar to the very first solution:class PingActor(networkActor: ActorRef, origin: ActorRef) extends Actor with ActorLogging {When the actor is created we send
networkActor ! Ping
context.setReceiveTimeout(1.second)
def receive = {
case Pong =>
origin ! Up
self ! PoisonPill
case ReceiveTimeout =>
origin ! Down
self ! PoisonPill
}
}
Ping
to NetworkActor
but also schedule timeout message. Now we wait either for Pong
or for timeouted Down
. In both cases we stop ourselves in the end because PingActor
is no longer needed. Of course MonitoringActor
can create multiple independent NetworkActor
s at the same time.This solution combines simplicity and purity of the first one but is robust as the second one. Of course it also requires most code. It's up to you which technique you employ in real life use cases. BTW after writing this article I came across Ask, Tell and Per-request Actors which touches the same problem and introduces similar approaches. Definitely look at it as well!
Tags: akka, scala