Haben wir uns in den vorherigen Teilen eher um grundlegende Dinge, wie Projekte in sbt und das Testen mittels ScalaTest angeschaut, behandelt dieser Teil die Kernkomponente des XBMC Desktop Clients: Die Kommunikation mit Hilfe von Akka.

Akka ist ein in Scala geschriebenes Framework für parallele, nebenläufige und verteilte Programmierung, das sowohl in Scala als auch in Java verwendet werden kann. Es basiert auf dem sogenannten Aktoren-Modell (Actor Model), das unter anderem in der Programmiersprache Erlang Verwendung findet. Zu allererst möchte ich jedoch einige Worte zum Sinn und Zweck von Akka verlieren.

Java Memory Model

Nebenläufige Programmierung ist verdammt schwierig, vor allem in Java. Man hat mehrere parallellaufende Threads, die gemeinsam eine Aufgabe erledigen sollen und - das große Problem - auf einen gemeinsamen Speicherbereich zugreifen müssen. Einfaches Beispiel ist die Iteration einer Zahl (in Java):

class Besucher {
  int i = 0;
  void iterate() { i = i + 1; }
}

Angenommen zwei Threads rufen kurz nacheinander die Methode iterate() auf. Was passiert? Erster Thread berechnet, da i gleich 0 ist, "0 + 1". i ist nun 1. Was berechnet der zweite Thread? "1 + 1"? Sicher? Ich nicht. Stichwort ist hier Sichtbarkeit: Es ist nicht garantiert, dass der von Thread-1 gesetzte Wert für Thread-2 tatsächlich schon sichtbar ist. Jeder Thread speichert seinen Wert zuerst in einem thread-spezifischen Cache. Die Änderungen in diesem Cache werden allerdings nicht direkt mit den anderen Caches synchronisiert, so dass in diesem Fall bei Thread-2 i immer noch 0 betragen kann.

Aber es gibt zumindest in Java eine schöne Lösung dafür: volatile! In dem wir die Variable als volatile int i = 0; deklarieren, ist sichergestellt, dass jegliche Änderung an dieser Variable sofort allen Threads sichtbar ist. Bei dem ein oder anderem wird sicherlich die Frage aufkommen: Und wenn die Lösung doch so schön ist, warum wird nicht standardmäßig jede Variable als volatile definiert? Ganz einfach, Performance! Ohne konkrete Zahlen zu nennen, kann man sich durchaus vorstellen, was die Aktualisierung mehrerer hundert Thread- Caches bewirkt, wenn jedes Mal eine Variable verändert wird...

Super, nun können wir mit mehreren nebenläufigen Threads eine Zahl iterieren! Was für eine Glanzleistung! Naja...leider funktioniert diese Lösung immer noch nicht so wirklich. Ich sagte doch, parallele Programmierung ist verdammt schwierig! Stichwort ist hierbei: Unterbrechenbare bzw. atomare Operationen. Wenn ich i + 1 berechne, führe ich im Prinzip zwei Operationen aus: Wert auslesen und neuen Wert setzen. Und hierbei kann ich unterbrochen werden. Kurz bevor Thread-1 den Wert auf 1 gesetzt hat, kann Thread-2 den Wert 0 ausgelesen haben, so dass - obwohl zwei Threads die Methode iterate aufgerufen haben - i am Ende den Wert 1 haben wird.

Als Lösung bietet Java mehrere Möglichkeiten an. Erste Alternative ist das Schlüsselwort synchronized mit dem wir unsere Methode ausstatten können: synchronized void iterate(). Grob bedeutet das, dass jeder Thread vor dem Aufruf der Methode warten muss, bis ein anderer Thread diese Methode verlassen hat. Problem: Wieder Performance! Sicherlich wird Performance bei diesem einfachen Beispiel nicht wirklich ausschlaggebend sein, aber wir sprechen hier auch von einem der einfachsten Beispiele überhaupt. In hochperformanten Applikationen, in denen mehrere tausend Threads von einem einzigen Thread blockiert werden, macht das sehr viel aus! Die zweite und bessere Alternative ist dagegen der Einsatz der AtomicInteger-Klasse, die auf eine atomare Compare-and-swap (CAS) Operation des Prozessors zugreift. Das korrekt funktionierende Beispiel sieht nun wie folgt aus:

class Besucher {
  AtomicInteger i = new AtomicInteger();
  void iterate() { i.incrementAndGet(); }
}

Weitere Informationen zum Thema Unterbrechenbarkeit und Sichtbarkeit liefert der sehr informative Beitrag von Angelika Langer.

Es ist schon erstaunlich worauf in Java alles geachtet werden muss, um eine thread-sichere Aufgabe umzusetzen. Und wir reden hier immer noch von der einfachen Iteration einer Zahl, komplexere Szenarien sind natürlich umso aufwendiger. Wäre es daher nicht schön, wenn ein einmal programmierter Code auch - ohne jegliche Änderungen - in einer nebenläufigen Umgebung laufen würde? Zukunftsmusik? Nicht wirklich, die Lösung lautet Akka und das Actor Model.

Actor Model - Akka

Wenn ihr zu den Entwicklern gehört, die denken, dass ein gemeinsam genutzter Speicherbereich die Wurzel allen Übels ist, dann seid ihr bei dem Actor Model und Akka genau richtig! Solche Probleme gibt es hier nicht! Naja, zumindest theoretisch...

Akka verwendet keine Threads, sondern Aktoren (Können technisch in etwa als leichtgewichtige Threads bezeichnet werden). Statt auf einen gemeinsamen Speicherbereich zuzugreifen, schicken sich Aktoren Nachrichten in asynchroner und event-basierter Form hin und her. Ohne lange herumzureden, schauen wir uns direkt ein Beispiel an, wie eine Akka basierende Kommunikation mit XBMC aussehen könnte. Dazu greifen wir ein Beispiel im dritten Teil dieser Serie auf und möchten wissen, an welcher Stelle sich das aktuell spielende Lied befindet. Zu Anschauungszwecken nehmen wir an, dass auf der XBMC-Seite ebenfalls Akka läuft und modellieren sowohl den Request, als auch die Response des XBMC-APIs als Scala Klassen.

object GetPosition {
  var jsonrpc = "2.0"
  var id = 1
  var method = "Player.GetProperties"
  var params: Map[String, Any] = Map("properties" -> List("type", "time", "percentage"), "playerid" -> 0)
}
case class Position(percentage: Double, time: Time)
case class Time(hours: Int, minutes: Int, seconds: Int) {
  val label = if (hours == 0) "%02d:%02d".format(minutes, seconds) else "%02d:%02d:%02d".format(hours, minutes, seconds)
}

Die dazugehörigen Aktoren sehen nun wie folgt aus:

import akka.actor.{ActorLogging, Actor, Props}
class MainActor extends Actor with ActorLogging {
  override def preStart() = {
    val xbmc = context.actorOf(Props[XBMCActor], "xbmc")
    xbmc ! GetPosition
  }
  def receive = {
    case Position(percentage, time) =>
      log.info(s"Percentage: $percentage, time: $time")
      context.stop(self)
  }
}
class XBMCActor extends Actor with ActorLogging {
  def receive = {
    case GetPosition =>
      log.info("Received request for current position")
      sender ! Position(0.5, Time(0, 2, 30))
  }
}

Wenn wir diese Klassen speichern, kompilieren und unser kleines Programm mittels des Befehls sbt "runMain akka.Main MainActor" ausführen, sollte in etwa folgende Ausgabe auf dem Bildschirm erscheinen:

[INFO] [02/29/2014 23:59:59.040] [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app/xbmc] Received request for current position
[INFO] [02/29/2014 23:59:59.044] [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] Percentage: 0.5, time: 02:30
[INFO] [02/29/2014 23:59:59.052] [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app-terminator] application supervisor has terminated, shutting down

In der Methode preStart des MainActors, die vom Akka-Framework automatisch beim (bzw. vor dem) Starten des Actors ausgeführt wird, wird zuerst ein neuer XBMCActor erzeugt (genauer gesagt, ist er der Kindsactor von MainActor) und mittels der ! Methode (! steht für tell, eine asynchrone fire-and-forget Methode) die Nachricht GetPosition an ihn versandt. Diese Nachricht empfängt der XBMCActor in der Methode receive. Dort antwortet er dem Versender mit der konkreten Position: sender ! Position(0.5, Time(0, 2, 30)). Der MainActor empfängt diese Antwort wiederum in seiner Implementierung der receive-Methode, gibt diese entsprechend aus und beendet sich schließlich selbst sowie das gesamte Szenario.

Manche werden sich womöglich jetzt die Frage stellen, was das nun mit nebenläufiger Programmierung zu tun haben könnte? Die korrekte Antwort müsste eigentlich lauten: Alles! Der Punkt ist, egal wie viele Threads mit diesen Aktoren arbeiten sollten, dieser Quellcode ist absolut thread-sicher und muss im Gegensatz zu unserer vorherigen Implementierung der Iteration einer Zahl, nachträglich nicht extra angepasst werden. Genial! Aus diesem Grund basiert die gesamte interne Kommunikation des XBMC Desktop Clients auf Akka.

Das ist nur ein ganz kleiner Teil von dem, was Akka alles beherrscht. Überwachungsstrategien (Supervision), in denen überlegt werden kann, was bei einem Absturz eines Actors geschehen soll, oder transparentes Remoting, bei dem Aktoren bspw. auf verschiedenen Servern liegen können, sind nur zwei erwähnenswerte Merkmale von Akka. Empfehlenswert ist auf jeden Fall die äußerst ausführliche Dokumentation, die sowohl für Scala als auch Java bereitsteht.

Quellcode

Der hier vorgestellte Beispielcode ist wiederum bei github zu finden: akka- simple-xbmc

blogroll
tags