Building third party software packages concurrently using Scala and Akka

Recently at work I started tackling the builds of all the third party packages that we depend on. As you can imagine, once you have a certain number of these packages you end up with a bit of a web of dependencies, and the time to start and finish a correct build grows rather large. This tax on developer time led to an unfortunate choice to just start symlinking previous builds instead of actually rebuilding packages when we move to a new OS (a particularly troublesome thing to do on OS X). In addition, as packages are added or upgraded, the configuration might not reflect the current state correctly.

So I set out to get the chores done and make this whole process easier to manage so that doing everything correctly became the simple thing to do.

Oh yeah… I also saw a perfect opportunity to write an application with Scala and Akka! :)

Determining what to build

There is an existing build_all.bash script which takes a simple list of package names and expects them to be in the correct order for any dependencies. Running this means the entire build process is run serially. Not only that, but if a package fails to build no other package after it will get built because the script just exits.

Rather than manually specifying a list of packages, I wanted to get a little clever. The convention is that every package is configured by a file called vars.inc, so I figured that finding all vars.inc files below a certain directory would be the most reasonable approach. The build root is passed to the program as an argument, and we build the list of files like so:

import scala.sys.process._

def varFiles(build_root: String): Stream[String] = {
  // `lines` runs the external process and streams its stdout lazily
  Process(Seq("find", build_root, "-name", "vars.inc", "-type", "f")).lines
}

Dependency Analysis

The existing system had some notion of dependencies, but it wasn't something that actually affected build order at all. In cases where a dependency was specified, it didn't do much beyond changing the name of a target directory. This matters for packages which get built multiple times, but for other packages it actually created a problem: people stopped listing dependencies explicitly because the package names were getting far too long to refer to in any reasonable way. ffmpeg, for example, depends on a lot of codec libraries.

So we have implicit and explicit dependencies for potentially all packages. In addition to that all packages have an unexpressed dependency on GCC 4.1.5. That is, all packages except GCC 4.1.5. :)

Explicit Dependencies

We read the explicit dependencies from the contents of the vars.inc files. Each entry in the file looks something like the following:

FOO_MAIN=("foo" "1.0.0")
FOO_ALT=("foo" "1.0.0-alt" BAR_MAIN BIZ_MAIN BAZ_MAIN) 

In this example the package labeled FOO_ALT has explicit dependencies on the packages labeled BAR_MAIN, BIZ_MAIN, and BAZ_MAIN.

This configuration format is extremely easy to match with a regex:

val VarEntry = "^(.+)=\\(\"(.+)\" \"(.+)\"(.*)\\)".r 
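To illustrate, here is a quick sketch of how that regex decomposes the FOO_ALT entry from above. The `parse` helper and demo object are mine, not part of the real program:

```scala
object VarEntryDemo {
  val VarEntry = "^(.+)=\\(\"(.+)\" \"(.+)\"(.*)\\)".r

  // Parse one vars.inc entry; returns (ref, name, version, deps) on a match
  def parse(line: String): Option[(String, String, String, Vector[String])] =
    line match {
      case VarEntry(ref, name, version, deps) =>
        // The fourth group captures the raw trailing text; split and drop empties
        Some((ref, name, version, deps.split(" ").filter(_.nonEmpty).toVector))
      case _ => None
    }

  def main(args: Array[String]): Unit =
    println(parse("""FOO_ALT=("foo" "1.0.0-alt" BAR_MAIN BIZ_MAIN BAZ_MAIN)"""))
}
```

The fourth capture group is intentionally loose (`(.*)`) so that entries with no dependencies still match with an empty group.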

Implicit Dependencies

By convention all packages are built by a file called build.bash. This file relies on a suite of functions provided by a top-level script called base_build.bash. One of the functions provided is named b3pGetLocalPath. This is a good indicator of an implicit dependency because it takes a package label and returns the path to the root of where this package lives.

This was another easy job for a regex:

val ImplicitDep = "^.*\\$\\(b3pGetLocalPath (.+)\\)$".r 
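Here is a quick sketch of that regex against the kind of line you'd see in a build.bash script (the variable name BAR_ROOT is hypothetical, for illustration only):

```scala
object ImplicitDepDemo {
  val ImplicitDep = "^.*\\$\\(b3pGetLocalPath (.+)\\)$".r

  // Extract the package ref from a b3pGetLocalPath call, if present
  def refOf(line: String): Option[String] = line match {
    case ImplicitDep(ref) => Some(ref)
    case _                => None
  }

  def main(args: Array[String]): Unit =
    // BAR_ROOT is a made-up variable name for this example
    println(refOf("BAR_ROOT=$(b3pGetLocalPath BAR_MAIN)"))
}
```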

Reading everything, and building up a registry of packages

Now that I had a plan for determining the relationships between packages it was time to read it all up and build a plan of attack. I decided that I would construct a build queue where each item in the queue was a list of packages that could be built in parallel. This isn’t actually the best way to go about it and I’ll talk about that later. For now though this is how things are working.

I started by creating a case class to represent a specific unit to be built:

case class BuildSpec(ref:String,          // A ref is the label given to a build
                     name:String,         // The actual package name
                     version:String,      // The version of the package to build
                     script:String,       // An absolute path to the build.bash file
                     deps:Vector[String]) // A list of "refs" that this package depends on 

I also created a couple of type aliases:

type BuildPhase = Vector[BuildSpec]
type BuildQueue = Queue[BuildPhase]
type PackageRegistry = Map[String, BuildSpec] 

The first goal is to build a Map[String, BuildSpec] of all the packages under the build root. To do this I take the Stream[String] returned by the varFiles method and for each line that matches my VarEntry regex I extract the relevant information and update the map.

Let’s look at the function at the top of this process:

def readFiles(var_files: Stream[String]): PackageRegistry = {
  @tailrec
  def readFile(files: Stream[String], result: PackageRegistry): PackageRegistry = {
    if (files.isEmpty) result else readFile(files.tail, result ++ varData(files.head))
  }
  readFile(var_files, Map.empty)
} 

Pretty simple. So let’s look now at the varData method which does the heavy lifting:

def varData(filename: String): PackageRegistry = {
  val file = new java.io.File(filename)
  val package_dir = file.getParent
  @tailrec
  def extract(lines: List[String], result: PackageRegistry): PackageRegistry = {
    if (lines.isEmpty) {
      result
    } else {
      lines.head match {
        case VarEntry(ref, name, version, deps) => {
          val build_script = Seq(package_dir, version, "build.bash").mkString("/")

          val explicit_deps = deps.split(" ").filter(_ != "").toVector
          val implicit_deps = extractImplicitDeps(build_script).filter(!explicit_deps.contains(_))

          val applied_deps = if (ref == "GCC_MAIN") {
            explicit_deps ++ implicit_deps
          } else {
            (explicit_deps ++ implicit_deps) ++ Vector("GCC_MAIN")
          }
          val spec = BuildSpec(ref, name, version, build_script, applied_deps)

          extract(lines.tail, result + (ref -> spec))
        }
        case _ => extract(lines.tail, result)
      }
    }
  }
  extract(io.Source.fromFile(filename).getLines().toList, Map.empty)
}

This method isn’t really any more complicated than readFiles, but it’s doing a bit more. The first thing it does is get the path of the directory the file is in. This is used by the inner method to build the path to the build.bash script that we’ll actually run when it’s time to build this package.

The inner method uses the VarEntry regex to extract the ref, name, version, and raw deps for each entry. When a match occurs, we use this information to construct the build_script path, get the list of explicit dependencies, then we call extractImplicitDeps and filter out any potential duplication. Next we add GCC_MAIN to our dependencies unless this package happens to be GCC_MAIN itself. Finally we create our BuildSpec instance and update our result with a (ref -> spec).

Let’s look at the extractImplicitDeps method:

def extractImplicitDeps(build_script:String): Vector[String] = {
  @tailrec
  def nextDep(lines: Vector[String], result: Vector[String]): Vector[String] = {
    if (lines.isEmpty) {
      result
    } else {
      lines.head match {
        case ImplicitDep(ref) => nextDep(lines.tail, result :+ ref)
        case _ => nextDep(lines.tail, result)
      }
    }
  }
  nextDep(io.Source.fromFile(build_script).getLines().toVector, Vector.empty[String])
} 

Pretty straightforward. Overall none of this is especially magical, but it is pretty fantastic what tools you have with Scala. It’s probably also plain to see how new I am to Scala, so please don’t hesitate to recommend any changes to these methods. Just writing this post I’ve started to see things I’d like to change here.

Assembling a build queue from the package registry

To start, let me take a second to explain what isn’t great about my solution. In my mind I thought that simply breaking the build into phases would be granular enough. This isn’t the case: if you happen to have a couple of very long builds in a phase (Qt, for instance), you’ll end up with idle threads while those finish up. I should have followed the hunch I had early on once I started designing the actor system that runs the build. Because of the approach I took, the actor system doesn’t feel quite right, and you’ll notice this too once I get to that part. :) Again, please don’t hesitate to make any recommendations.

Ok, back to the program!

I decided that I wanted to take the package registry and return a BuildQueue that I could hand off to an actor system. I did this using the following method:

def assembleBuildQueue(specs:PackageRegistry): BuildQueue = {
  @tailrec
  def nextPhase(specs: PackageRegistry, result: BuildQueue): BuildQueue = {
    if (specs.isEmpty) {
      result
    } else {
      val phase = specs.values.filter(_.deps.count(specs.contains) == 0).toVector
      nextPhase( specs.filter((s) => ! phase.contains(s._2)), result ++ Queue(phase) )
    }
  }
  val result = Queue(specs.values.filter(_.deps.length == 0).toVector)
  nextPhase(specs.filter(_._2.deps.length > 0), result)
}

What’s happening here is that I defined a phase to mean: “a list of packages all of whose dependencies have already been enqueued”. So phase0 is any package with no dependencies at all, which is what we pass to our inner method as the initial value of the result parameter; for each iteration we just filter the remaining specs based on this rule.

This will be redesigned to address the deficiencies mentioned above. My thought for the moment is to shift away from an all-encompassing phase of anything that can be built concurrently toward a queue of sequences that can be built concurrently, so that there isn’t an implicit coupling of unrelated packages. I’ll make a follow-up post once this has been developed.
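To make the phase grouping concrete, here is a toy run of assembleBuildQueue over a three-package registry with a linear dependency chain. The refs A_MAIN, B_MAIN, and C_MAIN are hypothetical, and BuildSpec is trimmed down to just the fields the queue logic touches:

```scala
import scala.annotation.tailrec
import scala.collection.immutable.Queue

object PhaseDemo {
  // Trimmed-down spec: just a ref and its dependency refs
  case class BuildSpec(ref: String, deps: Vector[String])
  type BuildPhase = Vector[BuildSpec]
  type BuildQueue = Queue[BuildPhase]
  type PackageRegistry = Map[String, BuildSpec]

  // Same algorithm as in the post, over the trimmed-down spec
  def assembleBuildQueue(specs: PackageRegistry): BuildQueue = {
    @tailrec
    def nextPhase(specs: PackageRegistry, result: BuildQueue): BuildQueue = {
      if (specs.isEmpty) result
      else {
        // A package joins the phase when none of its deps remain unenqueued
        val phase = specs.values.filter(_.deps.count(specs.contains) == 0).toVector
        nextPhase(specs.filter(s => !phase.contains(s._2)), result ++ Queue(phase))
      }
    }
    val result = Queue(specs.values.filter(_.deps.length == 0).toVector)
    nextPhase(specs.filter(_._2.deps.length > 0), result)
  }

  // A_MAIN has no deps, B_MAIN needs A_MAIN, C_MAIN needs B_MAIN
  val registry: PackageRegistry = Map(
    "A_MAIN" -> BuildSpec("A_MAIN", Vector.empty),
    "B_MAIN" -> BuildSpec("B_MAIN", Vector("A_MAIN")),
    "C_MAIN" -> BuildSpec("C_MAIN", Vector("B_MAIN"))
  )

  def phases: Queue[Vector[String]] = assembleBuildQueue(registry).map(_.map(_.ref))

  def main(args: Array[String]): Unit = println(phases)
}
```

A chain like this produces one package per phase, which is exactly the idle-thread problem described above: nothing else can run alongside each single-element phase.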

Building an actor system to execute a build queue

It should come as no surprise that I chose to use the work pulling pattern to execute a build. There are several great examples on the Akka Team Blog to learn from, so it felt like an easy choice.

My actor system design consisted of three actors: BuildQueueManager, BuildPhaseManager, and SpecBuilder.

The following messages are used by the system:

case class WorkOn[T](data: T)
case class Busy[T](data: T)
case object WorkAvailable
case object WorkRequest
case class RegisterWorker(worker:ActorRef)
case object ShutdownRequest
case object ShutdownComplete
trait BuildResult
case class BuildFailure(spec: BuildSpec) extends BuildResult
case class BuildSuccess(spec: BuildSpec) extends BuildResult 

The BuildQueueManager

Let’s start at the top and look at the BuildQueueManager. It has the following members:

val phase_manager = context.actorOf(Props[BuildPhaseManager], "phase-manager")
val blacklisted = scala.collection.mutable.Set.empty[String] // mutable so we can add failed refs later
var current_data: Option[BuildQueue] = None 

In the preStart method it creates a set of SpecBuilder instances and registers them with the BuildPhaseManager:

val worker_count: Int = sys.runtime.availableProcessors() / 2
for (i <- 0 until worker_count) { // `until`, so we get exactly worker_count workers
  val worker = context.actorOf(Props[SpecBuilder], s"spec-builder-$i")
  phase_manager ! RegisterWorker(worker)
} 

A build is started by sending a message of WorkOn(queue) where queue is of type BuildQueue. The queue manager updates its current_data reference and informs the phase manager that work is now available:

case WorkOn(work: BuildQueue) => {
  log.info("Starting build.\n")
  current_data = Some(work)
  phase_manager ! WorkAvailable
} 

The queue manager responds to a WorkRequest message by sending an item from the queue as a WorkOn message. If the queue is empty it will send a ShutdownRequest to the phase manager instead.

The data sent with the WorkOn message is a BuildPhase instance obtained from the head of the queue. Before it is sent, any items that have a dependency that has been added to the blacklist are filtered out and added to the blacklist themselves. Finally, once the message is sent, current_data is set to the tail of the queue.

case WorkRequest => current_data match {
  case None => None
  case Some(data) => {
    if (!data.isEmpty) {
      val next_work_item = data.head.filter { spec =>
        val result = spec.deps.toSet.intersect(blacklisted).isEmpty
        if (!result) {
          log.warning(s"${spec.ref} will not be built because a dependency has been blacklisted.")
          // We add this to the blacklist as well
          blacklisted.add(spec.ref)
        }
        result
      }
      // OH HAI, a bug! If this list is empty the application will never exit :/
      sender ! WorkOn(next_work_item)
      current_data = Some(data.tail)
    } else {
      current_data = None
      log.info("Build completed.")
      if (!blacklisted.isEmpty) {
        log.warning(s"The following ${blacklisted.size} packages failed to build.")
        log.warning(s"${blacklisted.mkString(", ")}")
      }
      phase_manager ! ShutdownRequest
    }
  }
} 

If a build fails for any reason, a BuildFailure message is propagated from a SpecBuilder to the BuildQueueManager. The associated ref is then added to the blacklist:

case BuildFailure(spec) => {
  log.error(s"${spec.ref} (${spec.name} ${spec.version}) failed and will be added to the blacklist.")
  log.warning(s"Downstream packages that depend on ${spec.ref} will not be built.")
  blacklisted.add(spec.ref)
} 

The BuildPhaseManager

This is the weakest link in the current design. It’s more than likely the case that it’s a redundant actor and with a more appropriate queue design there would only be the need for a single manager.

The BuildPhaseManager has the following members:

val master = context.actorSelection("akka://b3p/user/master")
val workers = scala.collection.mutable.Set.empty[ActorRef] // mutable so workers can register at runtime
var current_data: Option[BuildPhase] = None
var active_builds: Int = 0 

We use the active_builds member as a sort of reference count for the current phase.

When BuildPhaseManager receives a WorkOn message it behaves almost identically to the BuildQueueManager.

case WorkOn(work: BuildPhase) => {
  current_data match {
    case None => {
      log.info(s"Starting build phase with ${work.length} packages.")
      current_data = Some(work)
      workers.foreach(_ ! WorkAvailable)
    }
    case Some(_) => sender ! Busy(work)
  }
} 

Workers are registered by sending an ActorRef with the RegisterWorker message.

For each WorkRequest message we receive, if there is any work available we send the head of our current phase to the sender. Then active_builds is incremented and current_data is assigned the tail of the current phase. If the current phase was empty we just set it to None.

case WorkRequest => current_data match {
  case None => None
  case Some(data) => {
    if (!data.isEmpty) {
      sender ! WorkOn(data.head)
      active_builds += 1
      current_data = Some(data.tail)
    } else {
      current_data = None
    }
  }
} 

The phase manager waits for either a BuildFailure or BuildSuccess message at which point it calls the buildCompleted method.

def buildCompleted(spec: BuildSpec, success: Boolean) {
  log.info(s"Build ${if (success) "succeeded" else "failed"} for ${spec.ref}.")
  active_builds -= 1
  if (active_builds == 0) {
    log.info("Phase completed.\n")
    master ! WorkRequest
  }
} 

Failed builds are propagated upstream to the BuildQueueManager.

The SpecBuilder

This is the simplest actor of the system. It receives a BuildSpec, executes the associated script, and sends a BuildSuccess or BuildFailure back to the sender.

It has the following members:

val output = ArrayBuffer.empty[String]
val proc_logger = ProcessLogger(line => output.append(line), line => output.append(line)) 

Here is the receive method:

def receive = {
  case WorkAvailable => sender ! WorkRequest
  case WorkOn(spec:BuildSpec) => {
    // Reset any output that we collected from the previous run
    output.clear()
    sender ! runBuild(spec)
    sender ! WorkRequest
  }
} 

And here is the runBuild method:

def runBuild(spec:BuildSpec): BuildResult = {
  val script = new java.io.File(spec.script)
  val code = Process(Seq("bash", spec.script), script.getParentFile).!(proc_logger)
  if (code == SpecBuilder.EXIT_OK || code == SpecBuilder.EXIT_NOT_IMPLEMENTED) {
    BuildSuccess(spec)
  } else {
    BuildFailure(spec)
  }
}

The SpecBuilder implementation is clearly not finished yet. First item on the agenda is to save any output from failed builds so that we can actually see what went wrong.

Summary

I would consider this a successful first pass at a pet project for work. One of the goals was to develop a more concrete understanding of Akka and how to make the most of it. This is of course also great practice for my Scala-fu.

Any and all critiques are welcome. In fact, I’m honestly hoping to hear some.

How to support Range requests with Spray

I recently developed a simple service using Scala and Spray which needed to be able to serve videos to iOS devices. To do this you need to support byte range requests, and how to do so isn’t immediately clear.

Spray does not currently have a formal range request routing directive but it does have everything you need to support these requests.

Here is my simple implementation. I’ve tested it on a couple of iPads running iOS 6 and 7.

The ByteRange used in the match expression is just a little regex:

val ByteRange = "^bytes=(.*)-(.*)$".r
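My full Spray route isn’t reproduced here, but the heart of the implementation is turning a Range header value into absolute byte offsets using that regex. Here is a minimal sketch of just that step; the `resolve` helper and demo object are my own names, and a real implementation should also validate the offsets against the resource (returning 416 when the range is unsatisfiable):

```scala
object RangeDemo {
  val ByteRange = "^bytes=(.*)-(.*)$".r

  // Resolve a Range header value into absolute (first, last) byte offsets for
  // a resource of `length` bytes. Handles "0-499", "500-", and "-500" forms.
  // No bounds checking here -- this is only a sketch.
  def resolve(header: String, length: Long): Option[(Long, Long)] =
    header match {
      case ByteRange(start, end) =>
        (start, end) match {
          case ("", "") => None                                  // "bytes=-" is invalid
          case ("", n)  => Some((length - n.toLong, length - 1)) // suffix: last n bytes
          case (n, "")  => Some((n.toLong, length - 1))          // from offset n to the end
          case (a, b)   => Some((a.toLong, b.toLong))            // explicit start and end
        }
      case _ => None
    }

  def main(args: Array[String]): Unit =
    println(resolve("bytes=0-1023", 4096))
}
```

With the offsets in hand, the remaining work is reading that slice of the file and responding with status 206 (Partial Content) and a Content-Range header, which Spray’s primitives let you do directly.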

Delaunay Triangulation in Common Lisp

tl;dr: Something that’s been on my TOCODE list for a while was to implement the triangulation of a set of random points; particularly in Common Lisp.

You can go straight to the code here.


The topic of proceduralism in art, vfx, and games has always interested me and is very likely the primary reason I’m compelled to write programs in Lisp. Clojure served as my primary introduction to the world of Lisp but I’ve ultimately come to really enjoy and appreciate Common Lisp. There isn’t much I can say about the general Common Lisp community, but the friendly confines of #lispgames on Freenode have been a tremendous inspiration and motivator. The best part is that #lispgames is home for folks using all the branches of the Lisp family tree (and even some Smalltalk).

Upon setting out to research Delaunay Triangulation I looked for reference, inspiration and code examples from anywhere and everywhere. Here is a handful of links that were useful or inspirational:

These got me almost entirely there but something wasn’t quite clicking. The following image illustrates the problem:

As you can see, I was just generating a soup of edges. I tried all kinds of things and, truth be told, that was an excellent exercise because it led me to re-implement functions, and that iteration was a positive learning experience. I feel like the effort produced code that is a little more clear as a result.

It turned out that I was simply misreading a core detail of the algorithm, and it wasn’t until I read the original pseudocode by the inventor of the algorithm (available here) that I realized what the problem was: I was removing “duplicate” edges instead of edges that were “doubly specified”.

The difference there is subtle; what I originally did was turn a list such as (a a b c) into (a b c) but what I should have ended up with was (b c).
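My actual implementation is in Common Lisp, but the distinction is easy to sketch in a few lines of Scala (the names here are made up for the example):

```scala
object EdgeDemo {
  // What I originally did: de-duplicate, keeping one copy of each edge
  def dedupe[A](edges: Seq[A]): Seq[A] = edges.distinct

  // What the algorithm calls for: drop doubly specified edges entirely,
  // keeping only edges that appear exactly once
  def removeDoublySpecified[A](edges: Seq[A]): Seq[A] =
    edges.filter(e => edges.count(_ == e) == 1)

  def main(args: Array[String]): Unit = {
    val edges = Seq("a", "a", "b", "c")
    println(dedupe(edges))                // keeps one "a"
    println(removeDoublySpecified(edges)) // removes "a" altogether
  }
}
```

In the triangulation, an edge specified twice is shared by two triangles being replaced, so it must vanish from the boundary polygon rather than survive as a single copy.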

Once I realized this everything fell into place.

Akka, AMQP, Play, and Scala (a follow up)

So this weekend I was following along with a very helpful blog post when I realized that the way it’s implemented prevents proper application shutdown since the primary actor for consuming messages will block eternally.

Since I’m new to this I searched around for a bit more context or possible solutions, but it really came down to actually reading the RabbitMQ API docs.

The solution in the post uses a deprecated class QueueingConsumer which provides the blocking api. But you don’t need to do this at all these days. Just implementing a simple subclass of the DefaultConsumer is sufficient for streaming messages into your actor system without having to manage your own threads or worry about proper shutdown.

Here is my MsgConsumer implementation:

And in your Global settings you need to have a setup similar to this:

I deviated a bit from the original post, but hopefully this is enough information to explain the solution I reached.