AKKA for parallel processing

Software Packages

  • AKKA version: 2.4.17
  • SBT version: 0.13.13
  • Scala version: 2.11.8

While Spark is designed for distributed data analysis, AKKA is often said to be best suited for distributed transaction processing. AKKA and Play are showing great momentum in the Scala ecosystem. This practice helps you get started with AKKA programming. AKKA works with both Java and Scala; we will use Scala today. If you have followed my prior blogs, you should already have SBT working. SBT is an interactive build system, and it helps a lot in managing dependencies.

AKKA is a toolkit, not a framework, which makes it possible to build web applications on top of it. That is really my ultimate goal for subsequent blogs.

To demonstrate its capability, we will use the Gregory-Leibniz series to calculate PI. The Gregory-Leibniz series is not very efficient, but it allows calculating PI to an almost arbitrary level of precision.
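For reference, the Gregory-Leibniz series is an alternating sum of odd reciprocals. Writing S_N for the sum of the first N terms (the quantity the Worker actors below compute piece by piece), the alternating-series bound says the truncation error is at most the first omitted term:

```latex
\pi = 4\sum_{k=0}^{\infty}\frac{(-1)^k}{2k+1} = 4\left(1 - \frac{1}{3} + \frac{1}{5} - \frac{1}{7} + \cdots\right),
\qquad
S_N = 4\sum_{k=0}^{N-1}\frac{(-1)^k}{2k+1},
\qquad
\lvert \pi - S_N \rvert \le \frac{4}{2N+1}
```

In practice the error is close to 1/N, so each additional correct decimal place costs roughly ten times more terms.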

As you can see from the formula, if you want 10 decimal places of precision, you will need about 10 billion (10^10) steps. For n decimal places, the complexity is thus O(10^n). Here we will see how to divide and conquer using AKKA so that we can get billions of calculations done quickly.
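A quick single-threaded sketch (no AKKA involved) makes the O(10^n) cost visible. It evaluates the same term the Worker uses, 4.0 * (1 - (k % 2) * 2) / (2 * k + 1), and prints the error for N = 10, 100, ..., 10^6 terms. The object name LeibnizError is just for illustration.

```scala
object LeibnizError {
  // Partial sum S_N = 4 * sum_{k=0}^{N-1} (-1)^k / (2k+1), using the Worker's term formula
  def partialSum(n: Long): Double = {
    var acc = 0.0
    var k = 0L
    while (k < n) {
      acc += 4.0 * (1 - (k % 2) * 2) / (2 * k + 1)
      k += 1
    }
    acc
  }

  def main(args: Array[String]): Unit = {
    // Each row uses 10x more terms than the previous one
    for (exp <- 1 to 6) {
      val n = math.pow(10, exp).toLong
      val err = math.abs(math.Pi - partialSum(n))
      println(f"N = $n%8d  error = $err%.3e")
    }
  }
}
```

You should see the error shrink by roughly a factor of ten per row, i.e. one extra correct decimal for every tenfold increase in work.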

Step 1: Install AKKA

Assuming that we have SBT and Java installed, AKKA can be downloaded from http://akka.io/downloads. At the time of writing, the latest official version is 2.4.17, which includes a Scala 2.11.8 package. BTW, Java 8 is required here.

Here I am putting it in the /opt/app/akka-2.4.17 folder.

[hadoop@nnode1 Downloads]$ unzip akka_2.11-2.4.17.zip
[hadoop@nnode1 Downloads]$ sudo mv akka-2.4.17 /opt/app

Next, add AKKA_HOME to .bashrc, and also add its bin directory to the PATH.

export AKKA_HOME=/opt/app/akka-2.4.17
export PATH=$PATH:$AKKA_HOME/bin

Step 2: Create SBT project

We will put the first AKKA project under /home/hadoop/akka/project1. Without going into detail about how SBT project folders are structured, here are the folders needed, assuming that we will create a package called project1.pi for our Scala program.

mkdir /home/hadoop/akka
mkdir /home/hadoop/akka/project1
mkdir /home/hadoop/akka/project1/src
mkdir /home/hadoop/akka/project1/src/main
mkdir /home/hadoop/akka/project1/src/main/scala
mkdir /home/hadoop/akka/project1/src/main/scala/project1
mkdir /home/hadoop/akka/project1/src/main/scala/project1/pi

Next, we will create the build.sbt file to establish the package dependencies.
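A minimal build.sbt along these lines should work, placed in /home/hadoop/akka/project1 and assuming the versions listed at the top of this post. The name and version values are placeholders; the one setting that matters is the akka-actor dependency, and %% appends the Scala binary version (here _2.11) to the artifact name.

```scala
name := "project1"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.17"
```

Optionally, pin the sbt version by putting sbt.version=0.13.13 in project/build.properties.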

Step 3: Create the Scala program to calculate Pi

AKKA is based on the actor model. An actor is basically an autonomous entity that performs a certain job. Actors interact through messages. Actors are organized in a hierarchy, and parent actors are responsible for managing and monitoring their child actors. http://akka.io has details about the actor lifecycle.
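The actor idea itself can be sketched in plain Scala with no AKKA dependency: a private mailbox drained by a single thread, so the actor processes one message at a time and is only reachable by sending it messages. ToyActor is a hypothetical, heavily simplified illustration (no supervision, no hierarchy, one thread per actor), not how AKKA is implemented.

```scala
import java.util.concurrent.LinkedBlockingQueue

// A toy "actor": a private mailbox drained by a single thread.
// Illustration only, not AKKA's API or implementation.
final class ToyActor[M](handle: M => Unit) {
  // None acts as a poison pill that tells the loop to stop
  private val mailbox = new LinkedBlockingQueue[Option[M]]()

  private val thread = new Thread(new Runnable {
    def run(): Unit = {
      var running = true
      while (running) {
        mailbox.take() match {
          case Some(msg) => handle(msg) // exactly one message at a time
          case None      => running = false
        }
      }
    }
  })
  thread.start()

  // Fire-and-forget send, similar in spirit to AKKA's `!`
  def !(msg: M): Unit = mailbox.put(Some(msg))

  // Stop after all earlier messages are handled, then wait for the thread
  def stop(): Unit = {
    mailbox.put(None)
    thread.join()
  }
}

object ToyActorDemo {
  def main(args: Array[String]): Unit = {
    var total = 0
    val adder = new ToyActor[Int](n => total += n)
    (1 to 100).foreach(n => adder ! n)
    adder.stop()
    println(total) // 5050
  }
}
```

Only the mailbox thread touches the running total while messages are flowing, which is why no lock is needed; the main thread reads it only after stop() has joined that thread.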

Here we will create a Master actor that acts like a team lead, and Worker actors that, as you guessed, act like workers. The main program passes a “Calculate” message to the Master actor, which in turn passes “Work” messages to the workers. When workers are done, they pass back a “Result” message to the Master. The Master then adds up all the partial results and sends the final PI value to a Listener actor, which prints it out. Note that the Master breaks the calculation into many small pieces and, in this case, sends them to individual workers in round-robin fashion. The original sample code can be found here, and I modified it a little bit.
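To make the message flow concrete without any actor machinery, here is a sequential sketch of the same protocol. It reuses the Work and Result message shapes from Pi.scala, but the helper names work and master are hypothetical, and everything runs on one thread, so it shows the divide-and-conquer bookkeeping rather than the parallelism.

```scala
object ProtocolSketch {
  // Same message shapes as in Pi.scala
  case class Work(start: Long, nrOfElements: Int)
  case class Result(value: Double)

  // Worker logic: sum one chunk [start, start + nrOfElements) of the series
  def work(w: Work): Result = {
    var acc = 0.0
    var i = w.start
    while (i < w.start + w.nrOfElements) {
      acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
      i += 1
    }
    Result(acc)
  }

  // Master logic, run sequentially: split the range into Work messages,
  // "send" each one to a worker, then add up the Result values
  def master(nrOfMessages: Int, nrOfElements: Int): Double = {
    val works = (0 until nrOfMessages).map(i => Work(i.toLong * nrOfElements, nrOfElements))
    works.map(work).map(_.value).sum
  }

  def main(args: Array[String]): Unit =
    println(master(nrOfMessages = 100, nrOfElements = 10000))
}
```

Because the chunks are contiguous and non-overlapping, adding up the partial results gives the same sum as one long loop (up to floating-point rounding), which is what makes the work safe to split across actors.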

The full code of /home/hadoop/akka/project1/src/main/scala/project1/pi/Pi.scala is listed below. Initially, each Work message asks a worker actor to do 1 million calculations, and the Master sends 1,000 such messages, for a total of 1 billion calculations done by 4 workers. So each worker is assigned about 250 messages.
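The arithmetic above can be checked with a tiny sketch. It assumes round-robin routing sends message i to worker i % nrOfWorkers, which mimics the idea behind RoundRobinRoutingLogic without calling it; messagesPerWorker is a hypothetical helper.

```scala
object RoundRobinSketch {
  // Message i goes to worker i % nrOfWorkers; count messages per worker index
  def messagesPerWorker(nrOfMessages: Int, nrOfWorkers: Int): Map[Int, Int] =
    (0 until nrOfMessages)
      .groupBy(_ % nrOfWorkers)
      .map { case (worker, msgs) => worker -> msgs.size }

  def main(args: Array[String]): Unit = {
    val nrOfMessages = 1000
    val nrOfElements = 1000000
    println(s"total steps: ${nrOfMessages.toLong * nrOfElements}") // total steps: 1000000000
    println(messagesPerWorker(nrOfMessages, nrOfWorkers = 4))
  }
}
```

When the message count is not a multiple of the worker count, the first few workers simply get one extra message each.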

Step 4: Run the program

In /home/hadoop/akka/project1, execute sbt as follows. You get 3.1415926525897735, which is pretty disappointing because it is only accurate to 8 decimal places after 1 billion steps and 10 seconds! But that is not the point. After all, 10 seconds is pretty fast.

[hadoop@nnode1 project1]$ sbt run

Now let’s do 10 billion steps by changing the number of messages to 10,000 (nrOfMessages = 10000) and restarting “sbt run”. This time, it is going to take a while. While it is running, open another terminal window and type “top -H”. You will see 4 Java threads running at full speed, roughly one per busy Worker actor.
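One thing to watch when scaling up: with nrOfMessages = 10000, the start index of message i is i × 1,000,000, which for i = 2148 already exceeds Int.MaxValue (2,147,483,647). Computing it as (i * nrOfElements).toLong multiplies in Int arithmetic and wraps around before the conversion, while i.toLong * nrOfElements widens first. This sketch (StartIndexSketch is a hypothetical name) contrasts the two.

```scala
object StartIndexSketch {
  val nrOfElements = 1000000

  // Multiplies two Ints first, then widens: silently wraps past Int.MaxValue
  def startIntMath(i: Int): Long = (i * nrOfElements).toLong

  // Widens to Long first, so the product is exact
  def startLongMath(i: Int): Long = i.toLong * nrOfElements

  def main(args: Array[String]): Unit = {
    println(startIntMath(2148))  // negative: the Int product overflowed
    println(startLongMath(2148)) // 2148000000
  }
}
```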

At the end of the day, we get just 1 more accurate decimal after doing 10 times more calculations!

Pi approximation: 3.14159265348979
Calculation time: 93352 milliseconds
Steps (millions): 10000

Pi.scala

package project1.pi

import akka.actor._
import akka.routing.{ ActorRefRoutee, RoundRobinRoutingLogic, Router }
import scala.concurrent.duration._

object Pi extends App {

  calculate(nrOfWorkers = 4, nrOfElements = 1000000, nrOfMessages = 1000)

  // Messages exchanged between the actors
  sealed trait PiMessage
  case object Calculate extends PiMessage
  case class Work(start: Long, nrOfElements: Int) extends PiMessage
  case class Result(value: Double) extends PiMessage
  case class PiApproximation(pi: Double, duration: Duration, steps: Long)

  // A Worker sums one chunk of the Gregory-Leibniz series and replies with the partial sum
  class Worker extends Actor {
    def calculatePiFor(start: Long, nrOfElements: Int): Double = {
      var acc = 0.0
      for (i <- start until (start + nrOfElements))
        acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
      println(s"worker calculating $start, $nrOfElements results = $acc")
      acc
    }

    def receive = {
      case Work(start, nrOfElements) =>
        sender() ! Result(calculatePiFor(start, nrOfElements)) // perform the work
    }
  }

  // The Master splits the job into Work messages, routes them round-robin to the
  // workers, adds up the partial results, and reports to the listener
  class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, listener: ActorRef) extends Actor {
    var pi: Double = _
    var nrOfResults: Int = _
    val start: Long = System.currentTimeMillis

    println("create router")
    var workerRouter = {
      val routees = Vector.fill(nrOfWorkers) {
        val r = context.actorOf(Props[Worker])
        context watch r
        ActorRefRoutee(r)
      }
      Router(RoundRobinRoutingLogic(), routees)
    }
    println(s"router: $workerRouter")

    def receive = {
      case Calculate =>
        // widen to Long before multiplying so the start index cannot overflow Int
        for (i <- 0 until nrOfMessages)
          workerRouter.route(Work(i.toLong * nrOfElements, nrOfElements), self)

      case Result(value) =>
        pi += value
        nrOfResults += 1
        println(s"received value: $value")
        if (nrOfResults == nrOfMessages) {
          listener ! PiApproximation(pi, duration = (System.currentTimeMillis - start).millis, nrOfMessages.toLong * nrOfElements)
          context.stop(self)
        }

      case Terminated(a) =>
        // replace a worker that has stopped
        workerRouter = workerRouter.removeRoutee(a)
        val r = context.actorOf(Props[Worker])
        context watch r
        workerRouter = workerRouter.addRoutee(r)
    }
  }

  // The Listener prints the final result and shuts down the actor system
  class Listener extends Actor {
    def receive = {
      case PiApproximation(pi, duration, steps) =>
        println("\n\tPi approximation: \t%s\n\tCalculation time: \t%s\n\tSteps (millions): \t%s"
          .format(pi, duration, steps / 1000000))
        context.system.terminate()
    }
  }

  def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int): Unit = {
    // Create an Akka system
    val system = ActorSystem("PiSystem")

    // create the result listener, which will print the result and shut down the system
    val listener = system.actorOf(Props[Listener], name = "listener")

    // create the master
    val master = system.actorOf(Props(new Master(nrOfWorkers, nrOfMessages, nrOfElements, listener)), name = "master")

    println("start the calculation")
    master ! Calculate
  }
}