Spark-like datasets in Unison

Additional topics

by
  • Rebecca Mark
  • Paul Chiusano

Here we'll cover some additional topics you may have wondered about while reading the article:

Error handling

While the Remote ability does support fine-grained error handling via functions like Remote.try and tryAwait, for batch computing workloads like the one we've developed here, coarse-grained error handling policies applied to an entire computation are often appropriate. Some example policies:

  • In the event of a failure, retry repeatedly (possibly with exponential backoff). If the failure keeps happening, a human will eventually intervene to kill the job.
  • In the event of a failure, retry up to 4 times, then reraise the failure.

These sorts of general policies can be provided by "middleware" handlers of Remote rather than needing to pollute nice implementations like Seq.reduce with retry logic.

Tree vs Seq and controlling granularity of parallelism

The lib.Tree we used in the tutorial is a bit different than the Seq used in the distributed library. This is subtle stuff. Let's take a look at the differences:

structural type Seq k a
structural type Seq k a
  = lib.dist_extra.Seq.Seq (Remote.Value (Two Mode a (Seq k a)))

What's going on here? Well, first, ignore that Seq has an extra k parameter. Also ignore for now the Mode parameter for controlling granularity of parallelism—we'll discuss that shortly.

More substantially, Seq is defined via mutual recursion between two types, Seq and Two. This is a little brain bending if you haven't seen this sort of thing before, but the usage here just enforces that there is a Remote.Value wrapping each level of the tree, including the root. Recall that Remote.Value is lazy, so this means we can't know if a Seq is empty, a leaf or a branch without first forcing the Remote.Value.

It's instructive to compare how the two types represent a few example trees:

use Value pure
emptyTree = lib.Tree.Empty
emptySeq = Seq (pure Two.Empty)
leafTree = Tree.One (pure "A")
leafSeq = Seq (pure (Two.One "A"))
branchTree = Tree.Two (pure leafTree) (pure leafTree)
branchSeq leafSeq mode = Seq (pure (Two.Two mode leafSeq leafSeq))

lib.Tree isn't fully lazy. Without looking inside any Remote.Value, we can tell whether the tree is empty, a leaf, or a brach. The laziness only kicks in when we try to see what is inside a leaf, or what subtrees are one level down from a Tree.Two.

In contrast, Seq is fully lazy: we can't obtain any information about its structure without calling either Value.map or Value.get. We can't even tell if the data structure is empty or not! We can tell that it is a Seq, but any information about its internal structure is guarded by a Remote.Value.

structural type Seq k a
structural type Seq k a
  = lib.dist_extra.Seq.Seq (Remote.Value (Two Mode a (Seq k a)))

For an ordinary data in-memory data structure, the difference in these two phrasings might not matter much in practice. For distributed data structures, it's a more substantive decision. Consider the function Seq.flatMap, and imagine calling it with a function that does some serious work:

Seq.flatMap : (a ->{Exception, Remote} Seq Defer b)
              -> Seq k a
              -> Seq Defer b
Seq.flatMap :
  (a ->{Exception, Remote} Seq Defer b) -> Seq k a -> Seq Defer b
Seq.flatMap f s =
  use Seq unwrap
  use Two Empty
  use Value pure
  go :
    Two Mode a (Seq k a) -> Remote.Value (Two Mode b (Seq Defer b))
  go = cases
    Empty -> pure Empty
    Two.One a -> unwrap (f a)
    Two.Two mode l r ->
      pure (Two.Two mode (Seq.flatMap f l) (Seq.flatMap f r))
  Seq (Value.flatMap go (unwrap s))

The above implementation is nicely lazy and does no work until the sequence is later forced (by a Seq.reduce say). In contrast, imagine writing flatMap for lib.Tree. If the tree happens to be a leaf it will have to strictly apply the function, which means that flatMap now requires access to Remote.

This is a bit of an awkward programming model, where some computation is run strictly some of the time, while other computations are evaluated lazily only when needed, and the difference depends on the size of the tree! Seq has a more uniform representation that ensures no work ever happens until the Seq is forced by a function like Seq.reduce, and this is often a good choice.

Controlling chunking and the granularity of parallelism

Let's look at the implementation of parallel.Tree.reduce:

parallel.Tree.reduce : a
                       -> (a ->{Remote} a ->{Remote} a)
                       -> lib.Tree a
                       ->{Remote} a
parallel.Tree.reduce :
  a -> (a ->{Remote} a ->{Remote} a) -> lib.Tree a ->{Remote} a
parallel.Tree.reduce zero combine = cases
  lib.Tree.Empty -> zero
  Tree.One valueA -> Value.get valueA
  Tree.Two leftValue rightValue ->
    use Remote fork
    use Value get map
    use parallel.Tree reduce
    leftTask = 
      fork here! do get (map (t -> reduce zero combine t) leftValue)
    rightTask = 
      fork here! do get (map (t -> reduce zero combine t) rightValue)
    left' = await leftTask
    right' = await rightTask
    combine left' right'

This does a parallel reduce of the tree, forking threads at each Tree.Two. This is fine if each subtree represents a large chunk of work. But for say a Tree Nat, close to the leaves of such a tree, there's so little data that the overhead of forking a thread to process may not pay for itself, even with Unison's lightweight threads.

The Seq type allows branches to be annotated with a Mode indicating whether they should be Parallel or Sequential. Functions like Seq.reduce respect the annotation and only fork threads if annotation indicates it's worthwhile, and functions like Seq.map leave the annotation alone:

Seq.reduce : m -> (m -> m ->{Remote} m) -> Seq k m ->{Remote} m
Seq.reduce : m -> (m -> m ->{Remote} m) -> Seq k m ->{Remote} m
Seq.reduce z op s =
  use Seq reduce
  use Two Two
  use distributed_6_0_0.Remote await fork
  go = cases
    Two.Empty          -> z
    Two.One a          -> a
    Two Sequential l r -> op (reduce z op l) (reduce z op r)
    Two Parallel l r   ->
      tl = fork here! do reduce z op l
      tr = fork here! do reduce z op r
      op (await tl) (await tr)
  Value.get (Value.map go (Seq.unwrap s))
Seq.map : (a ->{Exception, Remote} b) -> Seq k a -> Seq Defer b
Seq.map : (a ->{Exception, Remote} b) -> Seq k a -> Seq Defer b
Seq.map f s =
  use Two Empty One Two
  go = cases
    Empty        -> Empty
    One a        -> One (f a)
    Two mode l r -> Two mode (Seq.map f l) (Seq.map f r)
  Seq (Value.map go (Seq.unwrap s))

When building up a distributed sequence, you can control the granularity of parallelism by picking Parallel or Sequential for constructed Two.Two nodes, and functions like fromChunkedList will do this automatically. We'll talk more about constructing sequences in the next section.

In addition to using the Mode annotation, you can also work with trees whose leaf values are some chunk type. So rather than working with a Tree Nat, you could instead work with Tree [Nat] or use some more specialized chunk type. This sort of explicit chunking isn't as nice to program with but it can offer better performance.

Creating distributed sequences

It's nice that we can write functions like Seq.map that preserve the existing partitioning of the data onto multiple nodes. But how do we actually create a distributed sequence in the first place? This section gives the basic idea of how to construct distributed sequences in terms of the Location type used within Remote.

Distributed sequences will typically be created lazily, by repeatedly splitting some state in half and producing a leaf or the empty once the state hits some base case. For instance, here's a function that produces a Seq from a list, placing every chunkSize subtree together at a single location:

fromListAt : Location g -> Nat -> [a] -> Seq k a
fromListAt : Location g -> Nat -> [a] -> Seq k a
fromListAt region chunkSize as =
  use List halve
  use Nat * <= >=
  use Two Two
  step isRoot count as =
    if List.size as <= chunkSize then
      distributed_6_0_0.Value.at
        region (Seq.unwrap (Seq.fromList Sequential as))
        |> Value.join
    else
      if count >= chunkSize
        || isRoot then
        distributed_6_0_0.Value.delayAt region do
          (l, r) = halve as
          Two Parallel (Seq (step false 2 l)) (Seq (step false 2 r))
      else
        (l, r) = halve as
        count' = count * 2
        Value.pure
          (Two
            Parallel
            (Seq (step false count' l))
            (Seq (step false count' r)))
  Seq (step true 1 as)

As a sample use case, we might call this with a list of urls or file names from S3. A subsequent Seq.map or Seq.flatMap can then be used to fetch (or "hydrate") the contents of those urls. The loading happens lazily, only when the resulting Seq is forced.

If the sequence is so large that not even the names of all the files can fit in memory, we might recursively build the sequence using fromListAt (to list top level directory names) and Seq.flatMap (to recursively build a sequence for the files under each directory).

We can also use more general combinators like Seq.unfold or skewUnfoldAt.

If you're interested in using Unison at work for this sort of thing, we'd love to chat with you.