Here we'll cover some additional topics you may have wondered about while reading the article:
- What about error handling?
- Seq vs Tree and controlling the granularity of parallelism
- How do I create a distributed sequence?
Error handling
While the Remote ability does support fine-grained error handling via functions like try and tryAwait, for batch computing workloads like the one we've developed here, coarse-grained error handling policies applied to an entire computation are often appropriate. Some example policies:
- In the event of a failure, retry repeatedly (possibly with exponential backoff). If the failure keeps happening, a human will eventually intervene to kill the job.
- In the event of a failure, retry up to 4 times, then reraise the failure.
These sorts of general policies can be provided by "middleware" handlers of Remote rather than needing to pollute nice implementations like Seq.reduce with retry logic.
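As a minimal sketch of the second policy, here is a plain Unison function rather than a Remote handler. The name retryUpTo is hypothetical, and the attempt reports failure as a Left value instead of raising an exception, which keeps the example self-contained:

```unison
-- Hypothetical retry policy written as a plain higher-order function.
-- This is NOT the Remote middleware API; it only illustrates
-- "retry up to n times, then reraise the failure".
retryUpTo : Nat -> '{g} Either e a ->{g} Either e a
retryUpTo retries attempt =
  -- Each call to !attempt re-runs the work; when g is an effectful
  -- ability, a later attempt may succeed after a transient failure.
  match !attempt with
    Right a -> Right a
    Left err ->
      if retries == 0 then Left err
      else retryUpTo (Nat.drop retries 1) attempt
```

A middleware handler would apply the same kind of policy to every task in a computation, so code like Seq.reduce never has to mention it.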
Tree vs Seq and controlling granularity of parallelism
The Tree we used in the tutorial is a bit different from the Seq used in the distributed library. This is subtle stuff. Let's take a look at the differences:
structural type Tree a
  = lib.Tree.One (distributed.Value a)
  | lib.Tree.Two
      (distributed.Value (Tree a)) (distributed.Value (Tree a))
  | lib.Tree.Empty

structural type Seq k a
  = lib.distributed.Seq.Seq
      (distributed.Value (Two Mode a (Seq k a)))

structural type Two m a r
  = lib.distributed.structures.Two.Empty
  | lib.distributed.structures.Two.Two m r r
  | lib.distributed.structures.Two.One a
What's going on here? Well, first, ignore that Seq has an extra k parameter. Also ignore for now the Mode parameter for controlling granularity of parallelism; we'll discuss that shortly.
More substantially, Seq is defined via mutual recursion between two types, Seq and Two. This is a little brain-bending if you haven't seen this sort of thing before, but the usage here just enforces that there is a distributed.Value wrapping each level of the tree, including the root. Recall that distributed.Value is lazy, so this means we can't know if a Seq is empty, a leaf, or a branch without first forcing the distributed.Value.
It's instructive to compare how the two types represent a few example trees:
use Value pure
emptyTree = Tree.Empty
emptySeq = Seq (pure Two.Empty)
leafTree = Tree.One (pure "A")
leafSeq = Seq (pure (Two.One "A"))
branchTree = Tree.Two (pure leafTree) (pure leafTree)
branchSeq mode = Seq (pure (Two.Two mode leafSeq leafSeq))
Tree isn't fully lazy. Without looking inside any distributed.Value, we can tell whether the tree is empty, a leaf, or a branch. The laziness only kicks in when we try to see what is inside a leaf, or what subtrees are one level down from a Tree.Two.
In contrast, Seq is fully lazy: we can't obtain any information about its structure without calling either Value.map or Value.get. We can't even tell if the data structure is empty or not! We can tell that it is a Seq, but any information about its internal structure is guarded by a distributed.Value.
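A small in-memory model makes the difference concrete. This is a sketch under the assumption that a delayed computation '(...) is a fair stand-in for distributed.Value, with ! playing the role of Value.get; LazyTree, LazySeq, and LazyTwo are hypothetical names, not types from the distributed library.

```unison
-- Hypothetical model: '(...) stands in for distributed.Value.
structural type LazyTree a
  = Empty
  | One ('a)
  | Two ('(LazyTree a)) ('(LazyTree a))

structural type LazySeq a = LazySeq ('(LazyTwo a (LazySeq a)))

structural type LazyTwo a r = Empty | One a | Two r r

-- Tree: the outer constructor is visible, so no thunk is forced here.
LazyTree.isEmpty : LazyTree a -> Boolean
LazyTree.isEmpty = cases
  LazyTree.Empty -> true
  _ -> false

-- Seq: even asking "is it empty?" requires forcing the root thunk.
LazySeq.isEmpty : LazySeq a -> Boolean
LazySeq.isEmpty = cases
  LazySeq.LazySeq node ->
    match !node with
      LazyTwo.Empty -> true
      _ -> false
```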
For an ordinary in-memory data structure, the difference between these two phrasings might not matter much in practice. For distributed data structures, it's a more substantive decision. Consider the function Seq.flatMap, and imagine calling it with a function that does some serious work:
Seq.flatMap :
  (a ->{Exception, Remote} Seq Defer b) -> Seq k a -> Seq Defer b
Seq.flatMap f s =
  use Seq unwrap
  use Two Empty
  use Value pure
  go :
    Two Mode a (Seq k a)
    -> distributed.Value (Two Mode b (Seq Defer b))
  go = cases
    Empty -> pure Empty
    Two.One a -> unwrap (f a)
    Two.Two mode l r ->
      pure (Two.Two mode (Seq.flatMap f l) (Seq.flatMap f r))
  Seq (Value.flatMap go (unwrap s))
The above implementation is nicely lazy and does no work until the sequence is later forced (by a Seq.reduce, say). In contrast, imagine writing flatMap for Tree. If the tree happens to be a leaf, it will have to strictly apply the function, which means that flatMap now requires access to Remote.
This is a bit of an awkward programming model, where some computation is run strictly some of the time, while other computations are evaluated lazily only when needed, and the difference depends on the size of the tree! Seq has a more uniform representation that ensures no work ever happens until the Seq is forced by a function like Seq.reduce, and this is often a good choice.
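The same in-memory model (hypothetical types, with '(...) standing in for distributed.Value) shows where the strictness comes from. This sketch assumes pure element functions, so the cost is invisible here, but the shape of the code is the point: LazyTree.flatMap has to call f as soon as it meets a leaf, while LazySeq.flatMap only builds a new thunk.

```unison
-- Hypothetical model: '(...) stands in for distributed.Value.
structural type LazyTree a
  = Empty
  | One ('a)
  | Two ('(LazyTree a)) ('(LazyTree a))

structural type LazySeq a = LazySeq ('(LazyTwo a (LazySeq a)))

structural type LazyTwo a r = Empty | One a | Two r r

LazySeq.unwrap : LazySeq a -> (() ->{} LazyTwo a (LazySeq a))
LazySeq.unwrap = cases
  LazySeq.LazySeq node -> node

-- Tree: a leaf's result constructor depends on what f returns, so the
-- leaf value must be forced and f applied immediately (strictly).
LazyTree.flatMap : (a ->{} LazyTree b) -> LazyTree a -> LazyTree b
LazyTree.flatMap f t = match t with
  LazyTree.Empty -> LazyTree.Empty
  LazyTree.One va -> f (!va)
  LazyTree.Two l r ->
    LazyTree.Two '(LazyTree.flatMap f (!l)) '(LazyTree.flatMap f (!r))

-- Seq: the whole result sits behind a fresh thunk, so f is never
-- called until the result is forced.
LazySeq.flatMap : (a ->{} LazySeq b) -> LazySeq a -> LazySeq b
LazySeq.flatMap f s =
  node = LazySeq.unwrap s
  LazySeq.LazySeq do
    match !node with
      LazyTwo.Empty -> LazyTwo.Empty
      LazyTwo.One a ->
        inner = LazySeq.unwrap (f a)
        !inner
      LazyTwo.Two l r ->
        LazyTwo.Two (LazySeq.flatMap f l) (LazySeq.flatMap f r)

-- Force every level and collect the elements in order.
LazySeq.toList : LazySeq a -> [a]
LazySeq.toList s =
  use List ++
  node = LazySeq.unwrap s
  match !node with
    LazyTwo.Empty -> []
    LazyTwo.One a -> [a]
    LazyTwo.Two l r -> LazySeq.toList l ++ LazySeq.toList r
```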
Controlling chunking and the granularity of parallelism
Let's look at the implementation of parallel.Tree.reduce:
parallel.Tree.reduce :
  a -> (a ->{Remote} a ->{Remote} a) -> Tree a ->{Remote} a
parallel.Tree.reduce zero combine = cases
  Tree.Empty -> zero
  Tree.One valueA -> Value.get valueA
  Tree.Two leftValue rightValue ->
    use Remote fork
    use Value get map
    use parallel.Tree reduce
    leftTask =
      fork here! '(get (map (t -> reduce zero combine t) leftValue))
    rightTask =
      fork here! '(get (map (t -> reduce zero combine t) rightValue))
    left' = await leftTask
    right' = await rightTask
    combine left' right'
This does a parallel reduce of the tree, forking threads at each Tree.Two. This is fine if each subtree represents a large chunk of work. But for, say, a Tree Nat, close to the leaves of such a tree there's so little data that the overhead of forking a thread to process it may not pay for itself, even with Unison's lightweight threads.
The Seq type allows branches to be annotated with a Mode indicating whether they should be Parallel or Sequential. Functions like Seq.reduce respect the annotation and only fork threads if the annotation indicates it's worthwhile, and functions like Seq.map leave the annotation alone:
Seq.reduce : m -> (m -> m ->{Remote} m) -> Seq k m ->{Remote} m
Seq.reduce z op s =
  use Remote fork
  use Seq reduce
  use Two Two
  go = cases
    Two.Empty -> z
    Two.One a -> a
    Two Sequential l r -> op (reduce z op l) (reduce z op r)
    Two Parallel l r ->
      tl = fork here! '(reduce z op l)
      tr = fork here! '(reduce z op r)
      op (await tl) (await tr)
  Value.get (Value.map go (Seq.unwrap s))
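To see what the Mode annotation buys, here is a hypothetical in-memory model (Grain and MTree are our own names, mirroring Mode and Two) whose reduce counts the tasks Seq.reduce would fork instead of actually forking them:

```unison
-- Hypothetical model, not the distributed library's types.
structural type Grain = Parallel | Sequential

structural type MTree a
  = Empty
  | One a
  | Two Grain (MTree a) (MTree a)

-- Reduce like Seq.reduce, but return (result, forks): each Parallel
-- branch would fork two tasks, each Sequential branch forks none.
MTree.reduceCounting : m -> (m -> m -> m) -> MTree m -> (m, Nat)
MTree.reduceCounting z op t =
  use Nat +
  match t with
    MTree.Empty -> (z, 0)
    MTree.One a -> (a, 0)
    MTree.Two grain l r ->
      (a, forksL) = MTree.reduceCounting z op l
      (b, forksR) = MTree.reduceCounting z op r
      forksHere = match grain with
        Grain.Parallel -> 2
        Grain.Sequential -> 0
      (op a b, forksL + forksR + forksHere)
```

For a four-leaf tree whose root is Parallel and whose two inner branches are Sequential, summing gives (10, 2): only the root forks. Marking all three branches Parallel would fork six tasks for the same four numbers.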
When building up a distributed sequence, you can control the granularity of parallelism by picking Parallel or Sequential for constructed Two.Two nodes, and functions like fromChunkedList will do this automatically. We'll talk more about constructing sequences in the next section.
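One hypothetical way to pick the annotation while building is by size: branches covering many elements fork, and small ones near the leaves run sequentially. This is a sketch in an in-memory model (Grain and MTree are illustrative names), not how fromChunkedList is implemented:

```unison
-- Hypothetical model, not the distributed library's types.
structural type Grain = Parallel | Sequential

structural type MTree a
  = Empty
  | One a
  | Two Grain (MTree a) (MTree a)

-- Halve the list recursively; a branch covering more than `threshold`
-- elements is Parallel, smaller ones are Sequential.
MTree.fromListWithGrain : Nat -> [a] -> MTree a
MTree.fromListWithGrain threshold xs =
  use Nat >
  match xs with
    [] -> MTree.Empty
    [x] -> MTree.One x
    _ ->
      (l, r) = List.halve xs
      grain =
        if List.size xs > threshold then Grain.Parallel
        else Grain.Sequential
      MTree.Two
        grain
        (MTree.fromListWithGrain threshold l)
        (MTree.fromListWithGrain threshold r)

-- Count Parallel branches, i.e. the places where a reduce would fork.
MTree.parallelBranches : MTree a -> Nat
MTree.parallelBranches t =
  use Nat +
  match t with
    MTree.Two Grain.Parallel l r ->
      1 + MTree.parallelBranches l + MTree.parallelBranches r
    MTree.Two Grain.Sequential l r ->
      MTree.parallelBranches l + MTree.parallelBranches r
    _ -> 0
```

With eight elements and a threshold of 2, the root and the two four-element branches are Parallel, while the four two-element branches are Sequential.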
In addition to using the Mode annotation, you can also work with trees whose leaf values are some chunk type. So rather than working with a Tree Nat, you could instead work with Tree [Nat] or use some more specialized chunk type. This sort of explicit chunking isn't as nice to program with, but it can offer better performance.
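A minimal sketch of explicit chunking, using a hypothetical in-memory CTree whose leaves hold lists: each chunk is summed with an ordinary sequential fold, so per-branch overhead (such as a fork in a distributed version) is paid once per chunk rather than once per number.

```unison
-- Hypothetical tree whose leaves are chunks rather than single values.
structural type CTree a = Empty | Leaf [a] | Branch (CTree a) (CTree a)

CTree.sum : CTree Nat -> Nat
CTree.sum t =
  use Nat +
  match t with
    CTree.Empty -> 0
    -- Tight sequential loop over one chunk.
    CTree.Leaf chunk -> List.foldLeft (acc x -> acc + x) 0 chunk
    -- Only here would a distributed version consider forking.
    CTree.Branch l r -> CTree.sum l + CTree.sum r
```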
Creating distributed sequences
It's nice that we can write functions like Seq.map that preserve the existing partitioning of the data onto multiple nodes. But how do we actually create a distributed sequence in the first place? This section gives the basic idea of how to construct distributed sequences in terms of the Location type used within Remote.
Distributed sequences will typically be created lazily, by repeatedly splitting some state in half and producing a leaf or the empty sequence once the state hits some base case. For instance, here's a function that produces a Seq from a list, placing every chunkSize subtree together at a single location:
fromListAt : Location g -> Nat -> [a] -> Seq k a
fromListAt region chunkSize as =
  use List halve
  use Nat * <= >=
  use Two Two
  step isRoot count as =
    if List.size as <= chunkSize then
      Value.at region (Seq.unwrap (Seq.fromList Sequential as))
        |> Value.join
    else
      if (count >= chunkSize) || isRoot then
        delayAt region do
          (l, r) = halve as
          Two Parallel (Seq (step false 2 l)) (Seq (step false 2 r))
      else
        (l, r) = halve as
        count' = count * 2
        Value.pure
          (Two
            Parallel
            (Seq (step false count' l))
            (Seq (step false count' r)))
  Seq (step true 1 as)
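Stripped of Location, Value.at, and delayAt, the splitting strategy can be sketched in memory. This hypothetical ChunkSeq uses a delayed '(...) in place of distributed.Value; it halves the list lazily, one level per force, and stops at chunks of at most chunkSize elements (always stopping at single elements, so halving terminates even when chunkSize is 0). It does not assign Mode annotations or locations.

```unison
-- Hypothetical model: '(...) stands in for distributed.Value.
structural type ChunkSeq a = ChunkSeq ('(ChunkTwo a (ChunkSeq a)))

structural type ChunkTwo a r = Empty | Chunk [a] | Two r r

-- Nothing is split until a thunk is forced; each force splits one level.
ChunkSeq.fromList : Nat -> [a] -> ChunkSeq a
ChunkSeq.fromList chunkSize xs =
  use Nat <=
  ChunkSeq.ChunkSeq do
    n = List.size xs
    if n == 0 then ChunkTwo.Empty
    else
      if (n <= chunkSize) || (n == 1) then ChunkTwo.Chunk xs
      else
        (l, r) = List.halve xs
        ChunkTwo.Two
          (ChunkSeq.fromList chunkSize l) (ChunkSeq.fromList chunkSize r)

-- Force every level and concatenate the chunks in order.
ChunkSeq.toList : ChunkSeq a -> [a]
ChunkSeq.toList = cases
  ChunkSeq.ChunkSeq node ->
    use List ++
    match !node with
      ChunkTwo.Empty -> []
      ChunkTwo.Chunk xs -> xs
      ChunkTwo.Two l r -> ChunkSeq.toList l ++ ChunkSeq.toList r
```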
As a sample use case, we might call this with a list of URLs or file names from S3. A subsequent Seq.map or Seq.flatMap can then be used to fetch (or "hydrate") the contents of those URLs. The loading happens lazily, only when the resulting Seq is forced.
If the sequence is so large that not even the names of all the files can fit in memory, we might recursively build the sequence using fromListAt (to list top-level directory names) and Seq.flatMap (to recursively build a sequence for the files under each directory).
We can also use more general combinators like Seq.unfold or skewUnfoldAt.