Spark-like datasets in Unison

Distributed and parallel reductions

by
  • Rebecca Mark
  • Paul Chiusano

In the previous part of this series, we introduced a simple distributed tree type and showed how to implement transformation functions like lib.Tree.map in a way that "brings the computation to the data". We saw how functions like lib.Tree.map are lazy: they don't do any work when called but merely set up a pending transformation that will be applied as the data structure is forced.
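As a quick refresher, the type looks roughly like this (a sketch reconstructed from how it's pattern matched below — the actual definition from the previous part may differ slightly in naming):

type lib.Tree a
  = Empty
  | One (Remote.Value a)
  | Two (Remote.Value (lib.Tree a)) (Remote.Value (lib.Tree a))

An Empty node holds nothing, a One holds a single remote value, and a Two holds two remote subtrees, each of which may live at a different location.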

In this part of the series we'll focus on functions for our lib.Tree data type that evaluate or force the data structure in some way. We'll use a reduce function as an example. Again, this will illuminate how small tweaks to the code can cause different runtime behavior.

sequential.Tree.reduce zero combine tree has three parameters: a "zero" value to return if a subtree is empty, a combine function for combining the results of two subtrees, and the tree to be reduced. Reduce functions for in-memory data structures deal with a familiar set of concerns: should we reduce the left or the right subtree first, and should the implementation be tail-recursive or keep state on the stack? But because we're now working with distributed computations, our reduce implementation also needs to manage the additional dimensions of where and when the combine function should be run.
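For example, summing a distributed tree of naturals might look like this (hypothetical usage — nats : lib.Tree Nat is assumed to exist):

sumTree : lib.Tree Nat ->{Remote} Nat
sumTree nats = sequential.Tree.reduce 0 (Nat.+) nats

Here 0 is the zero value and Nat.+ is the combine function; a pure function like Nat.+ is fine where a {Remote} function is expected.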

We could write a reduce function that behaved in either one of the following ways:

  • Send the function down: Push the combining function down the tree to the data, and send the resulting Nat reduced value back to the parent for combining.
  • Send the subtrees up: Send each forced Tree Nat up to its parent Tree.Two, which calls reduce on each subtree, then combines the two Nat results.

We ultimately want the Send the function down option, since sending a Nat to the parent will be cheaper than sending a Tree Nat (only to immediately reduce that to a Nat), but we'll illustrate both here. Take a look at the recursive case in processing the Tree.Two branch in the following implementation:

sequential.Tree.reduce : a
                         -> (a ->{Remote} a ->{Remote} a)
                         -> lib.Tree a
                         ->{Remote} a
sequential.Tree.reduce zero combine = cases
  lib.Tree.Empty -> zero
  Tree.One valueA -> Value.get valueA
  Tree.Two leftValue rightValue ->
    combine
      (sequential.Tree.reduce zero combine (Value.get leftValue))
      (sequential.Tree.reduce zero combine (Value.get rightValue))

It does the following:

  1. Evaluate the left subtree and send it to the current location.
  2. Evaluate the right subtree and send it to the current location.
  3. Recursively reduce the left subtree.
  4. Recursively reduce the right subtree.
  5. Apply the combine function to the two results from (3) and (4).

We've implemented the send the subtrees up approach. If the subtrees are at the same location as the parent, this is fine. But since this is meant to be used in situations where the data cannot fit on one location, there will be nodes in the tree where the parent resides at a different location than one of its subtrees. In these places we're sending more data over the network than we should.

There's another problem with the send the subtrees up approach: the reducing and combining is always happening where the parent resides. Since this is a recursive function, this means that all the work is ultimately happening at whatever location calls sequential.Tree.reduce. That is going to be bad when we try to add parallelism later—we can't have one location doing all the work!

Let's try to write a version of reduce that implements the send the function down approach, using Value.map instead:

withMap.Tree.reduce : a
                      -> (a ->{Remote} a ->{Remote} a)
                      -> lib.Tree a
                      ->{Remote} a
withMap.Tree.reduce zero combine = cases
  lib.Tree.Empty                -> zero
  Tree.One valueA               -> Value.get valueA
  Tree.Two leftValue rightValue ->
    use Value get map
    use withMap.Tree reduce
    left' = map (t -> reduce zero combine t) leftValue
    right' = map (t -> reduce zero combine t) rightValue
    combine (get left') (get right')

This version will:

  1. Reduce the left subtree at its location.
  2. Reduce the right subtree at its location.
  3. Send the reduced left value to the parent.
  4. Send the reduced right value to the parent.
  5. Combine the two reduced values at the parent.

This is the send the function down approach. Notice that at no point are we sending a lib.Tree to the parent (there are no calls to Value.get that return a lib.Tree, only calls to Value.get that return reduced values).

While this is an improvement over sequential.Tree.reduce's execution strategy, we are still reducing the two subtrees one after the other: first the left, then the right. Why not reduce them in parallel?

To make this into a parallel reduce, we can use Remote.fork to start a computation running in a background Task. An await blocks until the forked Task completes and returns its result (or raises a failure if the forked task failed).
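Here's the idea in isolation (a minimal sketch — expensiveA and expensiveB are hypothetical '{Remote} Nat computations, and the fork and await calls mirror their use in the code below):

forkAndAwait : '{Remote} Nat
forkAndAwait = do
  use Nat +
  -- start both computations before awaiting either
  taskA = Remote.fork here! expensiveA
  taskB = Remote.fork here! expensiveB
  -- block until each finishes, then combine the results
  await taskA + await taskB

Both tasks start running before either await is reached, so the two computations overlap in time.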

Using Remote.fork and await in our reduce function yields something like this:

parallel.Tree.reduce : a
                       -> (a ->{Remote} a ->{Remote} a)
                       -> lib.Tree a
                       ->{Remote} a
parallel.Tree.reduce zero combine = cases
  lib.Tree.Empty -> zero
  Tree.One valueA -> Value.get valueA
  Tree.Two leftValue rightValue ->
    use Remote fork
    use Value get map
    use parallel.Tree reduce
    leftTask = 
      fork here! do get (map (t -> reduce zero combine t) leftValue)
    rightTask = 
      fork here! do get (map (t -> reduce zero combine t) rightValue)
    left' = await leftTask
    right' = await rightTask
    combine left' right'

Our left and right lib.Tree branches are now being reduced in parallel through the use of Remote.fork and await. Moreover, all the pending transformations that have been applied to the tree (for instance via lib.Tree.map) will be forced in parallel in the same pass as the parallel.Tree.reduce.
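That means a map-then-reduce pipeline fuses into a single parallel traversal. For instance (a sketch, again assuming a hypothetical nats : lib.Tree Nat):

sumOfSquares : lib.Tree Nat ->{Remote} Nat
sumOfSquares nats =
  -- lazy: just records the pending transformation
  squared = lib.Tree.map (n -> n Nat.* n) nats
  -- forces the pending maps and reduces, in parallel, in one pass
  parallel.Tree.reduce 0 (Nat.+) squared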

🤯

Wowser! So our distributed map and parallel reduce functions are just a few lines of code each. It's remarkable that we can obtain exactly the runtime behavior we want just by phrasing our functions in the right way.

This is what writing distributed programs is like in Unison. You have to consider where and when you want computations to happen, and express those decisions with code, but it's a tiny amount of code that deals with the essence of the important decisions, not serialization or networking boilerplate, or deployment scripts or YAML files or building containers...

Operations that only partially evaluate a structure

Suppose we want to write a function that doesn't require us to fully evaluate the tree. Let's say we want a function lib.Tree.take that returns the first n elements it finds in the tree. We can write it so that it may not need to force the right subtree at all:

lib.Tree.take : Nat -> lib.Tree a ->{Remote} [a]
lib.Tree.take n = cases
  lib.Tree.Empty            -> []
  Tree.One valueA           -> List.singleton (Value.get valueA)
  Tree.Two leftVal rightVal ->
    use List ++ size
    use Nat - >=
    use Value get map
    use lib Tree.take
    combine l r =
      if size l >= n then List.take n l
      else
        nextN = n - size l
        l ++ get (map (Tree.take nextN) r)
    combine (get (map (Tree.take n) leftVal)) rightVal

The trick is we have to guard the right branch from being evaluated by keeping it wrapped in a Remote.Value. So the function that joins together the left and right branches has to be more careful about the circumstances in which it evaluates the right branch via calls to Value.get.
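As a usage sketch (with a hypothetical nats : lib.Tree Nat), only the subtrees that take actually visits will have their pending transformations forced:

firstTen : lib.Tree Nat ->{Remote} [Nat]
firstTen nats = lib.Tree.take 10 (lib.Tree.map (n -> n Nat.* 2) nats)

If the left subtree alone yields ten elements, the right subtree is never fetched or mapped.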

An exercise for the reader 😎

Exercise: Implement lazy reduce

Try to generalize the lazy lib.Tree.take function into a lazy reduce function:

lazy.Tree.reduce : a -> (a -> Remote.Value a ->{Remote} a) -> lib.Tree a ->{Remote} a
Here's one possible answer:
lazy.Tree.reduce : a
                   -> (a -> Remote.Value a ->{Remote} a)
                   -> lib.Tree a
                   ->{Remote} a
lazy.Tree.reduce zero combine = cases
  lib.Tree.Empty                -> zero
  Tree.One valueA               -> Value.get valueA
  Tree.Two leftValue rightValue ->
    use Value map
    use lazy.Tree reduce
    left' = map (t -> reduce zero combine t) leftValue
    right' = map (t -> reduce zero combine t) rightValue
    combine (Value.get left') right'
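Note that the right subtree's result reaches combine still wrapped in a Remote.Value, so it's up to combine to decide whether (and where) to force it with Value.get — the same guard we used in lib.Tree.take.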

Takeaways

  • Value.get will force the evaluation of a Remote value, bringing it to the location of the caller. You'll see it in functions which interpret the distributed data structure.
  • Use Value.map and Value.get in tandem to control where and when a computation should be run.
  • Unison's Remote.fork and await functions provide a way to introduce parallelization to remote computations.

Whatever runtime behavior you want for your distributed computations can be achieved with only tiny code changes, and the decisions you make are then codified in reusable functions that others can use without needing to be experts in distributed systems.

In the next part, we'll go further, showing how computations on distributed data structures can be made incremental, avoiding work that has already been done in a previous execution.

If you're interested in using Unison at work for this sort of thing, we'd love to chat with you.